私はこの全体のカフカ/スパーク事に新たなんです。私はSpark Streaming(PySpark)がKafkaプロデューサーからデータを取り込んでいます。それは1分で正常に実行され、その後常にkafka.common.OffsetOutOfRangeException
をスローします。 Kafkaのコンシューマはバージョン0.8です(PySparkの場合、0.10はサポートされていません)。 AWS Ubuntu 14.04に3人の作業者を持つマスターノードがあります。私はこれが関係しているかどうかは分かりませんが、Kafkaのログは比較的大きく(1〜10kb)、プロデューサ/ブローカ/コンシューマの設定を調整しました。データは、プロデューサーがおそらく生産していると思うものよりも遅いかもしれませんが(おそらくこれは問題の原因かもしれません)、細かい部分を通過しています。 Kafka OffsetOutOfRangeExceptionカフカ+スパークストリーミング:kafka.common.OffsetOutOfRangeException
しかし、私の滞留時間は一時間で、サイズは、各ノードのserver.properties
で1ギガバイトで、そしてもっと重要なのは、スパークの時間に変化はありません。
同様の問題は、ここでの保持時間/サイズを増やすことで解決しました障害と設定された保持時間/サイズ。
調整のために、多分コンフィグストリーミングスパーク上の他の可能性はありますか?私がオンラインで見るすべての答えは、カフカのプロビジョニングと関係がありますが、私の場合には違いはありません。
EDIT 1:私が試した)生産者からの読み取り複数のストリームを有し、B)time.sleep(1.0)
とプロデューサストリーム自体を遅くします。どちらも永続的な効果はありませんでした。
すなわち
n_secs = 1
ssc = StreamingContext(sc, n_secs)
kds = [KafkaUtils.createDirectStream(ssc, ['test-video'], {
'bootstrap.servers':'localhost:9092',
'group.id':'test-video-group',
'fetch.message.max.bytes':'15728640',
'auto.offset.reset':'largest'}) for _ in range(n_streams)]
stream = ssc.union(*kds)
あなたが0.8、右に新しい消費者を使用しているように見えますか?私はzk接続の代わりにブートストラップサーバによってこれを推測しています。どのようにオフセットをコミットしますか? – dawsaw
@dawsaw自動的にコミットされますが、週末にはSpark Streamingのバックプレッシャーの問題であると私は確信しています。 – thefourtheye