2017-07-10 5 views
0

kafka elasticsearchコネクタを使用してelasticsearchでインデックス付けされたドキュメントのIDは、topic+partition+offsetの形式になっています。kafka elasticsearchコネクタのelasticsearch IDの生成

私はelasticsearchによって生成されたIDを使用することをお勧めします。 topic+partition+offsetは普通はユニークではないので、私はloosing dataです。

どうすれば変更できますか?

+0

あなたのユースケースをもっと説明できますか?コネクタの分析モードまたはキー値モードを使用していますか? 'topic-partition-offset'は各レコードに対してユニークでなければならず、コネクターはElasticに発行される各レコードを発行しています。どのデータが失われていますか?あなたがリンクしているフォーラムは、あなたが失っていることを言っていません。 – Phil

答えて

0

Philはコメントで - topic-partition-offsetはユニークでなければならないので、私はこれがどのようにデータ損失を引き起こしているのか分かりません。

に関係なく、コネクタにキーを生成させるか、自分でキーを定義することができます(key.ignore=false)。他の選択肢はありません。

Kafka ConnectでSingle Message Transformationsを使用すると、データのフィールドからキーを派生させることができます。 Elasticsearchフォーラムのメッセージに基づいて、idがあなたのデータにあるように見えます。もしそれがユニークであれば、それをあなたのキーとして、そしてあなたのElasticsearchドキュメントIDとしても設定できます。ここではSMTでキーを定義する例です:私はそれを見ると同じくらい

# Add the `id` field as the key using Simple Message Transformations 
transforms=InsertKey, ExtractId 

# `ValueToKey`: push an object of one of the column fields (`id`) into the key 
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey 
transforms.InsertKey.fields=id 

# `ExtractField`: convert key from an object to a plain field 
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key 
transforms.ExtractId.field=id 

https://www.confluent.io/blog/building-real-time-streaming-etl-pipeline-20-minutes/経由)

+0

ありがとう@ robin-moffat、私は自分のメッセージにキーを使用しません。私はいくつかの[変換](https://kafka.apache.org/documentation/#connect_transforms)を成功裏に試みました。私はTimestampRouterとRegexpRouterの変換を試みました。 elasticsearchで生成されたIDが格納されているので、[datamountaineer elastic connector](https://github.com/datamountaineer/stream-reactor/)を使用しています。これ以上のデータは失われません。 私のカフカの設定に問題があるかもしれません。 –

0

@Robinモファット、topic-partition-offsetがあなたのカフカのクラスタをアップグレードする場合の重複を引き起こすことはできませんが、ローリングアップグレードの方法ではなく、クラスタをクラスタに置き換えるだけです(これはいつかは簡単に置き換えることができます)。この場合、データを上書きするためにデータが失われます。

あなたの優れた例に関して、これは多くの場合の解決策ですが、別のオプションを追加します。おそらく、topic-partition-offsetにepocタイムスタンプ要素を追加できるので、これはtopic-partition-offset-current_timestampのようになります。

あなたはどう思いますか?

+0

タイムスタンプをどのように追加しますか?変換を使用して? –

+0

私はそれをデフォルトの 'topic-partition-offset'にハードコードとして追加したいと思います。 しかし、手動で追加できれば、これも解決できます。 しかし、ドキュメントIDはもはやユニークではないことを覚えておく必要があります。そのため、私たちは正確に一度の意味を失うかもしれません。 – davidM

+0

ここで元の質問とは多分異なるでしょうし、投稿が実際の回答よりもコメントとして追加される可能性があります。 私はおそらくあなたがdupsを得ることができるが、通常の操作ではないことに同意します - そして、もしこのようなものを生産しようとするならば、おそらく、 –