私はspark構造化ストリーミングクエリを停止する必要があります。まずは、カフカで処理できるデータがあるかどうかチェックします。はいの場合は処理してからクエリを停止します。Kafkaトピックでレコードが利用できない場合、StreamingQueryStatus.isDataAvailableがオンになるのはなぜですか?
if (query.status.isDataAvailable) {
query.processAllAvailable()
}
query.stop()
私はJacek Laskowskiの提案を受けて、実装を変更しました。 私は、ストリーミングクエリのリストを持って、クエリのすべての停止後
streamingQueryList.foreach{query =>
query.stop()
}
、それは次の例外スロー:目的をデバッグするため
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:222)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:621)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:179)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:164)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:164)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:164)
at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
at org.apache.spark.
を、私はから現在のトピックを知るために、クエリのrecentProgress
を印刷していますスパークは読書です。そして、私はコンソールコンシューマを使ってそのトピックをチェックしました。そして、kafkaにはデータはありませんが、query.status.isDataAvailable
が本当に印刷します。また、たとえ最近のProgressが空であっても、データの処理を待っています。そのため、無限に待っています。
お返事ありがとうございます。メインコードでprocessAllAvailableを使用しました。基本的に私の使用例では、sparkはkafkaからデータを読み込んでS3に書き込んでいます。そして途中で、実行中のストリームクエリを停止する呼び出しが来ます。だからまず、すべての利用可能なデータがS3に書き込まれていることを確認する必要があります。それが私がこのコードスニペットを使った理由です。同じ達成するためのより良いアプローチがある場合。私を教えてください。そして、どうすればqueryAwaitTerminationを使ってそれを行うことができますか? –
処理するデータがあるかどうかわかりません。 Sparkは、処理されるデータと実際に処理されるデータがあることをSparkが知っている間にこの遅延があるので、情報を提供しません。 Sparkは、データを処理するトリガが開始されたときに処理するためのデータがあることを認識しています。そのため、現在のトリガーが終了した直後にクエリを停止させることができるので、 'stop'で十分であるはずだから、Sparkが知っているすべてのデータは既に処理されていると思います。 –
ありがとうJacek、私はそれを試しました。今は仕事ですが、私は質問をしました。やめる。それはjava.lang.InterruptedExceptionを投げた。プロセスが完了するのを待っている場合、なぜこのエラーがスローされていますか? –