2016-07-29 5 views
0

accountIdに基づいて入力ファイルを分割しようとしていますが、このパーティションはdataFramesに1000を超えるレコードが含まれている場合にのみ実行されます。 accountIdは、認識できない動的整数です。以下のコードを検討してください列値のdataFramの行数に基づいて動的パーティションを実行する方法

val ssc = new StreamingContext(sc, Seconds(2)) 
val lines = ssc.textFileStream("input") 
lines.print() 

lines.foreachRDD { rdd => 
    val count = rdd.count() 
    if (count > 0) { 
    val df = sqlContext.read.json(rdd) 
    val filteredDF = df.filter(df("accountId")==="3") 
    if (filteredDF.count() > 1000) { 
     df.write.partitionBy("accountId").format("json").save("output") 
    } 
    } 
} 

ssc.start() 
ssc.awaitTermination() 

しかし、上記のコードは不要なaccountIdをすべて分割します。

  1. データフレーム内の各accountIdの数を調べたいと思います。
  2. 各accountIdのレコードが1000を超える場合は、パーティション化された情報を出力ソースに書き込みます。例えば

入力ファイルのaccountId = 2のaccountId = 1と10のレコード1500件のレコードを持っている場合、次に出力源に= 1のaccountIdに基づいてフィルタリングデータフレームを分割し、memmory内のaccountId = 2つのレコードを保持します。

スパークストリーミングを使用してこれを達成するにはどうすればよいですか?

答えて

1

Should'dあなたは

filteredDF.write.partitionBy("accountId").format("json").save("output") 

代わりの

df.write.partitionBy("accountId").format("json").save("output") 
をやっています
関連する問題