1
私はFlinkにとってかなり新しいです。私はマップ、グループ、およびJSONの入力を合計するこのコードを持っています。Flink keyBy groping issue
単語カウントの例に非常に似ています。
私は(vacant,1) (occupied,2)
を取得することが期待しかし、私はあなたのコードを実行(occupied,1) (vacant,1) (occupied,2)
public static void main(String[] args) throws Exception {
String s = "{\n" +
" \"Port_128\": \"occupied\",\n" +
" \"Port_129\": \"occupied\",\n" +
" \"Port_120\": \"vacant\"\n" +
"\n" +
"}";
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> in = env.fromElements(s);
SingleOutputStreamOperator<Tuple2<String, Integer>> t =
in.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>>
collector) throws Exception {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(s);
node.elements().forEachRemaining(v -> {
collector.collect(new Tuple2<>(v.textValue(), 1));
});
}
}).keyBy(0).sum(1);
t.print();
env.execute();
にはどうすればKeydAggregationから毎回それをブロックすることができ、どのようにからdifferantそれは '単語数の例? – MIkCode
データストリームプログラムを設定します。 Flinkの単語数の例は、DataSetプログラムです。 2つの動作は異なっています。ストリーム内のデータは、パイプラインを通じて受信されたときに処理されるため、通過する各要素で処理されるのはなぜですか。私は、wordcountの例のようなDataSetコードを使用しているあなたのコードの変更を使って答えを更新します。あなたがそれを実行する場合、あなたは期待どおりの出力を得るでしょう。 – Jicaar
今私はそれを得た 私の間違いは私がストリームを使用していたデータセット – MIkCode