2017-05-31 19 views

答えて

4

適切なデシリアライザ、たとえば文字列またはカスタムオブジェクトを使用してバイトをデコードする必要があります。

デコードを行わないと、単純にJavaのバイト配列のテキスト表現である[[email protected]が得られます。

カフカはメッセージの内容について何も知らないため、バイト配列をプロデューサからコンシューマに渡します。あなたがRDD[String]で動作するので、DStream[String]を得る上記のシリアライザで

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
    "org.apache.kafka.common.serialization.StringSerializer") 
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
    "org.apache.kafka.common.serialization.StringSerializer") 

:(KafkaWordCount exampleを引用)あなたはキーと値のためのシリアライザを使用する必要がストリーミングスパークで

バイト配列をカスタムクラスにデシリアライズしたい場合は、カスタムSerializer(これはKafka固有でSparkとは関係ありません)を記述する必要があります。

JSONを固定スキーマまたはAvro(Kafka, Spark and Avro - Part 3, Producing and consuming Avro messagesで解説されているソリューション)で使用することをお勧めします。 Structured Streaming


次のようにしかし、パイプラインは見ることができる:

val fromKafka = spark. 
    readStream. 
    format("kafka"). 
    option("subscribe", "topic1"). 
    option("kafka.bootstrap.servers", "localhost:9092"). 
    load. 
    select('value cast "string") // <-- conversion here 
+0

を次に、スパーク構造化されたストリーミングにおける元のオブジェクトへのスキーマレジストリなしで/アブロカフカのメッセージを変換する方法? –

+0

元のオブジェクトを知り、たとえば 'map'演算子を使用する必要があります。 'from_json'でJSONのために持っているように' from_avro'はまだありません。 –

+0

私はKafkaAvroDeserializerを使用してArray [Byte]をAvroオブジェクトにマップしましたが、「データセットに格納された型のエンコーダを見つけることができませんでした」と言いました。次に、暗黙的なdef toEncoded(o:Zhima)としてエンコーダを提供します:Array [Byte] = o.toByteBuffer.array() 暗黙のdef fromEncoded(e:Array [Byte]):Zhima = valueDeserializer.deserialize(kafkaConsumeTopicName、e) asInstanceOf [Zhima] しかし、それは同じエラーをcompened、どのようにそれを解決する? –

関連する問題