0

を複製onQueryProgress、私が見つかりました。私は1つのメッセージを送るなぜ結果が火花が

Query made progress: { 
    "id" : "e76a8789-738c-49f6-b7f4-d85356c28600", 
    "runId" : "d8ce0fad-db38-4566-9198-90169efeb2d8", 
    "name" : null, 
    "timestamp" : "2017-08-15T07:28:27.077Z", 
    "numInputRows" : 1, 
    "processedRowsPerSecond" : 0.3050640634533252, 
    "durationMs" : { 
    "addBatch" : 2452, 
    "getBatch" : 461, 
    "queryPlanning" : 276, 
    "triggerExecution" : 3278 
    }, 
    "stateOperators" : [ ], 
    "sources" : [ { 
    "description" : "KafkaSource[Subscribe[test1]]", 
    "startOffset" : { 
     "test1" : { 
     "0" : 19 
     } 
    }, 
    "endOffset" : { 
     "test1" : { 
     "0" : 20 
     } 
    }, 
    "numInputRows" : 1, 
    "processedRowsPerSecond" : 0.3050640634533252 
    } ], 
    "sink" : { 
    "description" : "[email protected]" 
    } 
} 
Query made progress: { 
    "id" : "a5b1f905-5575-43a7-afe9-dead0e4de2a7", 
    "runId" : "8caea640-8772-4aab-ab13-84c1e952fb77", 
    "name" : null, 
    "timestamp" : "2017-08-15T07:28:27.075Z", 
    "numInputRows" : 1, 
    "processedRowsPerSecond" : 0.272108843537415, 
    "durationMs" : { 
    "addBatch" : 2844, 
    "getBatch" : 445, 
    "queryPlanning" : 293, 
    "triggerExecution" : 3672 
    }, 
    "stateOperators" : [ ], 
    "sources" : [ { 
    "description" : "KafkaSource[Subscribe[test1]]", 
    "startOffset" : { 
     "test1" : { 
     "0" : 19 
     } 
    }, 
    "endOffset" : { 
     "test1" : { 
     "0" : 20 
     } 
    }, 
    "numInputRows" : 1, 
    "processedRowsPerSecond" : 0.272108843537415 
    } ], 
    "sink" : { 
    "description" : "[email protected]" 
    } 
} 

あるonQueryProgress

override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { 

     if(queryProgress.progress.numInputRows!=0) { 

      println("Query made progress: " + queryProgress.progress) 

     } 

上で重複し、それは二つの異なる結果を持っています。

ps: 1.私の主なプログラムの問題は、00:00-00:05,00:05-00:10のように、毎回5分ごとにcalデータを使用する必要があります。私の考えは、構造化されたストリーミングを使用して特定のデータをフィルタリングすることであり、フィルタリングはデータベースを格納することではなく、次にデータベースと構造化ストリーミングを読み込むことです。
データベースを読む時間を更新するバッチ。

答えて

0

これらの2つのイベントは、異なるクエリからのものです。 idrunIdが異なることがわかります。

+0

ああ、ありがとう、私は2つの書き込みストリームを持っています。特定のストリーミングにリスナーを追加する方法 – Aaron