2

私は毎分スパークストリーミング(Kafkaからの読書)を使用していくつかのメトリックを集約して見つけようとしています。その分のデータを集計することができます。どのように私は今日のバケツを持つことができ、その日のすべての分のすべての集計値を合計することができますか?スパークストリーミングのさまざまなマイクロバッチからの集計データ

私はデータフレームを持っており、私はこれに似た何かをやっています。

sampleDF = spark.sql("select userId,sum(likes) as total from likes_dataset group by userId order by userId") 

答えて

1

あなたは

サンプルコード

import spark.implicits._ 

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String } 

    val windowedCounts = words 
     .withWatermark("timestamp", "10 minutes") 
     .groupBy(
      window($"timestamp", "10 minutes", "5 minutes"), 
      $"word") 
     .count() 
+1

ありがとうございました。私はそれをやってみた。 Sparkは以前のマイクロバッチ値を保持していません。マイクロバッチ間隔が60秒で、10分のウィンドウを作成しようとすると、12:01:00の値が12:02:00の値で集計されません。 12:02:00の場合は、最近受信したデータの集計のみを検索しています。 10分ごとに集計データを保存するにはどうすればよいですか? – Passionate

+1

カフカストリームからデータを取得する主な機能があります。そして、それは各RDDのための関数を呼び出します。この関数の中で、私は値を集計します。しかし、すべてのRDDに対して、集計値はリセットされます。以前は集計された値は保持されませんでした。私は、このスパークセッションの寿命のためのグローバルな集約データフレームをどのように定義し、すべての集計データを組み合わせることができるのか分かりません。誰かが助けてくれますか? – Passionate

1

構造化ストリーミングプログラミングから「Watermarking」機能を利用することができ、私は何が起こっている考え出しました。 Sparkのステートフルなストリーミングについて知る必要があり、それが私を助けました。

私がしなければならなかったすべては、私はマイクロバッチの新しい集計データと古い集計データをマージする方法を言うために、このupdateTotalCount関数を書かなければならなかった

running_counts = countStream.updateStateByKey(updateTotalCount, initialRDD=initialStateRDD) 

です。私の場合、更新機能は次のようになります。

def updateTotalCount(currentCount, countState): 
    if countState is None: 
     countState = 0 
    return sum(currentCount) + countState 
+0

あなたの答えをありがとう。 'countState'についてもう少し説明してください。何がありますか? 'countStream'にはRDDのストリームがありますか? 'initialRDD'と' initialStateRDD'はどうやって取得できますか? – Jordon

関連する問題