2016-12-19 7 views
0

私はスパークストリーミングアプリケーションでcreateDirectStreamを使用します。私はバッチ間隔を7秒に設定し、バッチジョブは約5秒以内に終了することができます。しかし、非常にまれなケースでは、バッチジョブのコストは60秒で済みます。これにより、一部のバッチジョブが遅れることになります。 合計遅延時間を短縮するために、遅延したジョブを一度に広げるより多くのストリーミングデータを処理できることを願っています。これにより、できるだけ早くストリーミングが正常に戻るのに役立ちます。スパークカフカコンシューマの入力のバッチサイズを動的に更新する

したがって、遅延が発生したときにスパークとカフカの入力のバッチサイズを動的に更新/マージする方法があることを知りたいと思います。

答えて

0

"spark.streaming.backpressure.enabled"オプションをtrueに設定できます。

バックプレッシャオプションがtrueのときにバッチ遅延が発生すると、最初は小さいバッチサイズで開始され、大きなバッチサイズに動的に変更されます。

See the spark configuration document.

あなたは以下の説明を見ることができます。

スパークストリーミングの内部バックプレッシャ機構 (1.5以降)を有効または無効にします。これにより、スパークストリーミングは現在のバッチスケジューリングの遅延時間と処理時間に基づいて受信 レートを制御することができるため、システムが処理できる速度でシステムが受信できるようになります。 これは内部的に、受信者の最大受信レートを動的に設定します( )。このレートは、値が 、上限がspark.streaming.receiver.maxRate、 のspark.streaming.kafka.maxRatePerPartitionが設定されている場合は上限になります(下記参照)。

関連する問題