2017-11-13 10 views
1

私はspark構造化ストリーミングクエリを停止する必要があります。まずは、カフカで処理できるデータがあるかどうかチェックします。はいの場合は処理してからクエリを停止します。Kafkaトピックでレコードが利用できない場合、StreamingQueryStatus.isDataAvailableがオンになるのはなぜですか?

if (query.status.isDataAvailable) { 
        query.processAllAvailable() 
        } 
query.stop() 

私はJacek Laskowskiの提案を受けて、実装を変更しました。 私は、ストリーミングクエリのリストを持って、クエリのすべての停止後

streamingQueryList.foreach{query => 
query.stop() 
} 

、それは次の例外スロー:目的をデバッグするため

java.lang.InterruptedException 
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) 
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) 
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) 
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) 
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153) 
    at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:222) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:621) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:179) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:164) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:164) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:164) 
    at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123) 
    at org.apache.spark. 

を、私はから現在のトピックを知るために、クエリのrecentProgressを印刷していますスパークは読書です。そして、私はコンソールコンシューマを使ってそのトピックをチェックしました。そして、kafkaにはデータはありませんが、query.status.isDataAvailableが本当に印刷します。また、たとえ最近のProgressが空であっても、データの処理を待っています。そのため、無限に待っています。

答えて

1

なぜisDataAvailabletrueになるのか分かりませんが、processAllAvailableはブロックif (query.status.isDataAvailable)で狙っているソリューションを提供します。

processAllAvailableブロックのソースで利用可能なすべてのデータが処理され、シンクにコミットされるまで。

これはあなたの使用例です。ではありませんか?

ただし、"このメソッドはテスト用です。"として"このメソッドは永遠にブロックするかもしれません"processAllAvailableのscaladocを引用)。

メインスレッドをブロックして(ストリームアプリケーションが起動しているように)、stopクエリ(およびそのバックグラウンドスレッド)を別の「監視」スレッドから都合のよい状態に保つために、代わりにawaitTerminationを使用する必要があります。

awaitTermination()このクエリの終結待ち、query.stop()によってまたは例外のいずれかによって。クエリが例外で終了した場合、例外がスローされます。

+0

お返事ありがとうございます。メインコードでprocessAllAvailableを使用しました。基本的に私の使用例では、sparkはkafkaからデータを読み込んでS3に書き込んでいます。そして途中で、実行中のストリームクエリを停止する呼び出しが来ます。だからまず、すべての利用可能なデータがS3に書き込まれていることを確認する必要があります。それが私がこのコードスニペットを使った理由です。同じ達成するためのより良いアプローチがある場合。私を教えてください。そして、どうすればqueryAwaitTerminationを使ってそれを行うことができますか? –

+0

処理するデータがあるかどうかわかりません。 Sparkは、処理されるデータと実際に処理されるデータがあることをSparkが知っている間にこの遅延があるので、情報を提供しません。 Sparkは、データを処理するトリガが開始されたときに処理するためのデータがあることを認識しています。そのため、現在のトリガーが終了した直後にクエリを停止させることができるので、 'stop'で十分であるはずだから、Sparkが知っているすべてのデータは既に処理されていると思います。 –

+0

ありがとうJacek、私はそれを試しました。今は仕事ですが、私は質問をしました。やめる。それはjava.lang.InterruptedExceptionを投げた。プロセスが完了するのを待っている場合、なぜこのエラーがスローされていますか? –