Spark 2.1.1で実行中のストリーミングジョブがKafka 0.10をポーリングしています。 Spark KafkaUtilsクラスを使用してDStreamを作成していますが、保持ポリシーのためにトピックから経過したデータがあるまでは、すべて正常に動作しています。私の問題は、データがトピックから老化した場合に私の仕事を止めて、私のオフセットが範囲外であると言うエラーを受け取った場合に起こります。私はsparkのソースコードを見ることを含む多くの研究を行いました。この問題のコメントのようなコメントがたくさんあります:SPARK-19680 - 基本的にはデータを黙って失わないはずです - したがってauto.offset.resetはsparkによって無視されます。私の大きな疑問は、今私は何ができるのでしょうか?私の話題はスパークではポーリングされません。スタートアップ時にはオフセットの例外が発生します。オフセットをリセットする方法がわからないので、私の仕事はちょうど再び始まるでしょう。私は、チェックポイントを有効にしていないのは、これらのものがこの用途では信頼できないということです。私は、オフセットを管理するためのコードの多くを持っていましたが、すべてのコミットがある場合に火花が要求されたオフセットを無視することが表示されますので、私は現在、このようなオフセットを管理しています:私は私を変えてきた問題を回避するにはSpark Streaming from Kafkaトピックがストリームを再開するオプションなしで範囲外にスローされる
val stream = KafkaUtils.createDirectStream[String, T](
ssc,
PreferConsistent,
Subscribe[String, T](topics, kafkaParams))
stream.foreachRDD { (rdd, batchTime) =>
val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
Log.debug("processing new batch...")
val values = rdd.map(x => x.value())
val incomingFrame: Dataset[T] = SparkUtils.sparkSession.createDataset(values)(consumer.encoder()).persist
consumer.processDataset(incomingFrame, batchTime)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
}
ssc.start()
ssc.awaitTermination()
をグループIDは本当に不自由です。私はこれが予想される動作であることを知っていて、起こらないはずです。ストリームを再度実行する方法を知る必要があります。どんな助けもありがとう。
私は最初にこれを試みました。私は、KafkaUtilsクラスが、あなたがあまりにも無意味であるとみなしてそのパラメータを空白にしていると読んでいます: 17/10/06 15:03:55 WARN KafkaUtils:実行者のためにauto.offset.resetをnoneにオーバーライドします – absmiths