2016-05-24 1 views
2

私はsparkにインポートしたい同じ構造の複数のtxtファイルを持っています。次に、識別子列を作成し、データを要約し、最終的にそれらを積み重ねます。Sparkで複数のファイルを読み込んで追加する前に処理する

など。ファイルのいずれかが、次のようになります。

Date  A B C 
2/21/2014 A1 11 2 
2/22/2014 A1 11 5 
2/23/2014 A1 21 3 
2/24/2014 A1 13 5 
2/25/2014 A1 23 4 
2/26/2014 A1 28 4 
2/27/2014 A1 32 2 
2/28/2014 B1 45 4 
3/1/2014 B1 39 4 
3/2/2014 B1 29 4 
3/3/2014 B1 49 5 
3/4/2014 B1 18 4 
3/5/2014 B1 30 3 
3/6/2014 B1 50 5 

このファイルを読んだ後、私は、ファイル名と更新されたデータを言及する列を追加したい、このようになります。

Date A B C File 
2/21/2014 A1 22 2 File1 
2/22/2014 A1 36 2 File1 
2/23/2014 A1 17 4 File1 
2/24/2014 A1 30 2 File1 
2/25/2014 A1 11 2 File1 
2/26/2014 A1 32 2 File1 
2/27/2014 A1 19 5 File1 
2/28/2014 B1 22 3 File1 
3/1/2014 B1 12 5 File1 
3/2/2014 B1 50 3 File1 
3/3/2014 B1 42 4 File1 
3/4/2014 B1 37 4 File1 
3/5/2014 B1 31 5 File1 
3/6/2014 B1 20 3 File1 

そしてデータをまとめます:

File A B C 
File1 A1 167 19 
File1 B1 214 27 

同様に、別のデータセットが作成され、要約されます。最後に一緒に積み重ねる。 2の場合は、ファイル内のデータセットは、次のようになります。

File A B C 
File1 A1 167 19 
File1 B1 214 27 
File2 Z10 167 19 
File2 X20 214 27 

私は個別のデータフレームに変換することにより、プロセス、それらを、データをインポートして、最終的にそれらを積み重ねることができます。しかし、私は自動化された方法でそれを行うことができませんでした。誰でも助けてくれますか?

ありがとうございます!

+0

ロジックを試しましたか? – WoodChopper

+0

@WoodChopper:ありがとうございます。オートメーションの場合、いいえ。しかし、私は、SQLデータフレームに変換されたデータを読んで、新しい列にファイル名を追加しました。もしあなたが望めば、今まで私が使用したコードを質問そのものに入れることができます。 – Beta

答えて

1

あなたの単一のファイルがメモリに収まる場合は、ファイル名を追加することができ、そこから(ファイル名、ファイルの内容)のwholeTextFiles

rdd = sc.wholeTextFiles("/directorypath/*") 

def appender(x): 
    i = x[0] 
    j = x[1].split("\n") 
    k = [x.split() for x in j] 
    l = [x.append(i) for x in k] 
    return k 

frdd = rdd.flatMap(appender) 

df = frdd.toDF("Date","A","B","C","FileName") 

wholeTextFiles返すタプルを使用することができます。

df.groupBy("FileName","A").count() ##sum() 
+0

お返事ありがとうございました。これは本当に役に立ちました! – Beta

関連する問題