2017-01-02 1 views
1

2つのKTablesに結合しようとしています。カフカストリーム - 2つのktablesを結合すると2回結合関数が呼び出されます

KTable<String, RecordBean> recordsTable = builder.table(Serdes.String(), 
    new JsonPOJOSerde<>(RecordBean.class), 
    bidTopic, RECORDS_STORE); 

KTable<String, ImpressionBean> impressionsTable = builder.table(Serdes.String(), 
    new JsonPOJOSerde<>(ImpressionBean.class), 
    impressionTopic, IMPRESSIONS_STORE); 

KTable<String, RecordBean> mergedByTxId = recordsTable 
    .join(impressionsTable, merge()); 

マージ関数は非常に単純です。私はちょうど1つのbeanから別のbeanへ値をコピーしています。

public static <K extends BidInfo, V extends BidInfo> ValueJoiner<K, V, K> merge() { 
return (v1, v2) -> { 
    v1.setRtbWinningBidAmount(v2.getRtbWinningBidAmount()); 
    return v1; 
}; 

しかし、何らかの理由により、結合機能は、単一の生成レコードで2回コールしています。

Properties streamsConfiguration = new Properties(); 
streamsConfiguration 
    .put(StreamsConfig.APPLICATION_ID_CONFIG, "join-impressions"); 
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); 

streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zookeeperConnect()); 
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, folder.newFolder("kafka-streams-tmp") 
    .getAbsolutePath()); 

return streamsConfiguration; 

プロデューサーの設定の下にストリーミング/プロデューサーの設定を参照してください -

Properties producerConfig = new Properties(); 
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); 
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); 
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); 
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 

return producerConfig; 

次の私は、ストリームごとに単一のレコードを提出しています。どちらのレコードも同じキーを持っています。 出力として単一レコードを受け取る予定です。

しかし、ValueJoinerは2回トリガし、代わりに2つの同一の出力レコードを取得しています。トリガ時間中、両方のストリームからの両方の値が存在します.2番目の実行をトリガしているものは取得できません。

参加しない - この現象は再現できません。 私は2 ktable joinの実例を見つけることができません - 私のアプローチで何が間違っているのか理解できません。

私が密集メールグループに似た質問を投稿後の説明を以下しまった

KStreamBuilder builder = new KStreamBuilder(); 

KTable<String, String> first = builder.table("stream1", "storage1"); 
KTable<String, String> second = builder.table("stream2", "storage2"); 

KTable<String, String> joined = first.join(second, (value1, value2) -> value1); 

joined.to("output"); 

KafkaStreams streams = new KafkaStreams(builder, getStreamingProperties()); 

streams.start(); 

IntegrationTestUtils.produceKeyValuesSynchronously("stream1", 
    Arrays.asList(new KeyValue("1", "first stream")), 
    getProducerProperties()); 

IntegrationTestUtils.produceKeyValuesSynchronously("stream2", 
    Arrays.asList(new KeyValue("1", "second stream")), 
    getProducerProperties()); 

List<KeyValue<String, String>> parsedRecord = 
    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(), 
     "output", 1); 
+0

Kafka Streamsのセマンティクスは扱いにくいです。詳細については、ドキュメントを参照してください:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics あなたの質問をよりよく理解するには、データの例を教えてください。 (すなわち、タイムスタンプと期待された観測結果のレコードを入力してください) –

+0

私はレコードをどのようにして作成しているのか、元のポストに詳細を追加しました。一般的に私はストリームごとに1レコードを生成しており、出力操作として1レコードを受け取ることを期待しています。何らかの理由で私は2を受け取ります。私はドキュメントを注意深く読んでいますが、 –

+0

2つの同一または2つの出力レコードを受け取っていますか?デバッグしようとしましたか?つまり、あなたの 'ValueJoiner'コードにブレークポイントを設定しましたか? –

答えて

1

同じ動作を実証し、簡単なコードを追加します。

これはキャッシュに関係すると思われます。 2つのテーブルのキャッシュは独立してフラッシュされるため、同じレコードを2回取得する可能性があります。 stream1とstream2の両方が同じキーのレコードを受信し、キャッシュがフラッシュされた場合は、次のようになります。

stream1のキャッシュがフラッシュされ、結合が実行され、レコードが生成されます。

stream2のキャッシュがフラッシュされ、結合が実行され、レコードが生成されます。

技術的には、結合の結果が別のKTableであるため、KTableの値が正しい値になるため、これは問題ありません。

次の変数を0に設定した後、StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG、0 - 問題が解決されました。 まだ2つのレコードがありますが、今では1つのレコードがnullに結合されています。

+0

これは理にかなっています。これは同じレコードを2つまたは2つの異なる出力レコードにするかどうか尋ねる理由もあります。 (あなた自身の答えを受け入れることもできます。) –

関連する問題