2017-07-29 12 views
2

私はApache FlinkとKafkaConsumerを使用して、Kafkaトピックからいくつかの値を読み取ります。 ファイルを読み込んでストリームを取得しました。Apache Flink動的シンク数

受信した値に応じて、このストリームを別のカフカトピックに書きたいと思います。

基本的に、私は多くの子供につながるリーダーとのネットワークを持っています。各子供のために、リーダーは、読んだストリームを子供が読むことができるように、子供特有のカフカトピックに書き込む必要があります。 子が起動すると、リーダーから読み込まれたカフカのトピックに自身を登録します。 問題は、子供がどれくらいいるのか分からないことです。

たとえば、私はKafkaトピックから1を読みました。私はTopic1という1つのカフカトピックにストリームを書きたいと思います。 私は1-2を読む2つのカフカトピックに書きたい。 (トピック1とトピック2)

トピックに書き込むために、私はAddSinkメソッドと一緒にカフカプロデューサーを使用しているので、私の理解に(そして私の仮説から) Flinkは、シンクの数を知る必要があります。

しかし、そのような動作を得る方法はありませんか?

答えて

1

私があなたの問題をよく理解していれば、処理されているレコードに基づいてカフカのトピックを選択できるので、単一のシンクで解決できると思います。また、ソースからの1つの要素が複数のトピックに書き込まれている可能性があります。この場合、各ソースレコードをN回複製する必要があります(出力トピックごとに1つずつです)。ペアとして出力することをお勧めします別名Tupple2)と(トピック、レコード)。

DataStream<Tupple2<String, MyValue>> stream = input.flatMap(new FlatMapFunction<>() { 
    public void flatMap(MyValue value, Collector<Tupple2<String, MyValue>> out) { 
     for (String topic : topics) { 
      out.collect(Tupple2.of(topic, value)); 
     } 
    } 
}); 

その後、以前にあなたがペアの最初の要素を返すようにgetTargetTopicを実装するKeyedSerializationSchemaFlinkKafkaProducerを作成することにより、計算されたトピックを使用することができます。

stream.addSink(new FlinkKafkaProducer10<>(
     "default-topic", 
     new KeyedSerializationSchema<>() { 
      public String getTargetTopic(Tupple2<String, MyValue> element) { 
       return element.f0; 
      } 
      ... 
     }, 
     kafkaProperties) 
); 
関連する問題