2017-02-15 15 views
1

複数のコンシューマに同じチェックポイントの場所を使用することはできますか?私たちはダイレクトストリーミングのアプローチを使用しています。複数のコンシューマに同じチェックポイントの場所を使用する - スパーク直接ストリーミング

コードサンプル:私は、複数の消費者のために同じチェックポイントの場所を使用する場合

ssc.checkpoint(checkpointDirectory) 
val dstream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet1).map(_._2) 
val dstream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet2).map(_._2) 

があり、いずれの問題になるだろうか?我々は2つの異なる消費者を持っている理由は、両方が異なるデータと異なるトピックです。

チェックポイントの場所では、トピックのオフセットごとに個別のディレクトリを作成する予定ですか?

答えて

1

チェックポイントの場所では、トピックのオフセットごとに個別のディレクトリ を作成する予定ですか?

これは問題があります。 PREFIXcheckpoint-ある

new Path(checkpointDir, PREFIX + checkpointTime.milliseconds) 

:あなたがチェックポイント用のディレクトリを作成すると、データは次の形式で保存されます。

-rw-r--r-- 1 spark spark 9434 Feb 14 17:59 checkpoint-1487095188000 
-rw-r--r-- 1 spark spark 9456 Feb 14 17:59 checkpoint-1487095188000.bk 
-rw-r--r-- 1 spark spark 9423 Feb 14 17:59 checkpoint-1487095192000 
-rw-r--r-- 1 spark spark 9443 Feb 14 17:59 checkpoint-1487095192000.bk 
-rw-r--r-- 1 spark spark 9426 Feb 14 17:59 checkpoint-1487095196000 
-rw-r--r-- 1 spark spark 9446 Feb 14 17:59 checkpoint-1487095196000.bk 

ここで私のバッチ間隔は4000ミリ秒です。

ストリームは共通のStreamingContextで動作するため、両方が同じバッチ間隔を使用しており、互いに他のファイルをオーバーライドしようとします。さらに悪いことに、間違ってもう一方のストリームデータを回復しようとする可能性があります。

ストリームごとに個別のチェックポイントディレクトリを使用します。

複数のトピックを持つ単一の消費者がいる場合はどうなりますか?同じStreamingContextは、すべてのトピックに関連するすべてのチェックポイントを担当している、そしてそれが安全であるよりも、あなたは、単一の消費者を持っている場合は

、それをやっても問題はないはず。

+0

ありがとうございます。複数のトピックを持つ単一のコンシューマをお持ちの場合はどうなりますか?複数のトピックとチェックポイントを持つ消費者がいる別の仕事があるので、これについてもあなたの質問を更新してください。 – Shankar

+0

@Shankarが更新されました。 –

+0

あなたの詳細な回答をありがとう、私は多くのことをあなたの答えからストリーミングに関連して学んだ... – Shankar

関連する問題