0

私はキーと値の両方のデータのシリアル化にKafka 0.10.2とAvroを使用しています。 今私はKafkaストリームを使用したいと思いますが、GenericData.RecordクラスのSerdeクラスを作成しようとしています。GenericData.Record用のKafkaAvro Serdeの書き方

import org.apache.avro.generic.GenericData.Record; 
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; 
import io.confluent.kafka.serializers.KafkaAvroDeserializer; 
import io.confluent.kafka.serializers.KafkaAvroSerializer; 
[...] 

public final class KafkaAvroSerde implements Serde<Record> { 

    private final Serde<Record> inner; 

    public KafkaAvroSerde() { 
     // Here I get the error 
     inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer()); 
    } 

    public KafkaAvroSerde(SchemaRegistryClient client) { 
     this(client, Collections.emptyMap()); 
    } 

    public KafkaAvroSerde(SchemaRegistryClient client, Map<String, ?> props) { 
     // Here I get the error 
     inner = Serdes.serdeFrom(new KafkaAvroSerializer(client, props), new KafkaAvroDeserializer(client, props)); 
    } 

    @Override 
    public Serializer<Record> serializer() { 
     return inner.serializer(); 
    } 

    @Override 
    public Deserializer<Record> deserializer() { 
     return inner.deserializer(); 
    } 

    @Override 
    public void configure(Map<String, ?> configs, boolean isKey) { 
     inner.serializer().configure(configs, isKey); 
     inner.deserializer().configure(configs, isKey); 
    } 

    @Override 
    public void close() { 
     inner.serializer().close(); 
     inner.deserializer().close(); 
    } 

} 

これは私が持つことができるので、私は(とない私の具体的なPOJOのための)GenericData.RecordためSerdeクラスを定義する必要がありコメント行

Type mismatch: cannot convert from Serde<Object> to Serde<GenericData.Record> 

で取得していますエラーですデシリアライザは私にGenericDataを返さなければなりません(そして、私はこのステップの後に正しいPOJOを設定します)。

どうすれば完了ですか? ありがとうございました

答えて

2

すでにquestion in the Confluent mailing listと尋ねました。ここに投稿した私の答えの要約です。

私たちは、Kafka Streamsの公式のConfluent Avro serde(特定のAvro + generic Avro)で作業を終了しました。 https://github.com/confluentinc/schema-registry/tree/master/avro-serdeを参照してください。

Confluentスキンレジストリ対応/互換性のある新しいAvro serdeは、数週間後にリリースされるConfluent 3.3でリリースされる予定です。

3.3がリリースされるまで、あなたはどちらかできmaster枝から独自の成果物を作成(あなたが最初mvn installとスキーマレジストリプロジェクト、その後、mvn installconfluentinc/commonconfluentinc/rest-utilsmaster枝を構築する必要があります)、または例えばクラスを自分のコードプロジェクトにコピー&ペーストします。

注:上記のプロジェクトのmasterブランチは、開発ブランチ、つまりリリース前ブランチです。この回答の今後の読者は、これを念頭に置いておくべきです。

今後の新しいConfluent Avro serdeの使用方法の例もあります。デモはmaster支店https://github.com/confluentinc/examplesにあります。

アブロ関連の例とコンフルエント(以下のものよりも、そのような例がある)が提供するエンド・ツー・エンドの統合テストにいくつかの直接リンク:

関連する問題