2016-12-21 8 views
0

から見直しの例では、私はこれをたくさん見アブロバイトをお読みください。ApacheのFLINKは、[]カフカ

バイト[]を汎用レコードに読み込んで を取得してスキーマを取得するまでは、スキーマがわかりません。

を(それがレコードにレコードから変更される可能性としては)誰かが、私は一般的な録音にそのbyte[]をロードし、その後、いくつかの先行ビットを削除することができるようにbyte[]からマップフィルタに読み込むFlinkKafkaConsumer08に私を指すことができますか?

答えて

0

私は、カスタムデシリアライザであなたのメインのコードパスでは似たような(私は09消費者を使用しています)

をやってる:

FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
       parameterTool.getRequired("topic"), new MyDeserializationSchema<>(), 
       parameterTool.getProperties()); 

カスタムデシリアライズスキーマは、バイトを読み込み、割り出しスキーマおよび/またはスキーマレジストリから取得し、GenericRecordに逆シリアル化してGenericRecordオブジェクトを返します。

public class MyDeserializationSchema<T> implements DeserializationSchema<T> { 


    private final Class<T> avrotype = (Class<T>) org.apache.avro.generic.GenericRecord.class; 

    @Override 
    public T deserialize(byte[] arg0) throws IOException { 
     //do your stuff here, strip off your bytes 
     //deserialize and create your GenericRecord 
     return (T) (myavroevent); 
    } 

    @Override 
    public boolean isEndOfStream(T nextElement) { 
     return false; 
    } 

    @Override 
    public TypeInformation<T> getProducedType() { 
     return TypeExtractor.getForClass(avrotype); 
    } 

} 
+0

うわー箱から出してすぐに動作します。これは私がここに詳しい説明を掲載コンフルエントのKafkaAvroSerializer

を使用して、上記のように非常に似た方法で行うことができます。ありがとう、私はこれを見て今明らかです。 – Don

0

Confluentのスキーマレジストリを使用する場合は、Confluentが提供するAvro serdeを使用することをお勧めします。このようにして、deserialize()と呼びます。使用するAvroスキーマの最新バージョンの解像度は、シーンの背後で自動的に行われ、バイト操作は不要です。それはこのような何かに要約(Scalaではサンプルコード、Javaソリューションは非常に似ています)

import io.confluent.kafka.serializers.KafkaAvroDeserializer 

... 

val valueDeserializer = new KafkaAvroDeserializer() 
valueDeserializer.configure(
    Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava, 
    false) 

... 

override def deserialize(messageKey: Array[Byte], message: Array[Byte], 
         topic: String, partition: Int, offset: Long): KafkaKV = { 

    val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[GenericRecord] 
    val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord] 

    KafkaKV(key, value) 
    } 

... 

このメソッドは、メッセージプロデューサは、スキーマレジストリと統合し、出版していることが必要ですスキーマがあります。 How to integrate Flink with Confluent's schema registry

関連する問題