2017-09-11 3 views
0

にウィンドウストリーム内の各単語のカウンタを取得する私は、単語ごとにウィンドウ機能で単語の数を取得したい:私はこのコードを使用する場合どのようFLINK

object WindowWordCount { 
    def main(args: Array[String]) { 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val text = env.socketTextStream("localhost", 9999) 

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } 
     .map { (_, 1) } 
     .keyBy(0) 
     .timeWindow(Time.seconds(5)) 
     .sum(1) 

    counts.print 

    env.execute("Window Stream WordCount") 
    } 
} 

を私は出力を得ますこのような5 seconde(窓時間)後:

入力:

first input : hello 
seconde input : hello 
third input : word 
fifth input : hello 
sixth input : word 

出力

first output : hello : 3 | word : 2 

しかし、私はすべての単語のカウントを出力したいと思います。

ような

: 入力:

first input: hello 
seconde input:hello 
third input:word 
fifth input:hello 
sixth input:word 

出す:

first output: hello : 1 
seconde output:hello : 2 
third output:word : 1 
fifth output:hello : 3 
sixth output:word : 2 

私はこれをどのように行うことができますか?

答えて

0

Kafka Streaming APIのサンプルプログラムは、あなたが探しているものとまったく同じではありませんか? https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#example-program

object WindowWordCount { 
    def main(args: Array[String]) { 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val text = env.socketTextStream("localhost", 9999) 

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } 
     .map { (_, 1) } 
     .keyBy(0) 
     .timeWindow(Time.seconds(5)) 
     .sum(1) 

    counts.print 

    env.execute("Window Stream WordCount") 
    } 
} 
+0

の例では、すべての5秒を印刷して、私はイベントごとに印刷したいです。 – FlinkNoob

+0

少し明確にするために、(イベント時間ウィンドウとは対照的に)時間ウィンドウの処理を要求していますか?そしてウィンドウが一直線に並んでいるか(つまり、すべてのキーのカウンタを同時にゼロにクリアする5秒)? –

+0

はい、私は時間ウィンドウの処理を求めていますが、データをゼロにしたくありません。私はちょうど各単語の最後の5秒から同じ単語の数を数えたいと思う。 – FlinkNoob

関連する問題