2017-07-10 9 views
2

Spark Scala App Kafka APIをv10.0にアップグレードしています。私はバイト文字列形式で来るメッセージの逆シリアル化のためのカスタムメソッドを作成するために使用しました。Kafka Consumer for Spark for Kafka API 0.10:カスタムAVROデシリアライザ

私は、StringDeserializerまたはByteArrayDeserializerをキーまたは値のいずれかのパラメータとして渡す方法があることを認識しました。

しかし、カスタムのAvroスキーマデシリアライザを作成する方法に関する情報は見つかりません。そのため、kafkaStreamはcreateDirectStreamとKafkaのデータを使用するときにkafkaStreamを使用できます。

は可能ですか?

答えて

5

可能です。 org.apache.kafka.common.serializationで定義されたDeserializer<T>インターフェイスをオーバーライドする必要があります。クラスを使用して、カスタムクラスにkey.deserializerまたはvalue.deserializerを指定する必要があります。たとえば、次のように

import org.apache.kafka.common.serialization.Deserializer 

class AvroDeserializer extends Deserializer[Array[Byte]] { 
    override def configure(map: util.Map[String, _], b: Boolean): Unit = ??? 
    override def close(): Unit = ??? 
    override def deserialize(s: String, bytes: Array[Byte]): Array[Byte] = ??? 
} 

そして:

import org.apache.kafka.clients.consumer.ConsumerRecord 
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.streaming.kafka010._ 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import my.location.with.AvroDeserializer 

val ssc: StreamingContext = ??? 
val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092,anotherhost:9092", 
    "key.deserializer" -> classOf[StringDeserializer], 
    "value.deserializer" -> classOf[AvroDeserializer], 
    "group.id" -> "use_a_separate_group_id_for_each_stream", 
    "auto.offset.reset" -> "latest", 
    "enable.auto.commit" -> (false: java.lang.Boolean) 
) 

val topics = Array("sometopic") 
val stream = KafkaUtils.createDirectStream[String, MyTypeWithAvroDeserializer](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](topics, kafkaParams) 
) 
関連する問題