私は "トランザクション"を含む単純なテキストファイルを持っています。PySpark - テキストファイルからデータフレームを作成する
1行目は列名です。 "START_TIME"、 "END_TIME"、 "SIZE" ...約100個の列名。
ファイル内の列名は引用符で囲まれていません。
私は列名と、データフレームに、このファイルを変換するには、スパークを使用したい、
、その後は、ファイルからのすべての列が、いくつかの特定の列を削除します。
テキストファイルをデータフレームに変換するのに少し問題があります。
ここに私のコードは、これまでのところです:私が持っている
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
# Load relevant objects
sc = SparkContext('local')
log_txt = sc.textFile("/path/to/text/file.txt")
sqlContext = SQLContext(sc)
# Construct fields with names from the header, for creating a DataFrame
header = log_txt.first()
fields = [StructField(field_name, StringType(), True)
for field_name in header.split(',')]
# Only columns\fields 2,3,13,92 are relevant. set them to relevant types
fields[2].dataType = TimestampType() # START_TIME in yyyymmddhhmmss format
fields[3].dataType = TimestampType() # END_TIME in yyyymmddhhmmss
fields[13].dataType = IntegerType() # DOWNSTREAM_SIZE, in bytes
fields[92].dataType = BooleanType() # IS_CELL_CONGESTED, 0 or 1
schema = StructType(fields) # Create a schema object
# Build the DataFrame
log_txt = log_txt.filter(lambda line: line != header) # Remove header from the txt file
temp_var = log_txt.map(lambda k: k.split("\t"))
log_df = sqlContext.createDataFrame(temp_var, schema) # PROBLEMATIC LINE
問題は、私は私がその最終段階の前に、いくつかのステップを欠けている恐れ、最後の行です。
どの手順が欠落しているかを教えてもらえますか?
最後の行のコードでは、多くのエラーが発生します。 必要に応じてポストで更新します。
ファイル形式は、(2つのラインの一例)である
TRANSACTION_URL,RESPONSE_CODE,START_TIME,END_TIME,.... <more names>
http://www.google.com<\t seperator>0<\t seperator>20160609182001<\t seperator>20160609182500.... <more values>
http://www.cnet.com<\t seperator>0<\t seperator>20160609192001<\t seperator>20160609192500.... <more values>
また、誰かがその構築された後、データフレームから不要な列を削除するには、私を助けてくださいことができますか?私はあなたがそれを少しoverthinkingていると思う
おかげ
あなたが作ったスキーマを、あなたは '使用するときに、あなたが提供するデータに適合しませんcreateDataFrame'。すべての列を使ってDataFrameを作成し、次に 'log_df.select(" columns "、" you "、" want ")'を実行することをお勧めしますか? –
@JamesTobin、私は理解していない - なぜ私が作ったスキーマは、私が提供する適合データではありませんか? 私は、テキストファイルのヘッダー行からスキーマを作成しています。 私の意図では、まず、関連する列のみを選択するよりも、すべての列でDataFrameを作成してください。 'fields [i] .dataType = ...'は最後に必要な列だけですので、すべての列に対してdataTypeを定義するポイントはありません。 – Adiel
"a、b、c"を持つファイルが 'temp_var'として持っていると想像してください。 "a、b、c"と言うと、スキーマ(a:String、c:String)を使用しようとします。 sparkは 'b'と何をするべきか分かりません。 defaultsはピックを列のdtypeとして検出し、列をフィルタリングしてから、選択した列のdtypeを必要なものに変更します。 –