私は、カスタムデシリアライザであなたのメインのコードパスでは似たような(私は09消費者を使用しています)
をやってる:
FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
parameterTool.getRequired("topic"), new MyDeserializationSchema<>(),
parameterTool.getProperties());
カスタムデシリアライズスキーマは、バイトを読み込み、割り出しスキーマおよび/またはスキーマレジストリから取得し、GenericRecordに逆シリアル化してGenericRecordオブジェクトを返します。
public class MyDeserializationSchema<T> implements DeserializationSchema<T> {
private final Class<T> avrotype = (Class<T>) org.apache.avro.generic.GenericRecord.class;
@Override
public T deserialize(byte[] arg0) throws IOException {
//do your stuff here, strip off your bytes
//deserialize and create your GenericRecord
return (T) (myavroevent);
}
@Override
public boolean isEndOfStream(T nextElement) {
return false;
}
@Override
public TypeInformation<T> getProducedType() {
return TypeExtractor.getForClass(avrotype);
}
}
うわー箱から出してすぐに動作します。これは私がここに詳しい説明を掲載コンフルエントの
KafkaAvroSerializer
を使用して、上記のように非常に似た方法で行うことができます。ありがとう、私はこれを見て今明らかです。 – Don