2017-01-27 15 views
2

私はディレクトリに複数のファイルを持ち、各ファイルには複数の行にまたがるテキストが含まれています。 現在私はスパークデータセット(> 2.0)へのすべてのこれらのファイルを読み取るために、次のコードを使用し各ファイルをデータセット行に書き込む

val ddf = spark.read.text("file:///input/*") 

しかし、これは各行は行ではなくファイルであるデータセットを作成します。私はデータセット内の行ごとに(文字列として)各ファイルを持っていたいと思います。

どのように各ファイルを反復処理せずに別々にRDDとして読み込むことができますか?

答えて

3

使用wholeTextFiles()

val rdd: RDD[(String, String)] = spark.sparkContext 
             .wholeTextFiles("file/path/to/read/as/rdd") 

SparkContext上SparkContext.wholeTextFilesはあなたが 複数の小さなテキストファイルを含むディレクトリを読むことができ、かつ (ファイル名、 コンテンツ)ペアとしてそれらのそれぞれを返します。これは、各ファイルの1行に1つのレコードが を返すtextFileとは対照的です。

+1

美しい答え、私が探していたものの構造を考えます。 – Tim

1

@ mrsrinivasの答えの1つは、input_file_nameでグループ化することです。

[email protected]>~/junk/so> find .   
. 
./d2 
./d2/t.txt 
./d1 
./d1/t.txt 
[email protected]>~/junk/so> cat */*.txt 
d1_1 
d1_2 
d2_1 
d2_2 

我々はそうのような入力ファイルに基づいてリストを集めることができます:

scala> val ddf = spark.read.textFile("file:///home/evan/junk/so/*"). 
    | select($"value", input_file_name as "fName") 
ddf: org.apache.spark.sql.DataFrame = [value: string, fName: string] 

scala> ddf.show(false) 
+-----+----------------------------------+ 
|value|fName        | 
+-----+----------------------------------+ 
|d2_1 |file:///home/evan/junk/so/d2/t.txt| 
|d2_2 |file:///home/evan/junk/so/d2/t.txt| 
|d1_1 |file:///home/evan/junk/so/d1/t.txt| 
|d1_2 |file:///home/evan/junk/so/d1/t.txt| 
+-----+----------------------------------+ 

scala> ddf.groupBy("fName").agg(collect_list($"value") as "value"). 
    | drop("fName").show 
+------------+ 
|  value| 
+------------+ 
|[d1_1, d1_2]| 
|[d2_1, d2_2]| 
+------------+ 
+2

'groupBy'で' input_file_name'を使ってうまくいく方法は、その関数を知らなかった:)。それでも、@ mrsrinivasの答えは少しきれいです。 – Tim

+0

確かに、私の答えは 'DataFrame'ですが、' RDD'を使った方が少し良いです。 –

+0

本当ですが、常に 'toDF'関数があります。 – Tim

関連する問題