2017-12-27 20 views
0

Spring Cloud Streams + Kafka Bindingを使用してApache Kafkaで「正確に1つの配信」コンセプトでPoCを実行しようとしています。Spring Cloud Stream Kafkaバインダー:「IN_TRANSACTION状態からIN_TRANSACTION状態への無効な移行が試みられました」

私はApache Kafka "kafka_2.11-1.0.0"をインストールし、プロデューサに "transactionIdPrefix"を定義しました。これはSpring Kafkaでトランザクションを有効にするために必要なことですが、同じアプリケーション内で単純なソース&シンクバインディングを実行すると、一部のメッセージが受信されてコンシューマに印刷され、一部でエラーが発生することがあります。例えば

、メッセージ#6は、受信:

[49] Received message [Payload String content=FromSource1 6][Headers={kafka_offset=1957, scst_nativeHeadersPresent=true, [email protected]695c9a9, kafka_timestampType=CREATE_TIME, my-transaction-id=my-id-6, id=302cf3ef-a154-fd42-6b43-983778e275dc, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=test10, kafka_receivedTimestamp=1514384106395, timestamp=1514384106419}] 

が、メッセージ#7は、 "無効な遷移が、状態IN_TRANSACTIONに状態IN_TRANSACTIONから試行" エラーが発生しました:

2017-12-27 16:15:07.405 ERROR 7731 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafk[email protected]7d3bbc0b]; nested exception is org.apache.kafka.common.KafkaException: TransactionalId my-transaction-3: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION, failedMessage=GenericMessage [payload=byte[13], headers={my-transaction-id=my-id-7, id=d31656af-3286-99b0-c736-d53aa57a5e65, contentType=application/json, timestamp=1514384107399}] 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:153) 
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:575) 
  • 何このエラーはどういう意味ですか?
  • 設定に何か不足していますか?
  • トランザクションが有効になっているときに、ソースまたはシンクを別々に実装する必要がありますか?

UPDATE:私は、プロジェクトのgithubの上の問題を開いた 、そこの議論を参照してください。


は春のブートバージョン「2.0.0.M5とシンプルなMavenプロジェクトを作成する必要があり、+ Trasanctionsを再現するために

を有効に結合カフカと春クラウドストリームを使用する方法の例を見つけることができませんでした

server: 
    port: 8082 
spring: 
    kafka: 
    producer: 
     retries: 5555 
     acks: "all" 
    cloud: 
    stream: 
     kafka: 
     binder: 
      autoAddPartitions: true 
      transaction: 
      transactionIdPrefix: my-transaction- 
     bindings: 
     output1: 
      destination: test10 
      group: test111 
      binder: kafka 
     input1: 
      destination: test10 
      group: test111 
      binder: kafka 
      consumer: 
      partitioned: true 

私も簡単なソースおよびシンククラスを作成:、この構成で簡単なアプリケーションを作成する」と 『春・クラウド・ストリームの依存関係』バージョン 『Elmhurst.M3』

@EnableBinding(SampleSink.MultiInputSink.class) 
public class SampleSink { 

    @StreamListener(MultiInputSink.INPUT1) 
    public synchronized void receive1(Message<?> message) { 
     System.out.println("["+Thread.currentThread().getId()+"] Received message " + message); 
    } 

    public interface MultiInputSink { 
     String INPUT1 = "input1"; 

     @Input(INPUT1) 
      SubscribableChannel input1(); 

    } 
} 

と:

@EnableBinding(SampleSource.MultiOutputSource.class) 
public class SampleSource { 

    AtomicInteger atomicInteger = new AtomicInteger(1); 

    @Bean 
    @InboundChannelAdapter(value = MultiOutputSource.OUTPUT1, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) 
    public synchronized MessageSource<String> messageSource1() { 
     return new MessageSource<String>() { 
      public Message<String> receive() { 
       String message = "FromSource1 "+atomicInteger.getAndIncrement(); 
       m.put("my-transaction-id","my-id-"+ UUID.randomUUID()); 
       return new GenericMessage(message, new MessageHeaders(m)); 
      } 
     }; 
    } 

    public interface MultiOutputSource { 
     String OUTPUT1 = "output1"; 

     @Output(OUTPUT1) 
      MessageChannel output1(); 

    } 
} 

答えて

0

私は、プロジェクトのgithubのにその上のチケットをオープンしました。

https://github.com/spring-cloud/spring-cloud-stream/issues/1166

をしかし、最初の答えがありました: が答えとの議論を参考にしてください

バインダーは、現在、生産が開始したトランザクションをサポートしていません。

プロセッサは、トランザクションがサポートされています(コンシューマはトランザクションを開始し、プロデューサはそのトランザクションに参加します)。

消費者がいない場合は、spring-kafkaを直接使用してプロデューサ側で トランザクションを開始できるはずです。

関連する問題