私の目標は、kafkaを使用してjson形式の文字列を読み込み、文字列にフィルターをかけて、の部分を選択してメッセージをシンクしてメッセージを出力します(まだjsonの文字列形式です)。テストの目的のためにjson形式の文字列の一部をFlinkでKafkaから抽出する方法012
、私の入力文字列メッセージは次のようになります。
{"a":1,"b":2,"c":"3"}
と実装の私のコードです:
def main(args: Array[String]): Unit = {
val inputProperties = new Properties()
inputProperties.setProperty("bootstrap.servers", "localhost:9092")
inputProperties.setProperty("group.id", "myTest2")
val inputTopic = "test"
val outputProperties = new Properties()
outputProperties.setProperty("bootstrap.servers", "localhost:9092")
val outputTopic = "test2"
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
// create a checkpoint every 5 seconds
env.enableCheckpointing(5000)
// create a Kafka streaming source consumer for Kafka 0.10.x
val kafkaConsumer = new FlinkKafkaConsumer010(
inputTopic,
new JSONDeserializationSchema(),
inputProperties)
val messageStream : DataStream[ObjectNode]= env
.addSource(kafkaConsumer).rebalance
val filteredStream: DataStream[ObjectNode] = messageStream.filter(node => node.get("a")
.asText.equals("1") && node.get("b").asText.equals("2"))
// Need help in this part, how to extract for instance a,c and
// get something like {"a":"1", "c":"3"}?
val testStream:DataStream[JsonNode] = filteredStream.map(
node => {
node.get("a")
}
)
testStream.addSink(new FlinkKafkaProducer010[JsonNode](
outputTopic,
new SerializationSchema[JsonNode] {
override def serialize(element: JsonNode): Array[Byte] = element.toString.getBytes()
}, outputProperties
))
env.execute("Kafka 0.10 Example")
}
このコードのコメントで示されているように、私はどのように確認していませんメッセージの一部を適切に選択する。私はマップを使用していますが、メッセージ全体をどのように連結するかはわかりません。例えば、私がコードで行ったことは、「1」という結果しか得られないが、私が望むのは{"a": "c": "3"}
この問題を解決する別の方法。私はFlinkで見つけることができませんが、物事は、 "選択" APIがありますスパークストリーミングではありません。
そして、フリンクコミュニティの助けてくれてありがとう!これは私がこの小さなプロジェクトで達成したい最後の機能です。
ありがとうございます! makeJSONはFlinkの組み込み関数ですか?それとも、自分で関数を書いてそこに置く必要があるのですか? – teddy
@teddyいいえ、Flinkにはこのようなメソッドは含まれていませんが、説明のための擬似コードです。あなたはそれを実装することができます。 )多くのコードは必要ありません; – David
Keyed状態は 'keyed stream'、つまりこの行の 'keyBy()'操作の後でのみ使用できるというエラーが発生しました(ValueState state = getRuntimeContext()。 –
teddy