2017-11-27 16 views
2

私はHDFSディレクトリから複数のファイルを読んでいて、ファイルごとに生成されたデータを使用して印刷されます。書き込みRDDデータ - スカラ

frequencies.foreach(x => println(x._1 + ": "+x._2)) 

そして、印刷データ(のためでありますFILE1.TXT):

'text': 45 
'data': 100 
'push': 150 

キー(FILE2.TXT)のような他のファイルのために異なる場合があります

'data': 45 
'lea': 100 
'jmp': 150 

すべてのファイルでキーが同じであるとは限りません。私はすべてのファイルデータは次の形式で.csvファイルに書き込まれるようにしたい:

Filename text data push lea jmp 
File1.txt 45 100 150 0 0 
File2.txt 0  45 0  100 150 .... 

誰かがこの問題の解決策を見つける私を助けてくださいことはできますか?

答えて

0

私はあなたのディレクトリ内のすべてのファイルに対して1つのデータフレームを作成し、それに応じて再形状にデータをpivotを使用することをお勧めしたい:

val df1 = sc.parallelize(Array(
("text",45 ), 
("data",100), 
("push",150))).toDF("key", "value").withColumn("Filename", lit("File1")) 

val df2 = sc.parallelize(Array(
("data",45 ), 
("lea",100), 
("jump",150))).toDF("key", "value").withColumn("Filename", lit("File2")) 

val df = df1.unionAll(df2) 

df.show 
+----+-----+--------+ 
| key|value|Filename| 
+----+-----+--------+ 
|text| 45| File1| 
|data| 100| File1| 
|push| 150| File1| 
|data| 45| File2| 
| lea| 100| File2| 
|jump| 150| File2| 
+----+-----+--------+ 


val finalDf = df.groupBy($"Filename").pivot("key").agg(first($"value")).na.fill(0) 

finalDf.show 
+--------+----+----+---+----+----+ 
|Filename|data|jump|lea|push|text| 
+--------+----+----+---+----+----+ 
| File1| 100| 0| 0| 150| 45| 
| File2| 45| 150|100| 0| 0| 
+--------+----+----+---+----+----+ 

あなたはDataFrameWriterを使用してCSVとしてそれを書くことができます

df.write.csv(..) 

これはWHIからFilenameのための余分な列で、各ファイルごとに異なるデータフレームを作成することになると難しい部分chデータフレームが作成されました

+0

df.write.csvを使って 'finalDf'をcsvに書き込むことができず、このエラーが発生しました:'値csvはorg.apache.spark.sql.DataFrameWriterのメンバーではありません。ありがとう@philantrovert –

+0

Spark 1.6を使用している場合、アプリケーションにdatabricks csv jarを追加する必要があります。そしてコードは 'df.write.format(" com.databricks.spark.csv ")'に変わります。もっと詳しい情報はこちら:https://github.com/databricks/spark-csv – philantrovert

+0

申し訳ありませんが、私はそれについてたくさん調べており、このgithubリポジトリからjarファイルを作成する方法や、アプリケーションに追加する方法を見つけられませんでした。もしあなたがこれらのステップをここで言及すれば大きな助けになるでしょう..ありがとう@philatrovert –

1

ファイルが十分に大きくない場合は、火花を鳴らさずに行うことができます。 ここに私のコード例があります、CSV形式は古いスタイルです、あなたの期待される出力が好きではありませんが、簡単に調整できます。

import scala.io.Source 
    import org.apache.hadoop.fs._ 
    val sparkSession = ... // I created it to retrieve hadoop configuration, you can create your own Configuration. 
    val inputPath = ... 
    val outputPath = ... 

    val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration) 
    // read all files content to Array of Map[String,String] 
    val filesContent = fs.listStatus(new Path(inputPath)).filter(_.isFile).map(_.getPath).filter(_.getName.endsWith(".txt")) 
    .map(s => (s.getName, Source.fromInputStream(fs.open(s)).getLines() 
        .map(_.split(":").map(_.trim)) 
        .filter(_.length == 2) 
        .map(p => (p.head, p.last)).toMap)) 
    // create default Map with all possible keys 
    val listKeys = filesContent.flatMap(_._2.keys).distinct.map(s => (s, "0")).toMap 
    val csvContent = filesContent.map(s => (s._1, listKeys ++ s._2)) 
    .map(s => (s._1, s._2.values.mkString(","))) 
    .map(s => s"${s._1},${s._2}") 
    .mkString("\n") 
    val csvHeader = ("Filename" +: listKeys.keys.toList).mkString(",") 
    val csv = csvHeader + "\n" + csvContent 

    new PrintWriter(fs.create(new Path(outputPath))){ 
    write(csv) 
    close() 
    } 
関連する問題