2016-10-11 9 views
1

糸クラスターでは、入力としてkafka directstream(例:バッチ時間は15秒)を使用し、入力msgを別々のuserIdsに集約したいとします。 私はupdateStateByKeyまたはmapWithStateのようなステートフルなストリーミングAPIを使用しますが、APIソースからはmapWithStateのデフォルトのチェックポイントの持続時間はbatchduration * 10(私の場合は150秒)で、カフカダイレクトストリームではパーティションオフセットがチェックポイントされていますすべてのバッチ(15秒)。実際に、すべてのdstreamは異なるチェックポイントの持続時間を設定できます。 Kafka Direct InputDstreamとステートフルストリーム変換を使用しているときのチェックポイントの再認識を理解するにはどうすればよいですか?

ストリーミングアプリケーションがクラッシュしたとき、私はそれを再起動します。カフカオフセットとステートストリームrddはチェックポイントで非同期です。この場合、データを失わないようにするにはどうすればよいですか?あるいは私はチェックポイントメカニズムを誤解していますか?

答えて

1

データを失わないようにするにはどうすればよいですか?このようmapWithStateupdateStateByKeyなど

ステートフルストリームは、それは、彼らがどのように動作するかの一部だから、あなたがチェックポイントディレクトリを提供するために、必要が、彼らはすべての中間がクラッシュ時の状態を復元できるようにする状態を格納します。

これ以外にも、チェインの各DStreamは自由にチェックポイントを要求することができます。質問には「他のストリームをチェックポイントする必要がありますか」という質問があります。

アプリケーションがクラッシュした場合、Sparkはチェックポイント内に格納されているすべての状態RDDを取得し、メモリに戻します。そのため、最後にスパークチェックポイントがチェックポイントされたときと同じ状態です。あなたのアプリケーションコードを変更すると、はチェックポイントから状態を回復できないので、それを削除する必要があります。つまり、バージョンのアップグレードが必要な場合は、バージョン管理を可能にする方法で手動で保存しない限り、以前に状態に保存されていたすべてのデータが失われます。

+0

私の場合は、アプリケーションのバージョンを更新する際に問題を処理しましたが、実際に知りたいことは次のとおりです。たとえば、開始時刻は0、バッチ時刻は15秒、mapWithStateチェックポイント間隔は150秒それで、ストリーミングアプリが200秒にクラッシュした場合、再コンパイルせずに再起動すると、ステートフルなストリームが150秒にリカバリされ、kafkaは195秒に時間を記録して回復します。私は165秒、180秒でカフカ入力データを失いますか?本当に、これを避けるにはどうすればいいですか?ありがとう〜 –

+0

@HengSuチェックポイントが150秒であった場合、復旧時には150から前にやり直します。つまり、すでに処理されているデータを処理する可能性がありますが、それがチェックポイント機能の仕組みです。カフカオフセットも、チェックポイントされたデータの中に保存されます。 –

+0

@ Yuval Itzchakovごめんなさい、私はちょっと混乱しました。カフカオフセットは、すべてのバッチ(15秒)でチェックポイントされるのは正しいですか?MapWithStateDStreamが150秒のデータを回復するとき、カフカオフセットも150秒だから私はデータを失うことはありませんでしたが、いくつかのバッチを再処理するだけですか? –

関連する問題