2016-02-02 8 views
7

Flinkを設定して、Apache KafkaからMongoDBへのデータストリームを変換してリダイレクトするようにします。テスト目的のために、私はflink-streaming-connectors.kafkaの例(https://github.com/apache/flink)の上に構築しています。Kafka - > Flink DataStream - > MongoDB

FlinkでKafkaストリームが適切に赤くなっていますが、それらをマップすることはできますが、受信した各メッセージをMongoDBに保存するときに問題が発生します。 MongoDBの統合について私が見つけた唯一の例はgithubのflink-mongodb-testです。残念ながら、データストリームではなく静的なデータソース(データベース)を使用しています。

MongoDBのDataStream.addSink実装があるはずですが、明らかにそうではありません。

これを達成するにはどうすればよいでしょうか?カスタムシンク関数を記述する必要があるのでしょうか、それとも何か不足していますか?多分それは別のやり方で行われるべきでしょうか?

私はどんな解決策にも縛られていないので、どんな提案も感謝します。

以下に、正確に私が入力として得ているものと、出力として保存する必要があるものの例を示します。

Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String) 
Apache Kafka Broker --------------> Flink: DataStream<String> 

Flink: DataStream.map({ 
    return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD") 
}) 
.rebalance() 
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection 

この例でわかるように、Flinkは主にKafkaのメッセージストリームバッファリングと基本的な解析に使用しています。

答えて

3

現在、Flinkで利用できるストリーミングMongoDBシンクはありません。

しかし、MongoDBのにデータを書き込むための2つの方法があります。

  • FLINKのDataStream.write()コールを使用してください。ストリーミングでOutputFormat(バッチAPIから)を使用することができます。 FlinkのHadoopOutputFormatWrapperを使用して、公式のMongoDB Hadoopコネクタを使用できます。

  • シンクを実装してください。 Streaming APIを使ってシンクを実装するのは簡単ですが、MongoDBにはJavaクライアントライブラリがあります。

いずれの方法も高度な処理保証を提供していません。しかし、FlinkとKafka(そしてチェックポイント機能が有効)を使用しているときは、少なくとも一度は意味があります。エラーの場合、データは再びMongoDBシンクにストリーミングされます。 偶数の更新を行っている場合は、これらの更新をやり直しても矛盾は生じません。

MongoDBに対して正確に一度のセマンティクスが必要な場合は、おそらくJIRA in Flinkを提出し、これを実装する方法についてコミュニティと話し合ってください。

2

Robert Metzgerの回答に代わるものとして、結果をKafkaに再度書き込んで、管理されたkafkaのコネクタの1つを使用して、MongoDBデータベース内のトピックの内容を削除することができます。

カフカ - > FLINK - >カフカ - >あなたは "意味論では、少なくともワンス" behaivourをmantainすることができます。このアプローチではモンゴ/エニシング

関連する問題