Flinkを設定して、Apache KafkaからMongoDBへのデータストリームを変換してリダイレクトするようにします。テスト目的のために、私はflink-streaming-connectors.kafkaの例(https://github.com/apache/flink)の上に構築しています。Kafka - > Flink DataStream - > MongoDB
FlinkでKafkaストリームが適切に赤くなっていますが、それらをマップすることはできますが、受信した各メッセージをMongoDBに保存するときに問題が発生します。 MongoDBの統合について私が見つけた唯一の例はgithubのflink-mongodb-testです。残念ながら、データストリームではなく静的なデータソース(データベース)を使用しています。
MongoDBのDataStream.addSink実装があるはずですが、明らかにそうではありません。
これを達成するにはどうすればよいでしょうか?カスタムシンク関数を記述する必要があるのでしょうか、それとも何か不足していますか?多分それは別のやり方で行われるべきでしょうか?
私はどんな解決策にも縛られていないので、どんな提案も感謝します。
以下に、正確に私が入力として得ているものと、出力として保存する必要があるものの例を示します。
Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String)
Apache Kafka Broker --------------> Flink: DataStream<String>
Flink: DataStream.map({
return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD")
})
.rebalance()
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection
この例でわかるように、Flinkは主にKafkaのメッセージストリームバッファリングと基本的な解析に使用しています。