2017-02-10 3 views
2

私はAvroシリアライザでApache Kafkaを使用しています。自分のカスタムクラスを作成しようとしていて、カフカのメッセージ値として使用しています。Kafka Avroシリアライザ:org.apache.avro.AvroRuntimeException:オープンしていません

Exception in thread "main" org.apache.avro.AvroRuntimeException: not open 
    at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82) 
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:287) 
    at com.harmeetsingh13.java.producers.avroserializer.AvroProducer.main(AvroProducer.java:57) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 

マイアブロスキーマは以下のようにファイル:

{ 
    "namespace": "customer.avro", 
    "type": "record", 
    "name": "Customer", 
    "fields": [{ 
     "name": "id", 
     "type": "int" 
    }, { 
     "name": "name", 
     "type": "string" 
    }] 
} 

顧客クラス:

public class Customer { 
    public int id; 
    public String name; 

    public Customer() { 
    } 

    public Customer(int id, String name) { 
     this.id = id; 
     this.name = name; 
    } 

    public int getId() { 
     return id; 
    } 

    public void setId(int id) { 
     this.id = id; 
    } 

    public String getName() { 
     return name; 
    } 

    public void setName(String name) { 
     this.name = name; 
    } 
} 

データのシリアル化がアブロを使用して、私がメッセージを送信しようとしていたとき、私は次の例外を取得しています:

public static void fireAndForget(ProducerRecord<String, DataFileWriter> record) { 
     kafkaProducer.send(record); 
    } 

Customer customer1 = new Customer(1001, "James"); 

Parser parser = new Parser(); 
Schema schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("customer.avro")); 

SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema); 
DataFileWriter<Customer> dataFileWriter = new DataFileWriter<>(writer); 
dataFileWriter.append(customer1); 
dataFileWriter.close(); 

ProducerRecord<String, DataFileWriter> record1 = new ProducerRecord<>("CustomerCountry", 
     "Customer One", dataFileWriter 
); 
fireAndForget(record1); 

一般的なものの代わりにSpecificDatumWriterライターを使いたいです。このエラーは何と関連していますか?

答えて

1

Kafkaはシリアル化するキー値のペアを受け取り、シリアル化する値ではないDataFileWriterを渡していますが、それはうまくいかないでしょう。あなたがする必要がどのような

BinaryEncoderByteArrayOutputStream経由でシリアル化されたアブロのバイト配列を作成し、ProducerRecord<String, byte[]>に渡しです:

SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema); 
ByteArrayOutputStream os = new ByteArrayOutputStream(); 

try { 
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null); 
    writer.write(customer1, encoder); 
    e.flush(); 

    byte[] avroBytes = os.toByteArray(); 
    ProducerRecord<String, byte[]> record1 = 
    new ProducerRecord<>("CustomerCountry", "Customer One", avroBytes); 

    kafkaProducer.send(record1); 
} finally { 
    os.close(); 
} 
+0

@Yuvalねえ、この解決策によると、私は取得しています'スレッド「main」の例外java.lang.ClassCastException:com.harmeetsingh13.java.producers.avroserializer.Customerはorg.apache.avro.generic.IndexedRecord'にキャストできません例外 –

+0

@HarmeetSinghTaaraそれは別の問題ですあなたは 'Customer'の定義と完全なスタックトレースを含む別の質問を投稿します。シリアライズする型に必要な 'ISpecificRecord'インタフェースを実装していないようです。それ以外の場合は、代わりに 'GenericDatumWriter'を使用できます。 –

+0

Okay @Yuvalしかし、この解決策によれば、私たちのデータをバイトに変換する必要があります。その後、kafkaにメッセージを送ります。私たちのスキーマをavroに指定することはできません。 –

関連する問題