2
私はAvroシリアライザでApache Kafkaを使用しています。自分のカスタムクラスを作成しようとしていて、カフカのメッセージ値として使用しています。Kafka Avroシリアライザ:org.apache.avro.AvroRuntimeException:オープンしていません
Exception in thread "main" org.apache.avro.AvroRuntimeException: not open
at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:287)
at com.harmeetsingh13.java.producers.avroserializer.AvroProducer.main(AvroProducer.java:57)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
マイアブロスキーマは以下のようにファイル:
{
"namespace": "customer.avro",
"type": "record",
"name": "Customer",
"fields": [{
"name": "id",
"type": "int"
}, {
"name": "name",
"type": "string"
}]
}
顧客クラス:
public class Customer {
public int id;
public String name;
public Customer() {
}
public Customer(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
データのシリアル化がアブロを使用して、私がメッセージを送信しようとしていたとき、私は次の例外を取得しています:
public static void fireAndForget(ProducerRecord<String, DataFileWriter> record) {
kafkaProducer.send(record);
}
Customer customer1 = new Customer(1001, "James");
Parser parser = new Parser();
Schema schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("customer.avro"));
SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
DataFileWriter<Customer> dataFileWriter = new DataFileWriter<>(writer);
dataFileWriter.append(customer1);
dataFileWriter.close();
ProducerRecord<String, DataFileWriter> record1 = new ProducerRecord<>("CustomerCountry",
"Customer One", dataFileWriter
);
fireAndForget(record1);
一般的なものの代わりにSpecificDatumWriter
ライターを使いたいです。このエラーは何と関連していますか?
@Yuvalねえ、この解決策によると、私は取得しています'スレッド「main」の例外java.lang.ClassCastException:com.harmeetsingh13.java.producers.avroserializer.Customerはorg.apache.avro.generic.IndexedRecord'にキャストできません例外 –
@HarmeetSinghTaaraそれは別の問題ですあなたは 'Customer'の定義と完全なスタックトレースを含む別の質問を投稿します。シリアライズする型に必要な 'ISpecificRecord'インタフェースを実装していないようです。それ以外の場合は、代わりに 'GenericDatumWriter'を使用できます。 –
Okay @Yuvalしかし、この解決策によれば、私たちのデータをバイトに変換する必要があります。その後、kafkaにメッセージを送ります。私たちのスキーマをavroに指定することはできません。 –