2017-10-19 7 views
1

私はFlinkにとってかなり新しいです。私はマップ、グループ、およびJSONの入力を合計するこのコードを持っています。Flink keyBy groping issue

単語カウントの例に非常に似ています。

私は(vacant,1) (occupied,2)

を取得することが期待しかし、私はあなたのコードを実行(occupied,1) (vacant,1) (occupied,2)

public static void main(String[] args) throws Exception { 
     String s = "{\n" + 
       " \"Port_128\": \"occupied\",\n" + 
       " \"Port_129\": \"occupied\",\n" + 
       " \"Port_120\": \"vacant\"\n" + 
       "\n" + 
       "}"; 
     StreamExecutionEnvironment env = 
     StreamExecutionEnvironment.getExecutionEnvironment(); 
     DataStream<String> in = env.fromElements(s); 
     SingleOutputStreamOperator<Tuple2<String, Integer>> t = 
     in.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { 
      @Override 
      public void flatMap(String s, Collector<Tuple2<String, Integer>> 
      collector) throws Exception { 
       ObjectMapper mapper = new ObjectMapper(); 
       JsonNode node = mapper.readTree(s); 
       node.elements().forEachRemaining(v -> { 
        collector.collect(new Tuple2<>(v.textValue(), 1)); 
       }); 

      } 
     }).keyBy(0).sum(1); 

     t.print(); 
     env.execute(); 

答えて

1

を取得しています何らかの理由で、私は得る:

あなたよりもわずかに異なっている
10/19/2017 11:27:38 Keyed Aggregation -> Sink: Unnamed(1/1) switched to RUNNING 
(occupied,1) 
(occupied,2) 
(vacant,1) 
10/19/2017 11:28:03 Keyed Aggregation -> Sink: Unnamed(1/1) switched to FINISHED 

出力が重要です。その理由は、コードがデータを受け取るときに各キーの合計を出力しているからです。まず、最初に占有されている(1を出力しています)、次に2番目に出力します(このキー処理の合計が2になります)。その空きを別のキープロセスに送り、1を出力します。これは私にとって適切な出力のようです。以下のコメントパー

EDIT

、ここであなたに所望の出力を与えるコードは次のとおりです。

public static void main(String[] args) throws Exception { 
    String s = "{\n" + 
     " \"Port_128\": \"occupied\",\n" + 
     " \"Port_129\": \"occupied\",\n" + 
     " \"Port_120\": \"vacant\"\n" + 
     "\n" + 
     "}"; 
    ExecutionEnvironment env = 
     ExecutionEnvironment.getExecutionEnvironment(); 
    DataSet<String> in = env.fromElements(s); 
    AggregateOperator<Tuple2<String, Integer>> t = 
     in.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { 
     @Override 
     public void flatMap(String s, Collector<Tuple2<String, Integer>> 
      collector) throws Exception { 
      ObjectMapper mapper = new ObjectMapper(); 
      JsonNode node = mapper.readTree(s); 
      node.elements().forEachRemaining(v -> { 
      collector.collect(new Tuple2<>(v.textValue(), 1)); 
      }); 

     } 
     }).groupBy(0).sum(1); 

    t.print(); 
    env.execute(); 
} 
+0

にはどうすればKeydAggregationから毎回それをブロックすることができ、どのようにからdifferantそれは '単語数の例? – MIkCode

+1

データストリームプログラムを設定します。 Flinkの単語数の例は、DataSetプログラムです。 2つの動作は異なっています。ストリーム内のデータは、パイプラインを通じて受信されたときに処理されるため、通過する各要素で処理されるのはなぜですか。私は、wordcountの例のようなDataSetコードを使用しているあなたのコードの変更を使って答えを更新します。あなたがそれを実行する場合、あなたは期待どおりの出力を得るでしょう。 – Jicaar

+0

今私はそれを得た 私の間違いは私がストリームを使用していたデータセット – MIkCode

関連する問題