2017-09-11 2 views
2

私は以下のコードで、各時間ウィンドウためカフカからアイテムの数をカウントにスパーク構造のストリーミングを使用しようとしている:(私は毎分に一度、ことが期待時間ウィンドウごとにアイテムを数えるには?

import java.text.SimpleDateFormat 
import java.util.Date 
import org.apache.spark.sql.ForeachWriter 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql.functions.window 

object Counter extends App { 
    val dateFormatter = new SimpleDateFormat("HH:mm:ss") 
    val spark = ... 
    import spark.implicits._ 

    val df = spark.readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers", ...) 
    .option("subscribe", ...) 
    .load() 

    val windowDuration = "5 minutes" 
    val counts = df 
    .select("value").as[Array[Byte]] 
    .map(decodeTimestampFromKafka).toDF("timestamp") 
    .select($"timestamp" cast "timestamp") 
    .withWatermark("timestamp", windowDuration) 
    .groupBy(window($"timestamp", windowDuration, "1 minute")) 
    .count() 
    .as[((Long, Long), Long)] 

    val writer = new ForeachWriter[((Long, Long), Long)] { 
    var partitionId: Long = _ 
    var version: Long = _ 

    def open(partitionId: Long, version: Long): Boolean = { 
     this.partitionId = partitionId 
     this.version = version 
     true 
    } 

    def process(record: ((Long, Long), Long)): Unit = { 
     val ((start, end), docs) = record 
     val startDate = dateFormatter.format(new Date(start)) 
     val endDate = dateFormatter.format(new Date(end)) 
     val now = dateFormatter.format(new Date) 
     println(s"$now:$this|$partitionId|$version: ($startDate, $endDate) $docs") 
    } 

    def close(errorOrNull: Throwable): Unit = {} 
    } 

    val query = counts 
    .repartition(1) 
    .writeStream 
    .outputMode("complete") 
    .foreach(writer) 
    .start() 

    query.awaitTermination() 

    def decodeTimestampFromKafka(bytes: Array[Byte]): Long = ... 
} 

スライド期間)、(唯一の集約キーはウィンドウであるため)を出力し、最後の5分間に(ウィンドウの継続時間)のアイテム数がとなります。 はしかし、それはこのサンプルのように複数のレコード分あたり2〜3回、出力:

... 
22:44:34|[email protected]|0|8: (22:43:20, 22:43:20) 383 
22:44:34|[email protected]|0|8: (22:43:18, 22:43:19) 435 
22:44:34|[email protected]|0|8: (22:42:33, 22:42:34) 395 
22:44:34|[email protected]|0|8: (22:43:14, 22:43:14) 435 
22:44:34|[email protected]|0|8: (22:43:09, 22:43:09) 437 
22:44:34|[email protected]|0|8: (22:43:19, 22:43:19) 411 
22:44:34|[email protected]|0|8: (22:43:07, 22:43:07) 400 
22:44:34|[email protected]|0|8: (22:43:17, 22:43:17) 392 
22:44:44|[email protected]|0|9: (22:43:37, 22:43:38) 420 
22:44:44|[email protected]|0|9: (22:43:25, 22:43:25) 395 
22:44:44|[email protected]|0|9: (22:43:22, 22:43:22) 416 
22:44:44|[email protected]|0|9: (22:43:00, 22:43:00) 438 
22:44:44|[email protected]|0|9: (22:43:41, 22:43:41) 426 
22:44:44|[email protected]|0|9: (22:44:13, 22:44:13) 132 
22:44:44|[email protected]|0|9: (22:44:02, 22:44:02) 128 
22:44:44|[email protected]|0|9: (22:44:09, 22:44:09) 120 
... 

は、「追加」するために、出力モードを変更する動作を変更するようだが、それでもこれまで私が期待したものから。

私の前提がうまくいかないのですか?上記のコードが与えられた場合、サンプル出力はどのように解釈または使用されるべきですか?

答えて

1

あなたは、計算されたウィンドウ(withWatermark)を計算するために最大5分の遅れたイベントを許可しています。handling late data and watermarkingを参照してください。

関連する問題