2017-06-08 12 views
1

カフカのトピックに50レコードを挿入しました。各レコードの間に1秒の待ち時間があり、50秒以上で50レコードになります。kafkaストリーミング最初のバッチ内のすべてのデータを読み取る

このトピックのレコードを消費しているうちに、以下のコードでわかるように、バッチ間隔を1秒に保っています。理想的には〜50のRDDが必要です。 私はforeachRDDを使用してバッチでデータを取得し、各RDDのデータを処理しています。 以下のコードでは、 "call"メソッドが呼び出されるたびに、私は各RDDのレコードを数えています。驚くべきことは、最初のRDD自体が50個のレコードを表示しており、後続のRDDが0個のレコードを表示していることです。 私はこの動作を理解していません。理想的にはRDDあたり1レコードです。

私の理解が間違っていると誰も示唆できますか?

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf、Durations.seconds(1));

messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<K, String>>>() { 
      public void call(JavaRDD<ConsumerRecord<K, V>> rdd) { 
       System.out.println("NUmber of elements in RDD : "+ rdd.count()); 

       List<Row> rows = rdd.map(record -> processData(record)) 
         .reduce((rows1, rows2) -> { 
          rows1.addAll(rows2); 
          return rows1; 
         }); 

       StructType schema = DataTypes.createStructType(fields); 
       Dataset ds = ss.createDataFrame(rows, schema); 
       ds.createOrReplaceTempView("trades");     
       ds.show(); 
      } 
     }); 
+0

プロデューサとストリーミングアプリケーションを同時に実行しますか?または、ストリーミングアプリの起動時にプロデューサーが既にすべてのデータを送信していますか?後者が起こっているように見えます。 – maasg

+0

あなたは正しいです、これは一つずつ起こっていて同時に起こっていません。 しかし、これの背後にある論理は何ですか? –

+0

レコードがトピックに上がった時間にバッチ間隔が使用されていませんか? –

答えて

1

これはあなたが見ていることを引き起こすと予想される動作です。ストリーミングジョブを開始し、カフカのトピックにデータを入力した場合(1秒ごとに1レコード)、バッチサイズは期待どおりになっていました。 (未かなり...のでカフカのトピックパーティション)

しかし、あなたが、それはカフカのパーティションごとに動作することを心に留めておく必要がありmax.rate.per.partition

と呼ばれるスパークストリーミングパラメータを使用して、あなたが望むものを達成することが可能です。

例:

あなたのカフカのトピックに3つのパーティションがある場合。あなたのbatch size is 1 secondmax.rate.per.partition = 1

以下の構成では、RDDバッチごとに3つのアイテムが得られます。

サンプルストリーミングコンテキスト:

``` 
val sparkConf = new SparkConf(). 
    set("spark.streaming.kafka.maxRatePerPartition", 
     inputParam.maxRatePerPartition) 

val ssc = new StreamingContext(sparkConf, inputParam.batchDuration) 
``` 

あなたのカフカのトピックは1つのパーティションがある場合、あなたはあなたが望むものを正確に取得します。

+0

Thnks Manas、これは役に立ちます –

+0

これは役に立ちますこれを答えとして受け入れる。いつもより多くのポイントに興奮しています。 – Manas

+0

'max.rate.per.partition'の意図は、スパークがkafkaから要求できるデータの量を制限することです。これは、コールドスタートまたはストリームのピーク時に過負荷を回避するための安全対策として機能することを意味します。それは技術を熟知し始めている人に誤解を招く可能性があるので、それはトリックを行いますが、メッセージフローをシミュレートするために1に設定することを勧めません。むしろ、プロデューサーを走らせておき、通常のスパークストリーミングの動作を観察することをお勧めします。 – maasg

0

スパークストリーミングbatch intervalデータ到着又は収集される間に、ある「マイクロバッチ」を使用します。

各バッチ間隔の締め切り時に、その時点までに到着したデータが処理のためにSparkに送信されます。ストリーミングプロセスが開始されると、すべてのデータがすでにKafkaで受信されていれば、すべてのデータが最初の間隔で一度に処理されます。その瞬間の後に到着

enter image description here

最終的に新しいデータがさらに間隔でのプロセスになります。

+0

このプレゼンテーションでは、スケジューリング(と上記の画像)について説明しています:https://www.youtube.com/watch?v=qxsOjJnwcKQ – maasg

関連する問題