スパークスケーラビリティ、:DirectStream経由カフカからJSONsを読んカフカ+は、我々は非常に単純なスパークストリーミングジョブ(Javaで実装)している
- は
- がJSONsを解析する(カフカメッセージのACKはオフになっています) (value = object)のタプルにPOJOをマップする
- reduceByKey(カスタム削減関数 - 常に1フィールドの品質をオブジェクトと比較する)高品質のオブジェクトインスタンスを残す)
- 店舗
- ストアHDFSに結果 JSONs 1000のIDのセットを用いて生成さ
(キー)と、すべてのイベント(mapWithStateストアキーあたり最高品質のオブジェクトを介して)状態で結果Kafkaのトピックパーティションにランダムに配布されます。これは、ジョブが各IDの最高品質のオブジェクトのみを格納しているため、オブジェクトの結果セットが最大1000であることも意味します。
我々は、以下のパラメータをAWS EMR(m4.xlarge = 4つのコア16 GBのメモリ)のパフォーマンステストを実行していた:エグゼキュータの
- 数=ノード(ノードあたりすなわち1つのエグゼキュータ)の数 カフカパーティションの
- 数=ノードの数(すなわち我々の場合も、エグゼキュータに)
- バッチサイズ= 10(S)
- スライディングウィンドウ= 20(S)
- ウィンドウサイズ= 600(S)
- ブロックサイズ= 2000(MS)
- デフォルトの並列 - デフォルト並列である場合=ノード/エグゼキュータの数
カフカクラスタが利用されるだけ1ブローカを含み、しかし、最良の結果を得る、異なる設定を試しピークロード時に最大〜30〜40%(トピックにデータをあらかじめ入力してから、テストを独立して実行します)。私たちはnum.io.threadsとnum.network.threadsを増やそうとしましたが、大きな改善はありませんでした。 - 最大を処理することができ
- 2ノード:
ザ・彼は、パフォーマンステストの結果(連続負荷の約10分)(YARNマスターとドライバのノードが怒鳴るノード数の上にある)となりました。任意の処理遅延
- 5ノードなしで150の000イベント/秒 - 380の000イベント/秒=>- 280 000イベント/秒=>25%ペナルティ "はほぼ線形スケーラビリティ"
- 10ノード予想と比較した場合50%のペナルティ期待される「ほぼ直線的なスケーラビリティ」
に比べて2つのノードの場合のCPU使用率は〜
我々はまた、などの他の設定を中心に演奏された場合: - パーティションのテストロー/ハイ番号を - defaultParallelismの低/高/デフォルト値のテスト - より多くのエグゼキュータ(つまり、リソースを例えば10の代わりに30人のエグゼクティブ) しかし、上記の設定は私たちに最高の結果を与えていました。
だから、カフカ+スパーク(ほぼ)線形スケーラブルなのですか?私たちのテストよりもはるかにスケーラビリティが必要な場合 - どのように改善できるか。私たちの目標は、何百/何千ものSparkエグゼキュータをサポートすることです(スケーラビリティは私たちにとって非常に重要です)。
あなたのユースケースでは、reduceByKeyで完全なデータシャッフルを行っています。これは、クラスタの規模を拡大するにつれてますます高価になると思います。少なくとも、グローバルパフォーマンスは、エグゼキュータを追加したときに悪化する可能性のある最悪のエグゼキュータパフォーマンスのパフォーマンスによって制限されます。 Kafkaパーティショナーを使用して、特定のIDのすべてのメッセージを単一のパーティションに入れることができますか?それは私が考えるほとんど直線的なスケールを許すべきです。 – C4stor
クラスタには何台のカフカサーバがありますか?それらの間にパーティション複製のセットアップがありますか? –
多くの異なる部分を考慮する必要があります。あなたのカフカクラスターにはいくつのパーティションがありますか?チェックポイント間隔はどれくらいですか? –