0

私はこの全体のカフカ/スパーク事に新たなんです。私はSpark Streaming(PySpark)がKafkaプロデューサーからデータを取り込んでいます。それは1分で正常に実行され、その後常にkafka.common.OffsetOutOfRangeExceptionをスローします。 Kafkaのコンシューマはバージョン0.8です(PySparkの場合、0.10はサポートされていません)。 AWS Ubuntu 14.04に3人の作業者を持つマスターノードがあります。私はこれが関係しているかどうかは分かりませんが、Kafkaのログは比較的大きく(1〜10kb)、プロデューサ/ブローカ/コンシューマの設定を調整しました。データは、プロデューサーがおそらく生産していると思うものよりも遅いかもしれませんが(おそらくこれは問題の原因かもしれません)、細かい部分を通過しています。 Kafka OffsetOutOfRangeExceptionカフカ+スパークストリーミング:kafka.common.OffsetOutOfRangeException

しかし、私の滞留時間は一時間で、サイズは、各ノードのserver.propertiesで1ギガバイトで、そしてもっと重要なのは、スパークの時間に変化はありません。

同様の問題は、ここでの保持時間/サイズを増やすことで解決しました障害と設定された保持時間/サイズ。

調整のために、多分コンフィグストリーミングスパーク上の他の可能性はありますか?私がオンラインで見るすべての答えは、カフカのプロビジョニングと関係がありますが、私の場合には違いはありません。

EDIT 1:私が試した)生産者からの読み取り複数のストリームを有し、B)time.sleep(1.0)とプロデューサストリーム自体を遅くします。どちらも永続的な効果はありませんでした。

すなわち

n_secs = 1 
ssc = StreamingContext(sc, n_secs) 
kds = [KafkaUtils.createDirectStream(ssc, ['test-video'], { 
        'bootstrap.servers':'localhost:9092', 
        'group.id':'test-video-group', 
        'fetch.message.max.bytes':'15728640', 
        'auto.offset.reset':'largest'}) for _ in range(n_streams)] 

stream = ssc.union(*kds) 
+0

あなたが0.8、右に新しい消費者を使用しているように見えますか?私はzk接続の代わりにブートストラップサーバによってこれを推測しています。どのようにオフセットをコミットしますか? – dawsaw

+0

@dawsaw自動的にコミットされますが、週末にはSpark Streamingのバックプレッシャーの問題であると私は確信しています。 – thefourtheye

答えて

0

それが1Gは、各ブローカーでは十分ではありませんように、あなたのプロデューサーが速すぎて、あまりにも多くのメッセージを生成することは可能ですか? 1Gはすべての現実において非常に低いようです。 Spark Streamingがマイクロバッチで処理する必要があるオフセット範囲を決定し、オフセットに基づいてブローカからメッセージを取得しようとすると、サイズ制限によってメッセージが消えてしまいます。ブローカーのサイズを100Gのように大きくして、問題が解決するかどうか確認してください。

+0

私は、Spark Streamingが追いついていないメッセージを生成しているのはほぼ確実です。しかし、私はカフカのプロビジョニングを変える代わりに、私が生産しているものやスパークの設定を変更する必要があると思います。 – thefourtheye