2017-07-07 9 views
1

InputDStreamを使用して(並列化されていない)ストリーミングを実行するために、Apache Sparkの長時間実行されるストリーミングジョブを設定しています。Apache Sparkストリーミング - タイムアウトの長時間実行のバッチ

私が達成しようとしているのは、キューのバッチが(ユーザー定義のタイムアウトに基づいて)時間がかかり過ぎると、バッチをスキップして完全に放棄できるようにしたいということです。実行。

spark APIまたはオンラインでこの問題の解決策を見つけることができませんでした - StreamingContext awaitTerminationOrTimeoutを調べましたが、これはタイムアウト時にStreamingContext全体を強制終了しますが、私がしたいのはスキップ/キルです現在のバッチ。

また、mapWithStateを使用すると考えられましたが、これはこの使用例には当てはまりません。最後に、私はStreamingListenerを設定し、バッチが開始されたときにタイマーを開始し、一定のタイムアウトしきい値に達したときにバッチを停止/スキップ/終了させることを検討していましたが、バッチを終了する方法はまだありません。

ありがとうございます!

+0

なぜ、mapWithStateがここに当てはまりませんか?バッチ上でセッションを作成するのと同じですか?このようなもの? – user1452132

+0

さて、私はペアのDStreamsで作業していません。理論的には、もし私がいたら、私はAPIについても不明であった - 私はキーのタイムアウトを設定すると、これは私が望む(バッチ内の仕事をスキップする)ことでしょうか? –

+0

これは実現するのが難しいかもしれません。リスナーは、ジョブの実行時間を監視する手段を提供しますが、取り消すことは難しいと思われます。私は(ジョブスケジューラ)[https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L47]を調べました。バッチの結果を却下するAPIフックを見ることができません。もしあなたが本当に必要としているのであれば、デッドラインキャンセルポリシーを実装するためにコードにパッチを当てる必要があると思います。 – maasg

答えて

0

私はyelpからいくつかのドキュメントを見ましたが、私はそれを自分でやっていません。イベントが最初に見られていると状態は、それが

def update_function(new_events, current_state): 
    if current_state is None: 
     current_state = init_state() 
     attach_expire_datetime(new_events) 
     ...... 
    if is_expired(current_state): 
     return None //current_state drops? 
    if new_events: 
     apply_business_logic(new_events, current_state) 

を満了した場合

  • これはそのように見える状態をドロップ初期化されるときUpdateStateByKey(update_func)またはmapWithState(stateSpec)

    1. を使用して

      はタイムアウトを添付します構造化ストリーミングウォーターマークは、タイムアウトしたときにイベントをドロップします。これがジョイに適用される場合bs/stagesタイムアウトの低下。

  • 関連する問題