3

kafkaでカスタムシリアライザを作成するには?

org.apache.kafka.common.serialization.StringSerializer 
org.apache.kafka.common.serialization.StringSerializer 

はどのように我々は我々自身のカスタム・シリアライザを作成することができ、わずか数シリアライザのようなありますか?

+0

この質問はカフカ0.9+ –

答えて

5

ここでは、Kafkaメッセージ値に独自のシリアライザ/デシリアライザを使用する例があります。カフカのメッセージキーについても同じことです。

MyMessageのシリアル化バージョンをKafka値として送信し、消費者側のMyMessageオブジェクトに再度逆シリアル化します。

プロデューサー側でMyMessageをシリアライズします。

あなたがorg.apache.kafka.common.serialization.Serializer

シリアル化を()メソッドは、オブジェクトを受信し、バイト配列としてシリアル化されたバージョンを返す、作業を行う実装シリアライザクラスを作成する必要があります。

public class MyValueSerializer implements Serializer<MyMessage> 
{ 
    private boolean isKey; 

    @Override 
    public void configure(Map<String, ?> configs, boolean isKey) 
    { 
     this.isKey = isKey; 
    } 

    @Override 
    public byte[] serialize(String topic, MyMessage message) 
    { 
     if (message == null) { 
      return null; 
     } 

     try { 

      (serialize your MyMessage object into bytes) 

      return bytes; 

     } catch (IOException | RuntimeException e) { 
      throw new SerializationException("Error serializing value", e); 
     } 
    } 

    @Override 
    public void close() 
    { 

    } 
} 

final IntegerSerializer keySerializer = new IntegerSerializer(); 
final MyValueSerializer myValueSerializer = new MyValueSerializer(); 
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer); 

int messageNo = 1; 
int kafkaKey = messageNo; 
MyMessage kafkaValue = new MyMessage(); 
ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue); 
producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue)); 

消費者側のMyMessageを逆シリアル化します。

あなたはorg.apache.kafka.common.serialization.Deserializer

デシリアライズ()メソッドは、バイト配列と、あなたのオブジェクトを返すようにシリアライズされた値を受け、仕事を実装デシリアライザクラスを作成する必要があります。

public class MyValueDeserializer implements Deserializer<MyMessage> 
{ 
    private boolean isKey; 

    @Override 
    public void configure(Map<String, ?> configs, boolean isKey) 
    { 
     this.isKey = isKey; 
    } 

    @Override 
    public MyMessage deserialize(String s, byte[] value) 
    { 
     if (value == null) { 
      return null; 
     } 

     try { 

      (deserialize value into your MyMessage object) 

      MyMessage message = new MyMessage(); 
      return message; 

     } catch (IOException | RuntimeException e) { 
      throw new SerializationException("Error deserializing value", e); 
     } 
    } 

    @Override 
    public void close() 
    { 

    } 
} 

次に、このようにそれを使用します。

final IntegerDeserializer keyDeserializer = new IntegerDeserializer(); 
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer(); 
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer); 

ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000); 
for (ConsumerRecord<Integer, MyMessage> record : records) { 

    int kafkaKey = record.key(); 
    MyMessage kafkaValue = record.value(); 

    ... 
} 
+0

最終KafkaConsumer <整数、MyMessage>消費者=新しいKafkaConsumer <>(小道具、keyDeserializer、のためである場合は、明確にしてくださいでしたmyValueDeserializer); –

+0

上記は構文通りではありません。次に、kafkaはデシリアライザについて知ることができます –

+1

デシリアライザは、コンストラクタの3番目の引数です:myValueDeserializer。このコードはすべて作業コードから取り出したもので、名前を変更しただけです。 –

0

インターフェイス 'シリアライザ'(org.apache.kafka.common.serialization.Serializer)を実装する独自のシリアライザを作成し、プロデューサオプション 'key.serializer/value.serializer'を設定する必要があります。

関連する問題