2016-04-18 7 views
4

私はApache Flinkでストリーミングデータの時間枠を作りたいと思っています。私のデータはいくぶん次のようになります。FlinkのString DataStreamでtimeWindow()を実行するには?

1> {52,"mokshda",84.85} 
2> {1,"kavita",26.16} 
2> {131,"nidhi",178.9} 
3> {2,"poorvi",22.97} 
4> {115,"saheba",110.41} 

20秒ごとに、私はマークの合計たいすべての行の(最後の列、例えばMokshdaのマークは84.85です。)。 timeWindow()関数はKeyedStreamで動作するため、このDataStreamをkeyBy()する必要があります。ロールナンバー(最初の列、例えばモクシュダの場合は52)でキー入力できます。

val windowedStream = stockStream 
         .keyBy(0) 
         .timeWindow(Time.seconds(20)) 
         .sum(2) 

もちろん、Flinkは自分のデータをリストとして読み込んでいません。これは、文字列としてそれを読んでいるので、私は次の例外を取得:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: String 

私は文字列データにtimeWindowを行うことができ、またはどのように私はタプルにこのデータを変換できますか?

答えて

5

あなたは、その成分に文字列を解析してデータ型を変換し、Tupleを発するMapFunction[String, (Int, String, Double)]を使用してDataStream[(Int, String, Double)]DataStream[String]を変換することができます。

timeWindowAllをキーなしのデータストリームに適用することもできます。しかし、セマンティクスはもちろん異なり、AllWindowは並列処理でしか処理できません1

関連する問題