私はFlinkが新しく、DataSet APIを使用しています。最後の段階として処理の全体の束の後、私は値の1つをその最大値で割ることによって正規化する必要があります。だから、私は.max()
演算子を使ってmaxをとり、結果をコンストラクタの引数としてMapFunctionに渡しています。Flinkはデータフローを2回実行します
これは機能しますが、すべての処理が2回実行されます。最大値を見つけるために1つのジョブが実行され、後で別のジョブが実行されて最終結果が作成されます(最初から実行を開始します)。
final List<Tuple6<...>> maxValues = result.max(2).collect();
assert maxValues.size() == 1;
result.map(new NormalizeAttributes(maxValues.get(0))).writeAsCsv(...)
@FunctionAnnotation.ForwardedFields("f0; f1; f3; f4; f5")
@FunctionAnnotation.ReadFields("f2")
private static class NormalizeAttributes implements MapFunction<Tuple6<...>, Tuple6<...>> {
private final Tuple6<...> maxValues;
public NormalizeAttributes(Tuple6<...> maxValues) {
this.maxValues = maxValues;
}
@Override
public Tuple6<...> map(Tuple6<...> value) throws Exception {
value.f2 /= maxValues.f2;
return value;
}
}
ありがとうございました! ;) – kaser