私はKafkaからデータを取り出して、デフォルトのデコーダを使用してArray[Byte]
を逆シリアル化した後、(null,[[email protected])
、(null,[[email protected])
のように見えますが、私はこれをどのようにして元のデータにしたいのですか?カフカから元のオブジェクトにバイトを変換するには?
メッセージをAvro形式でシリアル化します。
私はKafkaからデータを取り出して、デフォルトのデコーダを使用してArray[Byte]
を逆シリアル化した後、(null,[[email protected])
、(null,[[email protected])
のように見えますが、私はこれをどのようにして元のデータにしたいのですか?カフカから元のオブジェクトにバイトを変換するには?
メッセージをAvro形式でシリアル化します。
適切なデシリアライザ、たとえば文字列またはカスタムオブジェクトを使用してバイトをデコードする必要があります。
デコードを行わないと、単純にJavaのバイト配列のテキスト表現である[[email protected]
が得られます。
カフカはメッセージの内容について何も知らないため、バイト配列をプロデューサからコンシューマに渡します。あなたがRDD[String]
で動作するので、DStream[String]
を得る上記のシリアライザで
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
:(KafkaWordCount exampleを引用)あなたはキーと値のためのシリアライザを使用する必要がストリーミングスパークで
。
バイト配列をカスタムクラスにデシリアライズしたい場合は、カスタムSerializer(これはKafka固有でSparkとは関係ありません)を記述する必要があります。
JSONを固定スキーマまたはAvro(Kafka, Spark and Avro - Part 3, Producing and consuming Avro messagesで解説されているソリューション)で使用することをお勧めします。 Structured Streamingで
次のようにしかし、パイプラインは見ることができる:
val fromKafka = spark.
readStream.
format("kafka").
option("subscribe", "topic1").
option("kafka.bootstrap.servers", "localhost:9092").
load.
select('value cast "string") // <-- conversion here
を次に、スパーク構造化されたストリーミングにおける元のオブジェクトへのスキーマレジストリなしで/アブロカフカのメッセージを変換する方法? –
元のオブジェクトを知り、たとえば 'map'演算子を使用する必要があります。 'from_json'でJSONのために持っているように' from_avro'はまだありません。 –
私はKafkaAvroDeserializerを使用してArray [Byte]をAvroオブジェクトにマップしましたが、「データセットに格納された型のエンコーダを見つけることができませんでした」と言いました。次に、暗黙的なdef toEncoded(o:Zhima)としてエンコーダを提供します:Array [Byte] = o.toByteBuffer.array() 暗黙のdef fromEncoded(e:Array [Byte]):Zhima = valueDeserializer.deserialize(kafkaConsumeTopicName、e) asInstanceOf [Zhima] しかし、それは同じエラーをcompened、どのようにそれを解決する? –