カフカのトピックに50レコードを挿入しました。各レコードの間に1秒の待ち時間があり、50秒以上で50レコードになります。kafkaストリーミング最初のバッチ内のすべてのデータを読み取る
このトピックのレコードを消費しているうちに、以下のコードでわかるように、バッチ間隔を1秒に保っています。理想的には〜50のRDDが必要です。 私はforeachRDDを使用してバッチでデータを取得し、各RDDのデータを処理しています。 以下のコードでは、 "call"メソッドが呼び出されるたびに、私は各RDDのレコードを数えています。驚くべきことは、最初のRDD自体が50個のレコードを表示しており、後続のRDDが0個のレコードを表示していることです。 私はこの動作を理解していません。理想的にはRDDあたり1レコードです。
私の理解が間違っていると誰も示唆できますか?
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf、Durations.seconds(1));
messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<K, String>>>() {
public void call(JavaRDD<ConsumerRecord<K, V>> rdd) {
System.out.println("NUmber of elements in RDD : "+ rdd.count());
List<Row> rows = rdd.map(record -> processData(record))
.reduce((rows1, rows2) -> {
rows1.addAll(rows2);
return rows1;
});
StructType schema = DataTypes.createStructType(fields);
Dataset ds = ss.createDataFrame(rows, schema);
ds.createOrReplaceTempView("trades");
ds.show();
}
});
プロデューサとストリーミングアプリケーションを同時に実行しますか?または、ストリーミングアプリの起動時にプロデューサーが既にすべてのデータを送信していますか?後者が起こっているように見えます。 – maasg
あなたは正しいです、これは一つずつ起こっていて同時に起こっていません。 しかし、これの背後にある論理は何ですか? –
レコードがトピックに上がった時間にバッチ間隔が使用されていませんか? –