1

最終的に一貫性のある分散アーキテクチャの実装は苦痛であることが判明しました。どのようにしてそれを行うのかを物語っているが、実際にそれを行う方法(コード)を示していない大量のブログ記事がある。Spring Cloud Stream Kafka - 最終的な一貫性 - Kafkaは未確認のメッセージを自動再試行しますか(autocommitoffset = falseを使用した場合)

私が苦しんでいる側面の1つは、メッセージがack'dされていないときに手動で再試行することです。

たとえば、私の注文サービスは有料イベントをKafkaに送信します。決済サービスがそれに加入し、それを処理し、支払いと答えている[OK]または支払い失敗

  1. 支払いを求める:Order Service ----Pay event----> Kafka ----Pay Event ----> Payment Service

  2. 支払OK: - >Payment Service ----Payment ok event ----> Kafka ----Payment ok Event ----> Order Service

  3. 支払いは失敗 - >Payment Service ----Payment failure event ----> Kafka ----Payment failure Event ----> Order Service

ポイントは次のとおりです。

私は、同期送信を使用してメッセージがKafkaに配信されたことがわかっています。しかし、支払いが支払サービスによって処理されたことを知る唯一の方法は、回答イベント(Payment ok | Payment failure)を期待することです。

これにより、Orderサーバーで再試行メカニズムを実装する必要があります。しばらくしても答えが得られなかった場合は、新しいPayイベントで再試行してください。

これはまた、実際に処理されたが回答が注文サービスに届かなかった場合に、支払いサービスで重複したメッセージを処理することを余儀なくさせます。

消費者がメッセージの新しいオフセットを確認しなかった場合、Kafkaに組み込みのメカニズムが組み込まれているのだろうかと思いました。

春クラウドストリームでは、falseにautoCommitOffsetプロパティを設定し、消費者にオフセットのACKを処理することができます:

我々は実行しない場合はどうなり
@StreamListener(Sink.INPUT) 
public void process(Message<?> message) { 
    Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); 
    if (acknowledgment != null) { 
     System.out.println("Acknowledgment provided"); 
     acknowledgment.acknowledge(); 
    } 
} 

acknowledgment.acknowledge();メッセージが自動的に再送信されますこの消費者にカフカによって?

ことが可能であるならば、我々はもはや手動で再試行する必要はありませんし、このようなものを行うことができます:再試行期間がどのように、これが可能だった場合

@Autowired 
private PaymentBusiness paymentBusiness; 

@StreamListener(Sink.INPUT) 
public void process(Order order) { 
    Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); 
    if (acknowledgment != null) { 
     paymentBusiness(order);    
     //If we don't get here because of an exception 
     //Kafka would retry... 
     acknowledgment.acknowledge(); 
    } 
} 

Paymenサービスは、カフカで設定されていますか?

シナリオでは、これはサポートされていないため、手動で再試行する必要があります。 Kafkaを使用して最終的な一貫性を扱うSpring Cloud Streamアプリケーションの実際の例を知っていますか?

答えて

1

acknowledgement.acknowledge()を実行しないとどうなりますか。メッセージは自動的にカフカからこの消費者に再送信されますか?

いいえKafkaコンシューマは、クライアントが開いている間、メッセージを順番に読み取ります。 Kafkaは、個々のメッセージの確認応答、特定のコンシューマグループとパーティショントピックのオフセットの更新など、より洗練された確認モードをサポートしていません。 Spring Cloud Streamは、非同期で処理されるシナリオ(つまり、メッセージの損失を防ぐシナリオ)のために、Spring Cloud Stream内のメッセージの手動承認をサポートしていますが、メッセージが手動で承認されるとそのオフセットが保存されるため、 partition-topicは '読み込み'とみなされます。失敗したメッセージを取り除きたい場合は、DLQサポートを使用して、その後のコンシューマにそれらを受信させることができます。コンシューマを再起動すると、最後に保存されたオフセットからの読み取りが再開されるため、一連の正常に処理されなかったメッセージのオフセットを保存しないという選択肢があります。

ザ・春クラウドストリームの消費者が内蔵されている再試行およびDLQサポート - デフォルトの消費者のプロパティの一部として提供さhttp://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR2/reference/htmlsingle/#_kafka_consumer_propertiesenableDlqだけでなく、再試行の設定を参照してください。http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR2/reference/htmlsingle/#_consumer_properties

関連する問題