2016-12-27 10 views
2

これはError in Kafka Streams using kafka-node - negative timestampの複製である可能性がありますが、確かにそうではありません。私のKafka Streamsアプリケーションは、各メッセージに対していくつかの変換ロジックを行い、それを新しいトピックに転送します。時間ベースの集計/処理はアプリにはないので、カスタムのタイムスタンプ抽出機能を使用する必要はありません。このアプリは数日間正常に動作していましたが、突然アプリが負のタイムスタンプ例外をスローしました。数時間の流れにさらなる進展がなかったとしてStreamsException:抽出されたタイムスタンプ値が負であり、許可されていません

Exception in thread "StreamThread-4" org.apache.kafka.streams.errors.StreamsException: Extracted timestamp value is negative, which is not allowed. 

は全てStreamThreads(合計10)からこの例外をスローした後、アプリケーションは、一種の凍結しました。その後、例外は発生しませんでした。アプリを再起動すると、新しく送信されるメッセージだけが処理され始めました。

ここで問題となるのは、例外をスローしてからアプリケーションを再起動するまでの間に発生したメッセージに何が起こったかです。その場合、メッセージにはタイムスタンプが組み込まれていません(ブローカーとプロデューサに変更が加えられていないため不可能です)。そのようなメッセージごとに例外をスローする必要がありますか?または、最初にメッセージの負のタイムスタンプを検出したときに、ストリームの進行を止めないようにしますか?この状況を処理して、負のタイムスタンプを検出した後でもアプリケーションがストリームを進行できるようにする方法はありますか?私のアプリはKafka Streamsライブラリのバージョン0.10.0.1-cp1を使用しています。

注:私は簡単に各メッセージの否定的なタイムスタンプをチェックできるカスタムタイムスタンプ抽出プログラムを置くことができますが、それは私のアプリケーションにとって不必要なオーバヘッドです。私が理解したいのは、負のタイムスタンプを持つメッセージを検出した後にストリームが進まなかった理由です。

答えて

3

カフカストリームアプリケーションは、タイムスタンプが異なるパーティションのレコードの処理順序を決定するために使用されるため、タイムスタンプ抽出プログラムから返されたタイムスタンプが有効かどうかをチェックし、すべてのパーティションは時間ベースの整合方法で消費されます。

負のタイムスタンプが検出された場合、アプリケーション(または実際には対応するスレッド)が終了します。残念ながら、現在、このような例外から回復することは不可能であり、アプリケーションを再起動する必要があります。関連項目:Confluent FAQs:http://docs.confluent.io/3.1.1/streams/faq.html#invalid-timestamp-exception

アプリケーションが終了して再起動すると、中断したところで処理を再開します。残念ながら、Kafka 0.10.0.1では、バグ(次のリリースで修正される0.10.2)があり、失敗した場合、不正なオフセットがコミットされ、アプリケーションがいくつかのレコードを「ステップオーバー」する可能性があります。私はこれがあなたのケースで起こったと仮定し、無効なタイムスタンプを持つレコードがいくつかしかない場合、それらのレコードはスキップされ、再起動後にアプリケーションを再開できるようになっている可能性があります。この動作は実際にはバグです。バグがなければ、Kafka Streamは無効なタイムスタンプを持つレコードを何度も処理しようとし、有効なタイムスタンプを返すことで問題を解決するカスタムタイムスタンプ抽出機能を提供するまで毎回失敗します。

どのようにそれを修正する:

正しい修正が無効(すなわち、負の)タイムスタンプを返すことはありませんん、カスタムのタイムスタンプ抽出を提供するだろう。

なぜ無効なタイムスタンプがあるのか​​説明がありません...これは非常に奇妙なことです。プロデューサの設定を調査して、プロデューサがタイムスタンプを入れて無効にする可能性がある場合はこれは起こりそうもない - 私は、問題の根本的な原因が何であるか他には考えていない)。

さらに発言:次のリリース(0.10.2)で

、無効なタイムスタンプを処理することは簡素化され、カフカストリームは異なり、無効なタイムスタンプを持つレコードを扱うより内蔵のタイムスタンプ抽出を提供しています。たとえば、これにより、エラー(現在の動作)を発生させるのではなく、タイムスタンプが無効なレコードを自動スキップすることができます。詳細については、KIP-93を参照してください。https://cwiki.apache.org/confluence/display/KAFKA/KIP-93%3A+Improve+invalid+timestamp+handling+in+Kafka+Streams

+0

@ matthias-j-sax、このような詳細な返信をありがとうございます。一時的な回避策として、カスタムタイムスタンプ抽出プログラムを実装しています。 – Samy

関連する問題