私のストリームは、キーがusr12345
であり、値がストリーム出力に{"_key":"usr12345","_temperature":46.6}
FLINKストリーミング:
.print()
あるタイプTuple2<String,String>
.toString()
出力(usr12345,{"_key":"usr12345","_temperature":46.6})
の記録を生産しているシリアル化された文字列のメッセージで予期しないcharaters正しく値:
(usr12345,{"_key":"usr12345","_temperature":46.6})
しかし、私はカフカにストリームを書き込む際にキーがusr12345
(先頭に空白を含む)と値({"_key":"usr12345","_temperature":46.6}
お知らせスペースの先頭にキーと左括弧の初めになると値。
非常に奇妙です。なぜこれが起こるのでしょうか?
TypeInformation<String> resultType = TypeInformation.of(String.class);
KeyedSerializationSchema<Tuple2<String, String>> schema =
new TypeInformationKeyValueSerializationSchema<>(resultType, resultType, env.getConfig());
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration flinkKafkaProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
stream,
"topic",
schema,
kafkaProducerProperties);
、あなたはカフカのシンクを作成し、stream.addsink(kafkaSinkを)やってみましたがありますか?問題が解決するかもしれませんか? –
@BiplobBiswasまあ、私はFlink Kafkaの文書に書かれている指示に従った。 https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-producerこれ0.10+カフカ、これは、Javaを使用するための正しい方法であることによります私は使用しています。 – Beckham