2

私のストリームは、キーがusr12345であり、値がストリーム出力に{"_key":"usr12345","_temperature":46.6}FLINKストリーミング:

.print()あるタイプTuple2<String,String>

.toString()出力(usr12345,{"_key":"usr12345","_temperature":46.6})

の記録を生産しているシリアル化された文字列のメッセージで予期しないcharaters正しく値:

(usr12345,{"_key":"usr12345","_temperature":46.6})

しかし、私はカフカにストリームを書き込む際にキーがusr12345(先頭に空白を含む)と値({"_key":"usr12345","_temperature":46.6}

お知らせスペースの先頭にキーと左括弧の初めになると値。

非常に奇妙です。なぜこれが起こるのでしょうか?

TypeInformation<String> resultType = TypeInformation.of(String.class); 

KeyedSerializationSchema<Tuple2<String, String>> schema = 
     new TypeInformationKeyValueSerializationSchema<>(resultType, resultType, env.getConfig()); 

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration flinkKafkaProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
     stream, 
     "topic",  
     schema, 
     kafkaProducerProperties); 
+0

、あなたはカフカのシンクを作成し、stream.addsink(kafkaSinkを)やってみましたがありますか?問題が解決するかもしれませんか? –

+0

@BiplobBiswasまあ、私はFlink Kafkaの文書に書かれている指示に従った。 https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-producerこれ0.10+カフカ、これは、Javaを使用するための正しい方法であることによります私は使用しています。 – Beckham

答えて

4

TypeInformationKeyValueSerializationSchema結果をバイナリデータとして解釈されなければならないことを意味するFLINKのカスタムシリアライザとデータをシリアル化:ここ

シリアル化コードです。 FlinkのStringシリアライザは、文字列の長さをすべての文字のエンコードに続けて書き込みます。

私はあなたがプレーンな文字列デシリアライザでカフカのトピックをデシリアライズすることを前提とします。キーの場合、シリアライズされた長さは空白文字として解釈されます。値の場合、長さは'('と解釈されます。

はプレーンな文字列としてキーと値をシリアライズまたは互換性のデシリアライザを使用する異なるシリアライザを使用してみてください。あなたが説明したことは少し奇妙です

関連する問題