1

ウィンドウ期間を使用して構造化ストリーミング集約を実行したいと考えています。これに続くデータスキーマを考えると、目的は、ユーザーに基づいて最新のイベントによってフィルタリングすることです。次に、各ロケーションのイベントタイプの数を集計します。スパークストラクチャードストリーミング - 最新の集計での重複排除の方法

time location user type 
1  A   1  one 
2  A   1  two 
1  B   2  one 
2  B   2  one 
1  A   3  two 
1  A   4  one 

出力例:

​​

のようなもの次

val aggTypes = df 
    .select($"location", $"time", $"user", $"type") 
    .groupBy($"user") 
    .agg(max($"timestamp") as 'timestamp) 
    .select("*") 
    .withWatermark("timestamp", conf.kafka.watermark.toString + " seconds") 
    .groupBy(functions.window($"timestamp", DataConstant.t15min.toString + " seconds", DataConstant.t1min.toString + " seconds", $"location") 
    .agg(count(when($"type" === "one", $"type")) as 'countOne, count(when($"type" === "two", $"type" as 'countTwo))) 
    .drop($"window") 

ように構成されたストリーミングは、ストリーミングデータフレームでサポートされていない複数の集計と非時間ベースのウィンドウをサポートしていません。 /データセット。私はそれが1つのストリーミングクエリで所望の出力を達成することが可能かどうかはわかりません。

何か助けていただければ幸いです。

答えて

0

ステートレスアグリゲーションをしようとしているようです。 https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/KeyValueGroupedDataset.html#flatMapGroups(org.apache.spark.api.java.function.FlatMapGroupsFunction,%20org.apache.spark.sql.Encoder)

flatMapGroupsは、データセットの各グループに関数を適用する集約APIです。グループ化されたdataset.flatMapGroupsではシャッフルオーバーヘッドを増加させる部分集計はサポートされません。したがって、このAPIを使用するのは、メモリに収まる小さなバッチ集計だけです。また、reduce関数またはAggregatorを使用することをお勧めします。 https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/expressions/Aggregator.html

val count = words.groupByKey(x => x) 
      .flatMapGroups 
      { 
       case (x, iterator) ⇒ Iterator((x, iterator.length)) 
       }.toDF("x", "count")   


count.writeStream.format("console").outputMode(OutputMode.Append()) 
関連する問題