1

私は "トランザクション"を含む単純なテキストファイルを持っています。PySpark - テキストファイルからデータフレームを作成する

1行目は列名です。 "START_TIME"、 "END_TIME"、 "SIZE" ...約100個の列名。

ファイル内の列名は引用符で囲まれていません。

私は列名と、データフレームに、このファイルを変換するには、スパークを使用したい、

、その後は、ファイルからのすべての列が、いくつかの特定の列を削除します。

テキストファイルをデータフレームに変換するのに少し問題があります。

ここに私のコードは、これまでのところです:私が持っている

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
from pyspark.sql.types import * 

# Load relevant objects 
sc = SparkContext('local') 
log_txt = sc.textFile("/path/to/text/file.txt") 
sqlContext = SQLContext(sc) 

# Construct fields with names from the header, for creating a DataFrame 
header = log_txt.first() 
fields = [StructField(field_name, StringType(), True) 
     for field_name in header.split(',')] 

# Only columns\fields 2,3,13,92 are relevant. set them to relevant types 
fields[2].dataType = TimestampType() # START_TIME in yyyymmddhhmmss format 
fields[3].dataType = TimestampType() # END_TIME in yyyymmddhhmmss 
fields[13].dataType = IntegerType()  # DOWNSTREAM_SIZE, in bytes 
fields[92].dataType = BooleanType()  # IS_CELL_CONGESTED, 0 or 1 
schema = StructType(fields)    # Create a schema object 

# Build the DataFrame 
log_txt = log_txt.filter(lambda line: line != header) # Remove header from the txt file 
temp_var = log_txt.map(lambda k: k.split("\t")) 

log_df = sqlContext.createDataFrame(temp_var, schema) # PROBLEMATIC LINE 

問題は、私は私がその最終段階の前に、いくつかのステップを欠けている恐れ、最後の行です。

どの手順が欠落しているかを教えてもらえますか?

最後の行のコードでは、多くのエラーが発生します。 必要に応じてポストで更新します。

ファイル形式は、(2つのラインの一例)である

TRANSACTION_URL,RESPONSE_CODE,START_TIME,END_TIME,.... <more names> 
http://www.google.com<\t seperator>0<\t seperator>20160609182001<\t seperator>20160609182500.... <more values> 
http://www.cnet.com<\t seperator>0<\t seperator>20160609192001<\t seperator>20160609192500.... <more values> 

また、誰かがその構築された後、データフレームから不要な列を削除するには、私を助けてくださいことができますか?私はあなたがそれを少しoverthinkingていると思う

おかげ

+1

あなたが作ったスキーマを、あなたは '使用するときに、あなたが提供するデータに適合しませんcreateDataFrame'。すべての列を使ってDataFrameを作成し、次に 'log_df.select(" columns "、" you "、" want ")'を実行することをお勧めしますか? –

+0

@JamesTobin、私は理解していない - なぜ私が作ったスキーマは、私が提供する適合データではありませんか? 私は、テキストファイルのヘッダー行からスキーマを作成しています。 私の意図では、まず、関連する列のみを選択するよりも、すべての列でDataFrameを作成してください。 'fields [i] .dataType = ...'は最後に必要な列だけですので、すべての列に対してdataTypeを定義するポイントはありません。 – Adiel

+1

"a、b、c"を持つファイルが 'temp_var'として持っていると想像してください。 "a、b、c"と言うと、スキーマ(a:String、c:String)を使用しようとします。 sparkは 'b'と何をするべきか分かりません。 defaultsはピックを列のdtypeとして検出し、列をフィルタリングしてから、選択した列のdtypeを必要なものに変更します。 –

答えて

3

。 は、私たちが

`cat sample_data.txt` 
field1\tfield2\tfield3\tfield4 
0\tdog\t20160906182001\tgoogle.com 
1\tcat\t20151231120504\tamazon.com 

オープンpyspark以下、それほど複雑なもの、例を持っている想像し

データフレームは、あなたが実際にそのフィールドを使用しているかどうか、それは全体の来るすべてのフィールドの型を持っている必要があります
sc.setLogLevel("WARN") 
#setup the same way you have it 
log_txt=sc.textFile("/path/to/data/sample_data.txt") 
header = log_txt.first() 

#filter out the header, make sure the rest looks correct 
log_txt = log_txt.filter(lambda line: line != header) 
log_txt.take(10) 
    [u'0\\tdog\\t20160906182001\\tgoogle.com', u'1\\tcat\\t20151231120504\\tamazon.com'] 

temp_var = log_txt.map(lambda k: k.split("\\t")) 

#here's where the changes take place 
#this creates a dataframe using whatever pyspark feels like using (I think string is the default). the header.split is providing the names of the columns 
log_df=temp_var.toDF(header.split("\\t")) 
log_df.show() 
+------+------+--------------+----------+ 
|field1|field2|  field3| field4| 
+------+------+--------------+----------+ 
|  0| dog|20160906182001|google.com| 
|  1| cat|20151231120504|amazon.com| 
+------+------+--------------+----------+ 
#note log_df.schema 
#StructType(List(StructField(field1,StringType,true),StructField(field2,StringType,true),StructField(field3,StringType,true),StructField(field4,StringType,true))) 

# now lets cast the columns that we actually care about to dtypes we want 
log_df = log_df.withColumn("field1Int", log_df["field1"].cast(IntegerType())) 
log_df = log_df.withColumn("field3TimeStamp", log_df["field1"].cast(TimestampType())) 

log_df.show() 
+------+------+--------------+----------+---------+---------------+ 
|field1|field2|  field3| field4|field1Int|field3TimeStamp| 
+------+------+--------------+----------+---------+---------------+ 
|  0| dog|20160906182001|google.com|  0|   null| 
|  1| cat|20151231120504|amazon.com|  1|   null| 
+------+------+--------------+----------+---------+---------------+ 
log_df.schema 
StructType(List(StructField(field1,StringType,true),StructField(field2,StringType,true),StructField(field3,StringType,true),StructField(field4,StringType,true),StructField(field1Int,IntegerType,true),StructField(field3TimeStamp,TimestampType,true))) 

#now let's filter out the columns we want 
log_df.select(["field1Int","field3TimeStamp","field4"]).show() 
+---------+---------------+----------+ 
|field1Int|field3TimeStamp| field4| 
+---------+---------------+----------+ 
|  0|   null|google.com| 
|  1|   null|amazon.com| 
+---------+---------------+----------+ 

あなた次第。 文字列の日付を実際のタイムスタンプに変換するためにspark.SQL関数の1つを使用する必要がありますが、あまりにも厳しくすべきではありません。

希望これは

PSを支援します。あなたの特定のケースのために、初期データフレームを作るために、試してみてください。log_df=temp_var.toDF(header.split(','))

+0

私はspark-csvを使用してしまいましたが、私は存在していませんでしたが、あなたの答えは素晴らしいですし、私はそれを受け入れられた答えとして選択しているので、うまく動作します:) string'dタイムスタンプの変換'yyyymmddhhmmss'を' yyyy-mm-dd hh'のような実際のタイムスタンプに変換します。 私はプライベートであなたに連絡して助けてもらえますか?私はこのスレッドからフォーカスを取ることを望んでいません。 – Adiel

+0

チェックアウトhttp://stackoverflow.com/questions/29844144/better-way-to-convert-a-string-field-into-timestamp-in-spark –

関連する問題