を使用できRichFlatMapFunction
クラスを拡張するために
class MongoMapper extends RichFlatMapFunction[JsonNode,JsonNode]{
var userCollection: MongoCollection[Document] = _
override def open(parameters: Configuration): Unit = {
// do something here like opening connection
val client: MongoClient = MongoClient("mongodb://localhost:10000")
userCollection = client.getDatabase("gp_stage").getCollection("users").withReadPreference(ReadPreference.secondaryPreferred())
super.open(parameters)
}
override def flatMap(event: JsonNode, out: Collector[JsonNode]): Unit = {
// Do something here per record and this function can make use of objects initialized via open
userCollection.find(Filters.eq("_id", somevalue)).limit(1).first().subscribe(
(result: Document) => {
// println(result)
},
(t: Throwable) =>{
println(t)
},
()=>{
out.collect(event)
}
)
}
}
}
基本的にopen
機能は、作業者ごとに一度だけ実行されるとflatmap
は、レコードごとにそれを実行します。例はMongoのためのものですが、私はあなたのパイプラインの最初のステップではなく、あなたが同様にあなた自身のRichSourceFunction
を書くべきRichFlatMapFunction
を書くよりも、カサンドラからデータを読み取って理解するのと同様にあなたのケースではカサンドラ
[このドキュメントとコード例](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/cassandra.html)をご覧になりましたか? – user909481
この例ではWRITE(挿入)についてのみ説明していますが、READ操作も探しています。 –
リンクされたドキュメントは、Flinkがシンクしか提供していないストリーミングAPIを指します。バッチ(DataSet)APIには、潜在的に再利用できるCassandra Input-/Outputformatsがあります。 –