2017-12-27 7 views
1

バージョン:スパーク:バッチの終わりにカフカオフセットをコミット

  • スパーク2.2
  • カフカ0.11

Iを使用する必要がカフカのオフセットをコミットするdocumentationに従って:

stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

結果として、オフセットはsta次のバッチのrt。それは「一定の」遅れを引き起こす。

現在のバッチの最後にオフセットをコミットする(ただし、引き続き同じkafkaグループをオフセットに使用する)回避策はありますか?ラグモニタリングの

例: enter image description here

答えて

1

は、現在のバッチcommitAsync API経由

ないの終わりにオフセットをコミットする任意の回避策があります。

override def compute(validTime: Time): Option[KafkaRDD[K, V]] = { 
    val untilOffsets = clamp(latestOffsets()) 

    // Create KafkaRDD and other irrelevant code 

    currentOffsets = untilOffsets 
    commitAll() 
    Some(rdd) 
} 
キューが commitAsyncによっていっぱいになっ commitAllだけポーリング

protected def commitAll(): Unit = { 
    val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]() 
    var osr = commitQueue.poll() 
    while (null != osr) { 
    val tp = osr.topicPartition 
    val x = m.get(tp) 
    val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) } 
    m.put(tp, new OffsetAndMetadata(offset)) 
    osr = commitQueue.poll() 
    } 
    if (!m.isEmpty) { 
    consumer.commitAsync(m, commitCallback.get) 
    } 
} 

したがってどのようなメソッド呼び出しがないと、キューアップコミットするオフセット、その後、DirectKafkaInputDStream.compute中に非同期コミットんです残念ながら、オフセットをトランザクションとしてコミットする場合は、独自のストアに別々に格納し、カフカのオフセットコミットトラッキングを使用しないでください。

+0

あなたはトランザクションを記述しました。トランザクション処理について気にしなければ、他にも可能性がありますか? – Natalia

+0

@Natalia少なくともセマンティクスを処理しても構わない場合は、次の反復でオフセットをコミットすることは問題ではありませんか? –

+0

問題は監視中です。私は処理されたオフセット(遅れは決して0に達することはありません)の遅れを持っています。 – Natalia

関連する問題