Kafka throws an exception when joining two streams whose keys and values are GenericRecords

Can anyone help me solve the problem of correctly joining two streams whose keys and values are GenericRecords? First I create the two topics, using Avro schemas for both the key and the value. Then I join the two streams, and for the output topic I build a new GenericRecord with a projection schema (a so-called projection record). When I run the code snippet below, I get the exception listed after it:

@Test 
public void joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord() throws Exception { 
    String methodName = new Object() { 
    }.getClass().getEnclosingMethod().getName(); 

    long timestamp = new Date().getTime(); 

    String firstTopic = String.format("%1$s_1_%2$s", methodName, timestamp); 
    String secondTopic = String.format("%1$s_2_%2$s", methodName, timestamp); 

    String outputTopic = String.format("%1$s_output_%2$s", methodName, timestamp); 

    String firstStorage = String.format("%1$s_store_1_%2$s", methodName, timestamp); 
    String secondStorage = String.format("%1$s_store_2_%2$s", methodName, timestamp); 

    String appIdConfig = String.format("%1$s_app_id_%2$s", methodName, timestamp); 
    String groupIdConfig = String.format("%1$s_group_id_%2$s", methodName, timestamp); 

    String schemaIdNamespace = String.format("%1$s_id_ns_%2$s", methodName, timestamp); 
    String schemaNameNamespace = String.format("%1$s_name_ns_%2$s", methodName, timestamp); 
    String schemaScopeNamespace = String.format("%1$s_scope_ns_%2$s", methodName, timestamp); 
    String schemaProjectionNamespace = String.format("%1$s_proj_ns_%2$s", methodName, timestamp); 

    String schemaIdRecord = String.format("%1$s_id_rec_%2$s", methodName, timestamp); 
    String schemaNameRecord = String.format("%1$s_name_rec_%2$s", methodName, timestamp); 
    String schemaScopeRecord = String.format("%1$s_scope_rec_%2$s", methodName, timestamp); 
    String schemaProjectionRecord = String.format("%1$s_proj_rec_%2$s", methodName, timestamp); 

    try { 
     Integer partitions = 1; 
     Integer replication = 1; 
     Properties topicConfig = new Properties(); 

     RestUtils.createTopic(firstTopic, partitions, replication, topicConfig); 
     RestUtils.createTopic(secondTopic, partitions, replication, topicConfig); 
     RestUtils.createTopic(outputTopic, partitions, replication, topicConfig); 

     Properties streamsConfiguration = new Properties(); 
     streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appIdConfig); 
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG); 
     streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_CONNECT_CONFIG); 
     streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); 
     streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
     streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); 
     streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
     streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams/"); //TestUtils.tempDirectory().getAbsolutePath()); 
     streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG); 

     Serializer kafkaAvroSerializer = new KafkaAvroSerializer(); 
     kafkaAvroSerializer.configure(streamsConfiguration, false); 

     Deserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(); 
     kafkaAvroDeserializer.configure(streamsConfiguration, false); 

     Serde<GenericRecord> avroSerde = Serdes.serdeFrom(kafkaAvroSerializer, kafkaAvroDeserializer); 

     //----- 

     Schema idSchema = SchemaBuilder.record(schemaIdRecord).namespace(schemaIdNamespace).fields() 
       .name("Id").type().nullable().intType().noDefault() 
       .endRecord(); 

     Schema nameSchema = SchemaBuilder.record(schemaNameRecord).namespace(schemaNameNamespace).fields() 
       .name("Id").type().nullable().intType().noDefault() 
       .name("Name").type().nullable().stringType().noDefault() 
       .endRecord(); 

     Schema scopeSchema = SchemaBuilder.record(schemaScopeRecord).namespace(schemaScopeNamespace).fields() 
       .name("Scope").type().nullable().stringType().noDefault() 
       .endRecord(); 

     Schema projectionSchema = SchemaBuilder.record(schemaProjectionRecord).namespace(schemaProjectionNamespace).fields() 
       .name("Id").type().nullable().intType().noDefault() 
       .name("Name").type().nullable().stringType().noDefault() 
       .name("Scope").type().nullable().stringType().noDefault() 
       .endRecord(); 

     GenericRecord idRecord1 = new GenericData.Record(idSchema); 
     idRecord1.put("Id", 1); 
     GenericRecord idRecord2 = new GenericData.Record(idSchema); 
     idRecord2.put("Id", 2); 
     GenericRecord idRecord3 = new GenericData.Record(idSchema); 
     idRecord3.put("Id", 3); 
     GenericRecord idRecord4 = new GenericData.Record(idSchema); 
     idRecord4.put("Id", 4); 

     GenericRecord nameRecord1 = new GenericData.Record(nameSchema); 
     nameRecord1.put("Id", 1); 
     nameRecord1.put("Name", "Bruce Eckel"); 
     GenericRecord nameRecord2 = new GenericData.Record(nameSchema); 
     nameRecord2.put("Id", 2); 
     nameRecord2.put("Name", "Robert Lafore"); 
     GenericRecord nameRecord3 = new GenericData.Record(nameSchema); 
     nameRecord3.put("Id", 3); 
     nameRecord3.put("Name", "Andrew Tanenbaum"); 
     GenericRecord nameRecord4 = new GenericData.Record(nameSchema); 
     nameRecord4.put("Id", 4); 
     nameRecord4.put("Name", "Programming in Scala"); 

     GenericRecord scopeRecord1 = new GenericData.Record(scopeSchema); 
     scopeRecord1.put("Scope", "Modern Operating System"); 
     GenericRecord scopeRecord2 = new GenericData.Record(scopeSchema); 
     scopeRecord2.put("Scope", "Thinking in Java"); 
     GenericRecord scopeRecord3 = new GenericData.Record(scopeSchema); 
     scopeRecord3.put("Scope", "Computer Architecture"); 
     GenericRecord scopeRecord4 = new GenericData.Record(scopeSchema); 
     scopeRecord4.put("Scope", "Programming in Scala"); 

     List<KeyValue<GenericRecord, GenericRecord>> list1 = Arrays.asList(
       new KeyValue<>(idRecord1, nameRecord1), 
       new KeyValue<>(idRecord2, nameRecord2), 
       new KeyValue<>(idRecord3, nameRecord3) 
     ); 

     List<KeyValue<GenericRecord, GenericRecord>> list2 = Arrays.asList(
       new KeyValue<>(idRecord3, scopeRecord1), 
       new KeyValue<>(idRecord1, scopeRecord2), 
       new KeyValue<>(idRecord3, scopeRecord3), 
       new KeyValue<>(idRecord4, scopeRecord4) 
     ); 

     GenericRecord projectionRecord1 = new GenericData.Record(projectionSchema); 
     projectionRecord1.put("Id", nameRecord1.get("Id")); 
     projectionRecord1.put("Name", nameRecord1.get("Name")); 
     projectionRecord1.put("Scope", scopeRecord1.get("Scope")); 

     GenericRecord projectionRecord2 = new GenericData.Record(projectionSchema); 
     projectionRecord2.put("Id", nameRecord2.get("Id")); 
     projectionRecord2.put("Name", nameRecord2.get("Name")); 
     projectionRecord2.put("Scope", scopeRecord2.get("Scope")); 

     GenericRecord projectionRecord3 = new GenericData.Record(projectionSchema); 
     projectionRecord3.put("Id", nameRecord3.get("Id")); 
     projectionRecord3.put("Name", nameRecord3.get("Name")); 
     projectionRecord3.put("Scope", scopeRecord3.get("Scope")); 

     List<KeyValue<GenericRecord, GenericRecord>> expectedResults = Arrays.asList(
       new KeyValue<>(idRecord3, projectionRecord3), 
       new KeyValue<>(idRecord1, projectionRecord1), 
       new KeyValue<>(idRecord3, projectionRecord3) 
     ); 

     //----- 

     KStreamBuilder builder = new KStreamBuilder(); 

     KStream<GenericRecord, GenericRecord> firstStream = builder.stream(avroSerde, avroSerde, firstTopic); 

     KStream<GenericRecord, GenericRecord> secondStream = builder.stream(avroSerde, avroSerde, secondTopic); 

     KStream<GenericRecord, GenericRecord> outputStream = firstStream.join(secondStream, 
       new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() { 
        @Override 
        public GenericRecord apply(GenericRecord l, GenericRecord r) { 
         GenericRecord projectionRecord = new GenericData.Record(projectionSchema); 
         projectionRecord.put("Id", l.get("Id")); 
         projectionRecord.put("Name", l.get("Name")); 
         projectionRecord.put("Scope", r.get("Scope")); 
         return projectionRecord; 
        } 
       }, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)), avroSerde, avroSerde, avroSerde); 

     outputStream.to(avroSerde, avroSerde, outputTopic); 

     KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); 

     streams.start(); 

     Properties cfg1 = new Properties(); 
     cfg1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG); 
     cfg1.put(ProducerConfig.ACKS_CONFIG, "all"); 
     cfg1.put(ProducerConfig.RETRIES_CONFIG, 0); 
     cfg1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); 
     cfg1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); 
     cfg1.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG); 
     IntegrationTestUtils.produceKeyValuesSynchronously(firstTopic, list1, cfg1); 

     Properties cfg2 = new Properties(); 
     cfg2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG); 
     cfg2.put(ProducerConfig.ACKS_CONFIG, "all"); 
     cfg2.put(ProducerConfig.RETRIES_CONFIG, 0); 
     cfg2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); 
     cfg2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); 
     cfg2.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG); 
     IntegrationTestUtils.produceKeyValuesSynchronously(secondTopic, list2, cfg2); 

     Properties consumerConfig = new Properties(); 
     consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG); 
     consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig); 
     consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
     consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); 
     consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); 
     consumerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG); 

     List<KeyValue<GenericRecord, GenericRecord>> actualResults = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedResults.size()); 

     streams.close(); 

     //----- 

     assertThat(actualResults).containsExactlyElementsOf(expectedResults); 

     //----- 
    } finally { 
     RestUtils.deleteTopics(firstTopic, secondTopic, outputTopic); 
    } 
} 

Stack trace:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_1_1490264134172, partition=0, offset=0 
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) 
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_rec_1490264134172","namespace":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_ns_1490264134172","fields":[{"name":"Id","type":["int","null"]},{"name":"Name","type":["string","null"]}]} 
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema 
io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema 
    at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.incompatibleSchemaException(Errors.java:63) 
    at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:166) 
    at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81) 
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144) 
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:161) 
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:143) 
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99) 
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389) 
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347) 
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102) 
    at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:308) 
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271) 
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267) 
    at org.glassfish.jersey.internal.Errors.process(Errors.java:315) 
    at org.glassfish.jersey.internal.Errors.process(Errors.java:297) 
    at org.glassfish.jersey.internal.Errors.process(Errors.java:267) 
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317) 
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:291) 
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1140) 
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:403) 
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:386) 
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:548) 
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:489) 
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:426) 
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652) 
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585) 
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221) 
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127) 
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515) 
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185) 
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061) 
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) 
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110) 
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) 
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159) 
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) 
    at org.eclipse.jetty.server.Server.handle(Server.java:499) 
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310) 
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257) 
    at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540) 
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635) 
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: io.confluent.kafka.schemaregistry.exceptions.IncompatibleSchemaException: New schema is incompatible with an earlier schema. 
    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.register(KafkaSchemaRegistry.java:369) 
    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.registerOrForward(KafkaSchemaRegistry.java:391) 
    at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:154) 
    ... 44 more 
; error code: 409 
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170) 
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187) 
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:238) 
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:230) 
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:225) 
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59) 
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91) 
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72) 
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54) 
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:176) 
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109) 
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101) 
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65) 
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) 
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) 
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) 
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70) 
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) 

Answers

Answer 1:
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_rec_1490264134172","namespace":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_ns_1490264134172","fields":[{"name":"Id","type":["int","null"]},{"name":"Name","type":["string","null"]}]} 
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema 
io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema 

The key lines are:

  • "アブロスキーマの登録エラー"
  • 「登録されているスキーマは、以前のスキーマと互換性がありません」

The schema that failed to register is:

Error registering Avro schema:{ 
    "type":"record", 
    "name":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_rec_1490264134172", 
    "namespace":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_ns_1490264134172", 
    "fields":[ 
     { 
     "name":"Id", 
     "type":[ 
      "int", 
      "null" 
     ] 
     }, 
     { 
     "name":"Name", 
     "type":[ 
      "string", 
      "null" 
     ] 
     } 
    ] 
} 

What I can imagine happening here is that you have probably experimented with an earlier version of your code, and at that time it wrote output messages, with a different Avro schema, to the output topic. As a result:

  1. the new messages have a schema that is incompatible with that earlier schema, and
  2. because you have configured a schema registry (I assume it is the Confluent schema registry), the registry rejects the incompatible schema.

You can disable the compatibility check in the schema registry for the output topic, for example as sketched below.
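A minimal sketch of doing that through the Schema Registry REST API (PUT /config/<subject> with a JSON body of {"compatibility": "NONE"}), using only the JDK. The registry URL and subject name here are hypothetical placeholders; with the registry's default subject naming, a topic's value schemas are registered under "<topic>-value" and its key schemas under "<topic>-key", so relax whichever subject the failed registration belongs to.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class DisableCompatibilityCheck {

    // Sets the compatibility level for a single subject via the Schema Registry
    // REST API: PUT /config/<subject> with {"compatibility": "<level>"}.
    static void setSubjectCompatibility(String registryUrl, String subject, String level) throws Exception {
        URL url = new URL(registryUrl + "/config/" + subject);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Content-Type", "application/vnd.schemaregistry.v1+json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(("{\"compatibility\": \"" + level + "\"}").getBytes(StandardCharsets.UTF_8));
        }
        if (conn.getResponseCode() != 200) {
            throw new IllegalStateException("PUT /config/" + subject + " returned " + conn.getResponseCode());
        }
        conn.disconnect();
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical registry URL and subject name.
        setSubjectCompatibility("http://localhost:8081", "my_output_topic-value", "NONE");
    }
}

Note that this only changes the per-subject setting; the global compatibility level of the registry stays as it is.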

Answer 2:

From the documentation: you can run `confluent destroy` to "completely delete any data created during this testing and start clean the next time".

Answer 3:

Sometimes schema incompatibility is a real pain in the neck. Here is what I do to solve such problems.

Clear the topic:

kafka-topics.bat --zookeeper 1.2.3.4:2181 --delete --topic your-topic-name 

Then, in the Schema Registry UI, go to the Config tab and set your compatibility level to NONE. After that you can update the schema easily.

After you have updated the schema and successfully published new messages to the topic, you can restore the old compatibility level.

Be careful: do not change the global compatibility level.
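To follow that advice and keep the change scoped to a single subject, a sketch along these lines (same REST endpoints and hypothetical names as in the sketch above) first reads the current per-subject override with GET /config/<subject>, relaxes it to NONE, and puts the previous level back once the new messages have been published:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class CompatibilityToggle {

    // Reads the per-subject compatibility override via GET /config/<subject>.
    // The registry answers 404 if the subject has no override and simply
    // inherits the global level; null is returned in that case.
    static String getSubjectCompatibility(String registryUrl, String subject) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(registryUrl + "/config/" + subject).openConnection();
        if (conn.getResponseCode() == 404) {
            return null;
        }
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            // Response body looks like {"compatibilityLevel":"BACKWARD"};
            // crude string extraction to keep the sketch dependency-free.
            return in.readLine().replaceAll(".*\"compatibilityLevel\"\\s*:\\s*\"([A-Z_]+)\".*", "$1");
        }
    }

    // Sets the per-subject level via PUT /config/<subject>, as in the sketch above.
    static void setSubjectCompatibility(String registryUrl, String subject, String level) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(registryUrl + "/config/" + subject).openConnection();
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Content-Type", "application/vnd.schemaregistry.v1+json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(("{\"compatibility\": \"" + level + "\"}").getBytes(StandardCharsets.UTF_8));
        }
        if (conn.getResponseCode() != 200) {
            throw new IllegalStateException("PUT /config/" + subject + " returned " + conn.getResponseCode());
        }
    }

    public static void main(String[] args) throws Exception {
        String registry = "http://localhost:8081";  // hypothetical registry URL
        String subject = "my_output_topic-value";   // hypothetical subject name

        String previous = getSubjectCompatibility(registry, subject);  // may be null
        setSubjectCompatibility(registry, subject, "NONE");            // relax the check

        // ... publish the messages with the new schema here ...

        if (previous != null) {
            setSubjectCompatibility(registry, subject, previous);      // restore the old level
        }
    }
}

If the subject had no override to begin with, the restore step is skipped and the subject simply keeps inheriting the global level, which this procedure never touches.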
