2016-12-02 2 views
1

同様に、この質問と同じように少し異なります:KStream batch process windows、私はKStreamからのメッセージを消費者にプッシュダウンする前にバッチしたいと思います。KStreamを固定サイズのリストに集めるには?

ただし、このプッシュダウンは、固定された時間枠でスケジュールするのではなく、キーごとに固定メッセージ数のしきい値に設定する必要があります。まず第2質問について

が頭に浮かぶ:

1)これは、処理されるべき方法カスタムAbstractProcessorですか?線に沿って何か:

@Override 
public void punctuate(long streamTime) { 
    KeyValueIterator<String, Message[]> it = messageStore.all(); 
    while (it.hasNext()) 
     KeyValue<String, Message[]> entry = it.next(); 
     if (entry.value.length > 10) { 
      this.context.forward(entry.key, entry.value); 
      entry.value = new Message[10](); 
     } 
    } 
} 

2)StateStoreが潜在的に「ごみコレクト」は、このための最良の方法は何か、(場合にエントリ値を転送するために、しきい値に達することはありません)爆発するので、 ?私はタイムベースのスケジュールを実行し、古すぎるキーを削除することができます...しかし、それは非常にDIYとエラーが発生しやすいようです。

答えて

2

これはうまくいくと思います。時間ベースの「ガベージコレクション」の適用も合理的です。もちろん、DSLの代わりにProcessor APIを使用することでDIYの味があります.PAPIの目的は最初です(ユーザーに必要な処理をさせる)。しかし

いくつかのコメント:

  • あなたは、より複雑なデータ構造が必要になります。punctuate()がストリーム時の進捗状況に基づいて呼び出されたので、あなたが2の間に1つのキーのための10件の以上のレコードを持っていることを発生することがありますコール。したがって、キーごとに複数のバッチを格納できるようにするには、KeyValueIterator<String, List<Message[]>> it = messageStore.all();のようなものが必要です。
  • 私は、あなたのスケジュールがあまりにもタイトで、多くのバッチがまだ完了しておらず、CPUを浪費しているかもしれない場合、あなたのスケジュールがあまりにも緩い場合には、厄介なスケジュールを微調整する必要があると思います。多くのメモリが必要になります。下流の通信事業者は、すぐに多くのものを放出するので、多くのデータを取得します。ダウンストリームのデータを送信することが問題になる可能性があります。
  • 店舗全体をスキャンするのは費用がかかります。バッチサイズに従ってキーと値のペアを「並べ替える」ことをお勧めします。これにより、すべてのキーの代わりにバッチを完了したキーにのみタッチすることができます。おそらく、バッチを複雑にしたキーのメモリ内のリストを保持し、それらのルックアップのみを行うことができます(失敗した場合は、メモリ内のリストを再作成するためにストアのすべてのキーを1回パスする必要があります)。
+0

これらの貴重なご意見ありがとうございます。より具体的な質問は実装に続くだろう、間違いない。 – Raf

関連する問題