2016-12-15 15 views
2

カフカで終わるイベントを受け取りました。これらのイベントから、私はKafka Streamsアプリケーションを使用してIDを取得し、別のトピックの(id、1)のペアとしてKafkaにポストします。 idがすでにElasticSearchに存在するかどうかを確認したい場合は、そのカウンタを更新します。そうでない場合は、カフカのIDとカウンタを1に設定したElasticSearchで新しいレコードを作成します。 ESに。カフカでアップセートが可能ElasticSearchに接続

私はこのためにKafka Connect to ElasticSearchを使用したいと考えていましたが、可能であればそれは簡単ではないようです。私は、ESにレコードを追加することはできますが、既存のレコードとマージするのは、私がまだ知りませんでした。これはすでに可能ですか?そうであれば、近くのリリースでどのようにして、どうやったら可能でしょうか?

答えて

2

私はdatamountaineer ES sink connectorをフォークしてUpsertを許可しました。これを使用すると、PKを指定してdocAsUpsertを使用してESにアップデートを実行できます。プロジェクトを取得し、Jarファイルをmy github forkからコンパイルすることができます。

関連する問題