は、現在のバッチcommitAsync
API経由
ないの終わりにオフセットをコミットする任意の回避策があります。
override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
val untilOffsets = clamp(latestOffsets())
// Create KafkaRDD and other irrelevant code
currentOffsets = untilOffsets
commitAll()
Some(rdd)
}
キューが
commitAsync
によっていっぱいになっ
commitAll
だけポーリング
:
protected def commitAll(): Unit = {
val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
var osr = commitQueue.poll()
while (null != osr) {
val tp = osr.topicPartition
val x = m.get(tp)
val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
m.put(tp, new OffsetAndMetadata(offset))
osr = commitQueue.poll()
}
if (!m.isEmpty) {
consumer.commitAsync(m, commitCallback.get)
}
}
したがってどのようなメソッド呼び出しがないと、キューアップコミットするオフセット、その後、DirectKafkaInputDStream.compute
中に非同期コミットんです残念ながら、オフセットをトランザクションとしてコミットする場合は、独自のストアに別々に格納し、カフカのオフセットコミットトラッキングを使用しないでください。
あなたはトランザクションを記述しました。トランザクション処理について気にしなければ、他にも可能性がありますか? – Natalia
@Natalia少なくともセマンティクスを処理しても構わない場合は、次の反復でオフセットをコミットすることは問題ではありませんか? –
問題は監視中です。私は処理されたオフセット(遅れは決して0に達することはありません)の遅れを持っています。 – Natalia