2017-12-18 16 views
0

現在、私はFlinkプロジェクトを行っています。このプロジェクトの主なアイデアは、JSON(ネットワークログ)のデータストリームを読み込んで相互に関連させ、異なるJSONの情報を組み合わせた新しいJSONを生成することです。flink apply関数on timeWindow

この時点で、JSONを読み込み、KeyedStreamを生成し(ログを生成するマシンに基づいて)、5秒間のウィンドウストリームを生成することができました。

次に実行する手順は、ウィンドウに適用機能を使用し、各JSONの情報を結合することです。私はそれをやる方法がちょっと分かりません。

私は現在持っているコードは、次のいずれか:

DataStream<Tuple2<String,JSONObject>> MetaAlert = events 
       .flatMap(new JSONParser()) 
       .keyBy(0) 
       .timeWindow(Time.seconds(5)) 
       .apply(new generateMetaAlert()); 




public static class generateMetaAlert implements WindowFunction<Tuple2<String,JSONObject>, Tuple2<String,JSONObject>, String, Window> { 

     @Override 
     public void apply(String arg0, Window arg1, Iterable<Tuple2<String, JSONObject>> arg2, 
       Collector<Tuple2<String, JSONObject>> arg3) throws Exception { 


     } 

.apply(新しいgenerateMetaAlert())の部分は、次のエラーで訴えている:

方法が適用される(窓関数、R、タプル、TimeWindow>)型ではWindowedStream、タプル、TimeWindow>は引数には適用されません(MetaAlertGenerator.generateMetaAlert)

1は、私が作っていることの異なる他のコード構造の提案?あなたは(匿名クラスを使用せずに)keyBy機能を適用すると

はコンパイラが判断できないため、カスタムWindowFunction(第三分野)におけるキーの種類がTupleする必要がありますあなたの助け

+0

私はこれがdupliだと思います[この質問](https://stackoverflow.com/questions/47033981/probleme-with-apply-function-windowfunction-in-flink)のケイト。答えがあなたの問題を解決する場合は、チェックして終了してください。 –

答えて

0

のために事前にありがとうあなたのキーのタイプ。このコードは、エラーなし(私はダミーのコードで空白を埋めるためにしようとした考慮に入れる)をコンパイルします。

public class Test { 

    public Test() { 

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
     DataStream<String> events = env.readTextFile("datastream.log"); 

     DataStream<Tuple2<String, JSONObject>> MetaAlert 
       = events 
       .flatMap(new JSONParser()) 
       .keyBy(0) 
       .timeWindow(Time.seconds(5)) 
       .apply(new GenerateMetaAlert()); 

    } 

    public class JSONObject { 
    } 

    public class JSONParser implements FlatMapFunction<String, Tuple2<String, JSONObject>> { 
     @Override 
     public void flatMap(String s, Collector<Tuple2<String, JSONObject>> collector) throws Exception { 

     } 
    } 

    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow> { 
     @Override 
     public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception { 

     } 
    } 

} 

しかし、最も簡単な方法は、あなたがStringタイプを保つことができるように、匿名クラスを使用することです。

DataStream<Tuple2<String, JSONObject>> MetaAlert 
     = events 
     .flatMap(new JSONParser()) 
     .keyBy(0) 
     .timeWindow(Time.seconds(5)) 
     .apply(new WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow>() { 
      @Override 
      public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception { 
       // Your code here 
      } 
     }); 

クラスを維持したいが、あなたはまた、それがあるように、キーあなたのタイプを保持したい場合は最後に、あなたがKeySelectorを実装することができます

public class Test { 

    public Test() { 

     DataStream<Tuple2<String, JSONObject>> MetaAlert 
       = events 
       .flatMap(new JSONParser()) 
       .keyBy(new KeySelector<Tuple2<String,JSONObject>, String>() { 
        @Override 
        public String getKey(Tuple2<String, JSONObject> json) throws Exception { 
         return json.f0; 
        } 
       }) 
       .timeWindow(Time.seconds(5)) 
       .apply(new GenerateMetaAlert()); 
    } 

    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> { 
     @Override 
     public void apply(String key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception { 

     } 
    } 

} 
+0

ありがとうございました! –

関連する問題