2017-10-05 6 views
1

私は、スキーマレジストリを使用してavroトピックからデータを読み込んで、単純な変換を行い、結果をコンソールに出力するJavaアプリケーションを開発しました。デフォルトでは、キーと値にはGenericAvroSerdeクラスを使用しました。すべては私がそれなしでKafka Streamsがスキーマなしでavroトピックを作成する

final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", kafkaStreamsConfig.getProperty("schema.registry.url")); 
    final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde(); 
    final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde(); 
    keyGenericAvroSerde.configure(serdeConfig, true); 
    valueGenericAvroSerde.configure(serdeConfig, false); 

などの各serdeための別途の構成を定義しなければならなかったことを除いてうまく働いた私はいつものようにエラーが発生します。

Exception in thread "NTB27821-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=CH-PGP-LP2_S20-002_agg, partition=0, offset=4482940 
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46) 
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84) 
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) 
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474) 
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642) 
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548) 
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519) 
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 69 
Caused by: java.lang.NullPointerException 
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122) 
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93) 
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) 
    at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:63) 
    at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:39) 
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65) 
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55) 
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56) 
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44) 
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84) 
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) 
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519) 

まあ、それは(その後、unsualが、大丈夫でした上記のようにコンフィギュレーションコールを追加したとき) - それはうまくいき、私のアプリケーションはすべての操作を実行して結果をプリントアウトできました。

しかし! 私はcall through()を使って新しいトピックにデータを投稿しようとしましたが、私は質問している問題に直面しました。トピックはスキーマなしで作成されました。 どのようにすることができますか?

興味深い事実は、データが書き込まれていることであるが、それは、次のとおり A)バイナリ形式で、非常にシンプル消費者はそれを Bを読み取ることができない)、それはスキーマいない - そうアブロ消費者がそれを読むのいずれかできません。もちろん

Processed a total of 1 messages 
[2017-10-05 11:25:53,241] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:105) 
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 0 
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182) 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203) 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:379) 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:372) 
     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65) 
     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:131) 
     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122) 
     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93) 
     at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:122) 
     at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:114) 
     at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:140) 
     at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78) 
     at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53) 
     at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) 
[2017-10-05 11:25:53,241] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:105) 
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 0 
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182) 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203) 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:379) 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:372) 
     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65) 
     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:131) 
     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122) 
     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93) 
     at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:122) 
     at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:114) 
     at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:140) 
     at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78) 
     at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53) 
     at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) 

は私が主題のためのスキーマレジストリをチェックアウト:

curl -X GET http://localhost:8081/subjects/agg_value_9-value/versions 
{"error_code":40401,"message":"Subject not found."} 

しかし、Javaのアプリケーションによって書かれた別のトピックに同じ呼び出し - 初期データのプロデューサーは、スキーマが存在することを示しています

curl -X GET http://localhost:8081/subjects/CH-PGP-LP2_S20-002_agg-value/versions 
[1] 

両方のアプリケーションがただ要約すると同じ「schema.registry.url」構成 を使用する - トピックは、作成されたデータが書き込まれ、簡単な消費者で読み取ることができますが、それはバイナリで、スキーマが存在しません。

また、私はLandoopでスキーマを作成しようとしましたが、何とかデータにマッチさせましたが、成功しませんでした。そして、カフカストリームを使用する適切な方法ではありません。

助けてください!

+0

どのバージョンをお使いですか?また、デフォルトで 'StreamsConfig'にAvroSerdeを設定するか、各オペレータに個別に設定しますか?手動で、そしてアプリケーションを起動する前に、トピックの用途を作成しましたか?この例もチェックしてください:https://github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java#L83- L85 –

+0

私はConfluent 3.3.0、Java 1.8、kafka 0.11.0.0-cp1、avroバージョン1.7.7を使用します。私はGenericAvroSerdeをデフォルトとして設定しますが、単純な型の場合は、これらの設定(Serdes.Long、Serdes.String、Serdes.Float)をオーバーライドします。私が使用しようとしているトピックは存在しませんでしたが、私が投稿を書き始める時に書いたように、データが書かれている間に作成されました。 –

+0

この例では、例のキーでは、(キーのavroスキーマが単なる「文字列」なので)stringSerdeを使用している間に、例のキーがbytearrayとしてデコードされている点を除いて、すべて同じです。スキーマregstry urlが利用可能である場合、私がkafkaストリームアプリケーションの初期データを読み取ることができない場合は、利用できます。しかし、私はfinalstream.print()を実行したときに最終ストリームが正しく出力されます。 –

答えて

0

throughが呼び出されると、ユーザが特に上書きしない限り、StreamsConfigで定義されたデフォルトのserdeが使用されます。どのデフォルトのserdeを使用しましたか?正しいことを確認するには、AbstractKafkaAvroSerializerを使用する必要があります。AbstractKafkaAvroSerializerは、そのトピックのスキーマを自動的に登録します。

関連する問題