org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.StringSerializer
はどのように我々は我々自身のカスタム・シリアライザを作成することができ、わずか数シリアライザのようなありますか?
答えて
ここでは、Kafkaメッセージ値に独自のシリアライザ/デシリアライザを使用する例があります。カフカのメッセージキーについても同じことです。
MyMessageのシリアル化バージョンをKafka値として送信し、消費者側のMyMessageオブジェクトに再度逆シリアル化します。
プロデューサー側でMyMessageをシリアライズします。
あなたがorg.apache.kafka.common.serialization.Serializer
シリアル化を()メソッドは、オブジェクトを受信し、バイト配列としてシリアル化されたバージョンを返す、作業を行う実装シリアライザクラスを作成する必要があります。
public class MyValueSerializer implements Serializer<MyMessage>
{
private boolean isKey;
@Override
public void configure(Map<String, ?> configs, boolean isKey)
{
this.isKey = isKey;
}
@Override
public byte[] serialize(String topic, MyMessage message)
{
if (message == null) {
return null;
}
try {
(serialize your MyMessage object into bytes)
return bytes;
} catch (IOException | RuntimeException e) {
throw new SerializationException("Error serializing value", e);
}
}
@Override
public void close()
{
}
}
final IntegerSerializer keySerializer = new IntegerSerializer();
final MyValueSerializer myValueSerializer = new MyValueSerializer();
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);
int messageNo = 1;
int kafkaKey = messageNo;
MyMessage kafkaValue = new MyMessage();
ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));
消費者側のMyMessageを逆シリアル化します。
あなたはorg.apache.kafka.common.serialization.Deserializer
デシリアライズ()メソッドは、バイト配列と、あなたのオブジェクトを返すようにシリアライズされた値を受け、仕事を実装デシリアライザクラスを作成する必要があります。
public class MyValueDeserializer implements Deserializer<MyMessage>
{
private boolean isKey;
@Override
public void configure(Map<String, ?> configs, boolean isKey)
{
this.isKey = isKey;
}
@Override
public MyMessage deserialize(String s, byte[] value)
{
if (value == null) {
return null;
}
try {
(deserialize value into your MyMessage object)
MyMessage message = new MyMessage();
return message;
} catch (IOException | RuntimeException e) {
throw new SerializationException("Error deserializing value", e);
}
}
@Override
public void close()
{
}
}
次に、このようにそれを使用します。
final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);
ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);
for (ConsumerRecord<Integer, MyMessage> record : records) {
int kafkaKey = record.key();
MyMessage kafkaValue = record.value();
...
}
最終KafkaConsumer <整数、MyMessage>消費者=新しいKafkaConsumer <>(小道具、keyDeserializer、のためである場合は、明確にしてくださいでしたmyValueDeserializer); –
上記は構文通りではありません。次に、kafkaはデシリアライザについて知ることができます –
デシリアライザは、コンストラクタの3番目の引数です:myValueDeserializer。このコードはすべて作業コードから取り出したもので、名前を変更しただけです。 –
インターフェイス 'シリアライザ'(org.apache.kafka.common.serialization.Serializer)を実装する独自のシリアライザを作成し、プロデューサオプション 'key.serializer/value.serializer'を設定する必要があります。
- 1. Akka Kafkaカスタムシリアライザ
- 2. カスタムシリアライザを書くには?
- 3. MobogDB C#Driverを使用してエンティティのカスタムシリアライザを作成する方法は?
- 4. apache kafkaでトピックを作成するには?
- 5. JsonInclude.Include.NON_DEFAULTがカスタムシリアライザで動作しない
- 6. nodejsにパーティションを持つkafkaトピックを作成するには?
- 7. ジャクソンでカスタムシリアライザとデシリアライザを書くには?
- 8. WCFカスタムシリアライザ
- 9. Kafkaコンシューマーを作成できません
- 10. kafkaフィルタリング/動的トピック作成
- 11. Kryo(カスタムシリアライザ)でオブジェクトをシリアライズ
- 12. Swagger/Swashbuckleカスタムシリアライザを構成する方法IControllerConfiguration ASP.NET WebAPI
- 13. Kafkaプロデューサーはトピックとパーティションを作成できますか?
- 14. Apache Kafka 0.10.0 API with Javaを使用してKafkaブローカクラスタを作成する
- 15. Kafka Streamsがスキーマなしでavroトピックを作成する
- 16. GSON - 特定のケースでは、カスタムシリアライザ
- 17. kafka用の独自のカスタムパーティショナーを作成
- 18. MongoDBカスタムシリアライザの実装
- 19. Spark Kryo:カスタムシリアライザを登録する
- 20. サービスファブリックにカスタムシリアライザを登録するにはどうすればよいですか?
- 21. Kafka 0.9 - java apiを使用してトピックを作成するには
- 22. Apache Kafkaはコードからトピックを作成します
- 23. Kafkaコンシューマクライアントの作成シングルトンインスタンスとスタティックメソッド
- 24. トピックを作成するjava - kafkaバージョン> 0.10.0.0
- 25. springbootでトピックごとに別々のKafkaリスナーを作成するにはどうすればいいですか?
- 26. kafka(kafka-python)をtxtファイルにダンプする
- 27. Kafkaでの操作を求める
- 28. 追加のApache Kafkaブローカーを作成できません
- 29. Kafka Connectでコネクタを作成する分散エラーが返る500エラー
- 30. SnappyData - Kafkaストリーミングテーブルを作成中にエラーが発生しました
この質問はカフカ0.9+ –