2つのKTablesに結合しようとしています。カフカストリーム - 2つのktablesを結合すると2回結合関数が呼び出されます
KTable<String, RecordBean> recordsTable = builder.table(Serdes.String(),
new JsonPOJOSerde<>(RecordBean.class),
bidTopic, RECORDS_STORE);
KTable<String, ImpressionBean> impressionsTable = builder.table(Serdes.String(),
new JsonPOJOSerde<>(ImpressionBean.class),
impressionTopic, IMPRESSIONS_STORE);
KTable<String, RecordBean> mergedByTxId = recordsTable
.join(impressionsTable, merge());
マージ関数は非常に単純です。私はちょうど1つのbeanから別のbeanへ値をコピーしています。
public static <K extends BidInfo, V extends BidInfo> ValueJoiner<K, V, K> merge() {
return (v1, v2) -> {
v1.setRtbWinningBidAmount(v2.getRtbWinningBidAmount());
return v1;
};
しかし、何らかの理由により、結合機能は、単一の生成レコードで2回コールしています。
Properties streamsConfiguration = new Properties();
streamsConfiguration
.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-impressions");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zookeeperConnect());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, folder.newFolder("kafka-streams-tmp")
.getAbsolutePath());
return streamsConfiguration;
プロデューサーの設定の下にストリーミング/プロデューサーの設定を参照してください -
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return producerConfig;
次の私は、ストリームごとに単一のレコードを提出しています。どちらのレコードも同じキーを持っています。 出力として単一レコードを受け取る予定です。
しかし、ValueJoinerは2回トリガし、代わりに2つの同一の出力レコードを取得しています。トリガ時間中、両方のストリームからの両方の値が存在します.2番目の実行をトリガしているものは取得できません。
参加しない - この現象は再現できません。 私は2 ktable joinの実例を見つけることができません - 私のアプローチで何が間違っているのか理解できません。
私が密集メールグループに似た質問を投稿後の説明を以下しまった
KStreamBuilder builder = new KStreamBuilder();
KTable<String, String> first = builder.table("stream1", "storage1");
KTable<String, String> second = builder.table("stream2", "storage2");
KTable<String, String> joined = first.join(second, (value1, value2) -> value1);
joined.to("output");
KafkaStreams streams = new KafkaStreams(builder, getStreamingProperties());
streams.start();
IntegrationTestUtils.produceKeyValuesSynchronously("stream1",
Arrays.asList(new KeyValue("1", "first stream")),
getProducerProperties());
IntegrationTestUtils.produceKeyValuesSynchronously("stream2",
Arrays.asList(new KeyValue("1", "second stream")),
getProducerProperties());
List<KeyValue<String, String>> parsedRecord =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(),
"output", 1);
Kafka Streamsのセマンティクスは扱いにくいです。詳細については、ドキュメントを参照してください:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics あなたの質問をよりよく理解するには、データの例を教えてください。 (すなわち、タイムスタンプと期待された観測結果のレコードを入力してください) –
私はレコードをどのようにして作成しているのか、元のポストに詳細を追加しました。一般的に私はストリームごとに1レコードを生成しており、出力操作として1レコードを受け取ることを期待しています。何らかの理由で私は2を受け取ります。私はドキュメントを注意深く読んでいますが、 –
2つの同一または2つの出力レコードを受け取っていますか?デバッグしようとしましたか?つまり、あなたの 'ValueJoiner'コードにブレークポイントを設定しましたか? –