2017-10-06 4 views
1

Spark 2.1.1で実行中のストリーミングジョブがKafka 0.10をポーリングしています。 Spark KafkaUtilsクラスを使用してDStreamを作成していますが、保持ポリシーのためにトピックから経過したデータがあるまでは、すべて正常に動作しています。私の問題は、データがトピックから老化した場合に私の仕事を止めて、私のオフセットが範囲外であると言うエラーを受け取った場合に起こります。私はsparkのソースコードを見ることを含む多くの研究を行いました。この問題のコメントのようなコメントがたくさんあります:SPARK-19680 - 基本的にはデータを黙って失わないはずです - したがってauto.offset.resetはsparkによって無視されます。私の大きな疑問は、今私は何ができるのでしょうか?私の話題はスパークではポーリングされません。スタートアップ時にはオフセットの例外が発生します。オフセットをリセットする方法がわからないので、私の仕事はちょうど再び始まるでしょう。私は、チェックポイントを有効にしていないのは、これらのものがこの用途では信頼できないということです。私は、オフセットを管理するためのコードの多くを持っていましたが、すべてのコミットがある場合に火花が要求されたオフセットを無視することが表示されますので、私は現在、このようなオフセットを管理しています:私は私を変えてきた問題を回避するにはSpark Streaming from Kafkaトピックがストリームを再開するオプションなしで範囲外にスローされる

val stream = KafkaUtils.createDirectStream[String, T](
    ssc, 
    PreferConsistent, 
    Subscribe[String, T](topics, kafkaParams)) 

stream.foreachRDD { (rdd, batchTime) => 
    val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
    Log.debug("processing new batch...") 

    val values = rdd.map(x => x.value()) 
    val incomingFrame: Dataset[T] = SparkUtils.sparkSession.createDataset(values)(consumer.encoder()).persist 

    consumer.processDataset(incomingFrame, batchTime) 
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets) 
} 
ssc.start() 
ssc.awaitTermination() 

をグループIDは本当に不自由です。私はこれが予想される動作であることを知っていて、起こらないはずです。ストリームを再度実行する方法を知る必要があります。どんな助けもありがとう。

答えて

0

早い

auto.offset.reset =最新

それとも

auto.offset.reset =早い

をお試しください:自動的早いにオフセットリセット

をオフセット最新:オフセットを最新のオフセットに自動的にリセットする

なし:コンシューマのグループに以前のオフセットが見つからない場合、コンシューマに例外をスローする。

他のもの:コンシューマに例外をスローする。

最小値と最大値に対応するオフセット値に影響するもう1つの点は、ログ保持ポリシーです。保持時間が1時間に設定されているトピックがあるとします。 10個のメッセージを作成し、1時間後にさらに10個のメッセージを投稿します。最大のオフセットは同じままですが、最小のものは0にできません。なぜなら、カフカはこれらのメッセージを既に削除しているため、最小のオフセットは10になるからです。

+0

私は最初にこれを試みました。私は、KafkaUtilsクラスが、あなたがあまりにも無意味であるとみなしてそのパラメータを空白にしていると読んでいます: 17/10/06 15:03:55 WARN KafkaUtils:実行者のためにauto.offset.resetをnoneにオーバーライドします – absmiths

関連する問題