0
accountId
に基づいて入力ファイルを分割しようとしていますが、このパーティションはdataFramesに1000を超えるレコードが含まれている場合にのみ実行されます。 accountId
は、認識できない動的整数です。以下のコードを検討してください列値のdataFramの行数に基づいて動的パーティションを実行する方法
val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.textFileStream("input")
lines.print()
lines.foreachRDD { rdd =>
val count = rdd.count()
if (count > 0) {
val df = sqlContext.read.json(rdd)
val filteredDF = df.filter(df("accountId")==="3")
if (filteredDF.count() > 1000) {
df.write.partitionBy("accountId").format("json").save("output")
}
}
}
ssc.start()
ssc.awaitTermination()
しかし、上記のコードは不要なaccountIdをすべて分割します。
- データフレーム内の各
accountId
の数を調べたいと思います。 - 各accountIdのレコードが1000を超える場合は、パーティション化された情報を出力ソースに書き込みます。例えば
入力ファイルのaccountId = 2のaccountId = 1と10のレコード1500件のレコードを持っている場合、次に出力源に= 1のaccountIdに基づいてフィルタリングデータフレームを分割し、memmory内のaccountId = 2つのレコードを保持します。
スパークストリーミングを使用してこれを達成するにはどうすればよいですか?