2017-10-17 7 views
0

FlinkでMarkov Modelを実装したいと思います。 まず、カフカからデータを読みます。どのように私はフリンクでトライグラムマルコフモデルを実装することができますか?Flink Markovモデルの実装

答えて

0

私は最後にマルコフモデルを実装します。このコードは、遷移行列のみを計算します。

private static class MarkovModel implements AllWindowFunction<Tuple2<String,String>, Tuple3<Long, Long,  HashMap<String,Integer>>, TimeWindow>{ 
    @Override 
    public void apply(TimeWindow window, Iterable<Tuple2<String, String>> requests, Collector<Tuple3<Long, Long, HashMap<String, Integer>>> out) throws Exception { 

     HashMap<String,Integer> map = new HashMap<>(); 

     String first = ""; 
     String second = ""; 
     String third = ""; 

     for (Tuple2<String, String> request : requests) { 
      if(first == ""){ 
       third = second; 
       second = first; 
       first = request.f1; 
      }else if(second == ""){ 
       third = second; 
       second = request.f1; 
      }else if(third == ""){ 
       third = request.f1; 
      }else{ 
       third = second; 
       second = first; 
       first = request.f1; 
      } 

      if(third != ""){ 
       int count = map.getOrDefault(first + second + third,0); 
       map.put(first + second + third,count + 1); 
      } 
     } 


     System.out.println(map); 
     System.out.println(map.values().stream().mapToDouble(x->x).sum()); 
     out.collect(new Tuple3(window.getStart(), window.getEnd(), map)); 
    } 
} 
関連する問題