2017-08-10 7 views
6

ストリームを2つの異なるウィンドウで集約し、コンソールに出力しようとしています。ただし、最初のストリーミングクエリのみが印刷されています。 tenSecsQはコンソールには印刷されません。スパークストラクチャードストリーミングで別々のストリーミングクエリを実行する

SparkSession spark = SparkSession 
    .builder() 
    .appName("JavaStructuredNetworkWordCountWindowed") 
    .config("spark.master", "local[*]") 
    .getOrCreate(); 

Dataset<Row> lines = spark 
    .readStream() 
    .format("socket") 
    .option("host", host) 
    .option("port", port) 
    .option("includeTimestamp", true) 
    .load(); 

Dataset<Row> words = lines 
    .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())) 
    .toDF("word", "timestamp"); 

// 5 second window 
Dataset<Row> fiveSecs = words 
    .groupBy(
     functions.window(words.col("timestamp"), "5 seconds"), 
     words.col("word") 
    ).count().orderBy("window"); 

// 10 second window 
Dataset<Row> tenSecs = words 
    .groupBy(
      functions.window(words.col("timestamp"), "10 seconds"), 
      words.col("word") 
    ).count().orderBy("window"); 

トリガーストリーミング5秒間および10秒間の集約ストリームのクエリ。 10秒間の出力は出力されません。コンソールには5秒しか表示されません

// Start writeStream() for 5s window 
StreamingQuery fiveSecQ = fiveSecs.writeStream() 
    .queryName("5_secs") 
    .outputMode("complete") 
    .format("console") 
    .option("truncate", "false") 
    .start(); 

// Start writeStream() for 10s window 
StreamingQuery tenSecsQ = tenSecs.writeStream() 
    .queryName("10_secs") 
    .outputMode("complete") 
    .format("console") 
    .option("truncate", "false") 
    .start(); 

tenSecsQ.awaitTermination(); 
+0

実際、ソケットストリームの仕組みはわかりませんが、私の場合、最初のSparkストリームはソケットストリームからすべてのデータを読み込み、残りのものは何も残っていないようです。 –

答えて

5

私はこの質問を調査しています。

概要:構造化ストリーミングの各クエリは、sourceのデータを消費します。ソケットソースは、定義された各クエリに対して新しい接続を作成します。この場合に見られる動作は、ncが最初の接続に入力データを配信するだけであるためです。

以降、接続されたソケットソースが開いている各接続に同じデータを確実に渡すことができない限り、ソケット接続で複数の集約を定義することはできません。


私はこの質問をSparkメーリングリストで議論しました。 Databricksの開発者Shixiong Zhuは答えました:

スパークはクエリごとに1つの接続を作成します。あなたが観察した振る舞いは、 "nc -lk"がどのように動作するかによるものです。 netstatを使用してtcp接続を確認すると、2つのクエリを開始するときに2つの接続が表示されます。ただし、 "nc"は入力を1つの接続にのみ転送します。

私は小さな実験定義することで、この動作を検証: まず、私は、各接続のオープンと2つのクエリを宣言し、基本的な構造化されたストリーミングジョブにランダムな単語を提供SimpleTCPWordServerを作成しました。それらの間の唯一の違いは、第二のクエリは、その出力を区別するために余分な一定の列を定義することです:

val lines = spark 
    .readStream 
    .format("socket") 
    .option("host", "localhost") 
    .option("port", "9999") 
    .option("includeTimestamp", true) 
    .load() 

val q1 = lines.writeStream 
    .outputMode("append") 
    .format("console") 
    .trigger(Trigger.ProcessingTime("5 seconds")) 
    .start() 

val q2 = lines.withColumn("foo", lit("foo")).writeStream 
    .outputMode("append") 
    .format("console") 
    .trigger(Trigger.ProcessingTime("7 seconds")) 
    .start() 

StructuredStreamingは1つのストリームだけを消費することになるならば、我々は両方のクエリで配信同じ言葉が表示されるはずです。各クエリが別々のストリームを消費する場合、各クエリによって異なる単語がレポートされます。

これは、観測された出力です:

------------------------------------------- 
Batch: 0 
------------------------------------------- 
+--------+-------------------+ 
| value|   timestamp| 
+--------+-------------------+ 
|champion|2017-08-14 13:54:51| 
+--------+-------------------+ 

+------+-------------------+---+ 
| value|   timestamp|foo| 
+------+-------------------+---+ 
|belong|2017-08-14 13:54:51|foo| 
+------+-------------------+---+ 

------------------------------------------- 
Batch: 1 
------------------------------------------- 
+-------+-------------------+---+ 
| value|   timestamp|foo| 
+-------+-------------------+---+ 
| agenda|2017-08-14 13:54:52|foo| 
|ceiling|2017-08-14 13:54:52|foo| 
| bear|2017-08-14 13:54:53|foo| 
+-------+-------------------+---+ 

------------------------------------------- 
Batch: 1 
------------------------------------------- 
+----------+-------------------+ 
|  value|   timestamp| 
+----------+-------------------+ 
| breath|2017-08-14 13:54:52| 
|anticipate|2017-08-14 13:54:52| 
| amazing|2017-08-14 13:54:52| 
| bottle|2017-08-14 13:54:53| 
| calculate|2017-08-14 13:54:53| 
|  asset|2017-08-14 13:54:54| 
|  cell|2017-08-14 13:54:54| 
+----------+-------------------+ 

私たちは、各クエリの流れが異なっていることをはっきりと見ることができます。 socket sourceによって提供されるデータに対して複数の集約を定義することは不可能であると思われます。ただし、TCPバックエンドサーバーが開いている各接続にまったく同じデータを確実に提供できることが保証されている場合を除きます。

関連する問題