2017-05-05 23 views
0

私は、JMSベースのシステムから新しいシステムとレガシーシステム間のトランザクションを同期させるKafkaに移行しています。新しいシステムによって発行されるすべてのメッセージは、サブスクライバ/コンシューマによって正常に処理されなければなりません。私はメッセージの順序について心配する必要はありません。レガシーシステムではいくつかの設計上の問題(悲観的なロック)のため、しばらくの間、メッセージが到着したときにトランザクションが失敗することはほとんどありません。その場合、遅れてメッセージが戻ってくるようにします。私はカフカでこの状況をどのように処理するかを考えようとしています。コンフルエントカフカで消費者側のメッセージ処理エラーを処理する方法は?

ソースとターゲットアプリケーションは.NET 4.6.1とc#にあります。私はConfluent.Kafka v0.9.5クライアントライブラリを使用しています。カフカはバージョン0.10です。

コンシューマ向けアプリケーションでは、自動コミットを無効にして、メッセージが正常に処理された後にcommitAsyncメソッドを明示的に呼び出してオフセットをコミットしました。ここで私は消費者を作成する方法です。

var config = new Dictionary<string, object>() 
       { 
        {"group.id", GroupId}, 
        {"client.id", ClientId}, 
        {"enable.auto.commit", false}, 
        {"bootstrap.servers", _consumerConnectionConfig.BrokerUrl}, 
        { 
         "default.topic.config", new Dictionary<string, object>() 
         { 
          {"auto.offset.reset", "latest"} 
         } 
        } 
       }; 
var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)); 

これはポーリングの設定方法です。

consumer.Subscribe(topics); 

while (!SubscriberCancellationToken.IsCancellationRequested) 
{ 
     consumer.Poll(TimeSpan.FromMilliseconds(1000)); 
} 

はここで処理メッセージメッセージが正常に処理できた場合

consumer.OnMessage += (sender, message) => { 
    try 
    { 
    var payload = GetPayload(message); 
    if (_messageHandlerService.ProcessMessage(payload)) 
    {     
     consumer.CommitAsync(message).Wait(SubscriberCancellationToken); 
    } 
    else 
    { 
     //(case 1) Now what should I do????? 
    } 
    } 
    catch(Exception ex) 
    { 
     Log.Error("Unable to process xyz messages", ex); 
     throw; //??? (Case 2) should I throw the exception or should I not? 
    } 
}; 

に責任があるのonMessageイベントで、私はcommitAsyncを呼び出し、それは魔法のように動作します。私の質問は、私がメッセージを処理できない(ケース1)、または何らかの例外が発生した(ケース2)ときに、何をすべきかということです。これら2つの状況をどのように処理する必要がありますか?

JMSの世界では(ケース1)、私は遅延再発行戦略を適用しました。基本的には1分待ってから、同じトピックにメッセージを再発行して現在のメッセージをコミットするので、再発行されたメッセージが再び表示されます。何らかの理由で再発行できない場合は、処理が再開されるまで、またはプロセスが再開するまで再試行を続けます。再発行する前にプロセスが再開された場合、コミットされていないメッセージが戻ってきて、サイクルが再び開始されます。

一度再パブリッシュが成功すると、すでにトピックを待っているメッセージがある場合、列車はすべてが処理されるまで円を描く。ケース1のようにメッセージを処理できないたびに、エラーをログに記録し、それに基づいて適切なアラートを生成するので、アプリケーションサポートチームはレガシーシステムの一部のデータを修正する必要がある場合に備えて、それまでは、メッセージは失敗し続け、再発行されます。

ケース2では、詳細を記録し、JMS実装で例外をスローしました。私は今、ケース1のように扱うべきだろうかと疑問に思います。

私はカフカの世界でどのようにこれらの2つの状況を処理すべきですか?より良い選択肢はありますか?貴方の消費within

答えて

1

いくつかのオプション

一つは、それを専用コンシューマの契約をanotherカフカトピックにこれらのメッセージをプッシュするとletすることで、または、

再試行は、その特定のメッセージが処理されるまでか、certain reach閾値。これは、一時的にメッセージ消費を停止するために消費者のpauseメソッドを呼び出すことで(Javaの消費者の観点から話している、plsがC#クライアントで確認している)、リトライロジックを実行することによって行うことができます。ポールも引き続き呼び出すようにしてください(そうしないと、消費者はグループから追い出され、そのパーティションは再調整されます)。再試行が終わったら、レジュームメソッドを呼び出して処理を続行してください - 今度はポールメソッドがカフカのレコードを返します

+0

ありがとうございます。一時停止と再開機能を試してみましょう。私は結果をC#のクライアントで掲示する – Vinod

+0

遅れた応答のため申し訳ありません。 Confluent.Kafka c#clientにはまだポーズとレジュームのメソッドがありません。 – Vinod

関連する問題