2016-08-17 6 views
0

私のKafka Connect Sinkタスクのput()メソッドがトリガーされる間隔を制御できますか?この点でKafka Connectフレームワークの期待される動作は何ですか?理想的には、例えば、「新しいレコードX個/新しいバイトYか、最後の呼び出しからZミリ秒が経過しない限り、私に電話しないでください」と指定したいと思います。これにより、潜在的にシンクタスク内のバッチ処理ロジックがよりシンプルになる可能性があります(documentation、 "は多くの場合、内部バッファリングが便利なのでレコードのバッチ全体を一度に送信できるため、イベントをダウンストリームデータストアdeliverMessagesWorkerSinkTaskに呼び出されたときSinkTaskから入れ)。Kafka Connectシンクタスクでput()が発生する頻度は?

答えて

0

今日では、唯一と呼ばれている。良いニュースは、deliverMessagesが起こるだけの時間はそう、あなたが新しいポーリングする頻度をある程度制御することが必要poll内にあるということですレコード:overriding consumer properties

内部バッファリングを行う場合は、 HDFSConnectorがこれをどのように扱っているかを見てみましょう。implementation of SinkTask。しかし、今すぐConnectは投票によって返されたすべてのレコードを入れます。

これらのことは、ダウンストリームシステムに到達する前にメッセージをバッチすることを実際に検討している場合は、flush()が呼び出される頻度を制御するoffset.flush.interval.ms and offset.flush.timeout.msを検討するとよいでしょう。

関連する問題