私はカフカのトピックから受信して処理が完了した後にストリームの処理を終了したいと思います。停止は、(awaitTerminationOrTimeout)のような特定の時間であってはなりません。トピックが消耗した後にsparkstreamingcontextを停止する方法はありますか? Dstream [T]をT値と比較して制御フローを指示する方法はありますか?stop sparkストリーミングコンテクストkafkaDirectStream
1
A
答えて
0
私はisEmpty
がtrueを返すと、ストリームが空の場合headOption
はになしてはならないことを約80%cetainです。
0
ストリームの読み込みを開始する前に、トピック内のすべてのパーティションの最新のオフセットを取得してから、受信したオフセットがいつ取得されたかを確認するのが最善の方法です。トピックのオフセットを取得する方法については、previous answerを参照してください。
フローはビーイングを終わる:
- あなたが早い を返す
OffsetRequest
を行い、各パーティションについてSimpleConsumer
- を作成し、各ブローカのトピックのパーティションやブローカー
- を取得最新のオフセット(前回の回答を参照)
- メッセージを読むときに、 の受信メッセージのオフセットを確認して、パーティションの既知の最後のオフセットである
- すべてのオフセットは、パーティションごとに受信したら、あなたの
OffsetRequest
あなたがkafkaStream.foreachRDDの
関連する問題
- 1. docker stop spark containerが終了しない
- 2. Stop setInterval
- 3. stop tableviewcontrollerスクロール
- 4. Android force stop apps
- 5. ProgressDialog cant stop
- 6. vba start stop timer?
- 7. stop START_STICKYサービス
- 8. ゴースレッド - STOP実行
- 9. C#Stop BackgroundWorker
- 10. java swing stop function
- 11. jQuery stopクリックスパム
- 12. dotnetnuke stop personabar sliding
- 13. Gradle stop building task
- 14. jQuery stop animationメソッド
- 15. android firebase stop ChildEventListener
- 16. Jquery .Stop()slowdown
- 17. install4j stop unixサービスランチャータイムアウト
- 18. Gps location listener stop
- 19. slideToggle issue with stop()
- 20. Start stop JMSメッセージリスナー
- 21. RxJava:CountDownLatch never stop
- 22. magento stop checkout
- 23. htaccess stop processing rules
- 24. jQueryのスライドメニュー - .stop()
- 25. Adobe Air stop();
- 26. jQuery Stop Animation
- 27. Scanner + System.in stop condition
- 28. bash expect *** after stop ***
- 29. Firebase stop listening onAuthStateChanged
- 30. Supervisord - NGINX stop OSError
rdd.isEmptyはtrueを返しません行われにreceieved 最新のと同じです。しかし、sparkStreamingContext.stopでアプリケーションを停止することはありません。 –
'future({()=> while(!kafkaMessageStream.isEmpty){ Thread.sleep(100)} sparkTreamingContext。 (真) }) ' これは100ミリ秒ごとにメッセージがあるかどうかをチェックし、メッセージがない場合は停止します。 –
偽陽性を得ることができます。たとえば、ブローカがバッチ間隔よりも長い時間忙しくなった場合などです。 –