2016-08-23 7 views
1

私はFlinkが新しく、DataSet APIを使用しています。最後の段階として処理の全体の束の後、私は値の1つをその最大値で割ることによって正規化する必要があります。だから、私は.max()演算子を使ってmaxをとり、結果をコンストラクタの引数としてMapFunctionに渡しています。Flinkはデータフローを2回実行します

これは機能しますが、すべての処理が2回実行されます。最大値を見つけるために1つのジョブが実行され、後で別のジョブが実行されて最終結果が作成されます(最初から実行を開始します)。

final List<Tuple6<...>> maxValues = result.max(2).collect(); 
    assert maxValues.size() == 1; 
    result.map(new NormalizeAttributes(maxValues.get(0))).writeAsCsv(...) 

@FunctionAnnotation.ForwardedFields("f0; f1; f3; f4; f5") 
@FunctionAnnotation.ReadFields("f2") 
private static class NormalizeAttributes implements MapFunction<Tuple6<...>, Tuple6<...>> { 

    private final Tuple6<...> maxValues; 

    public NormalizeAttributes(Tuple6<...> maxValues) { 
     this.maxValues = maxValues; 
    } 

    @Override 
    public Tuple6<...> map(Tuple6<...> value) throws Exception { 
     value.f2 /= maxValues.f2; 
     return value; 
    } 
} 

答えて

0

collect()はすぐcollect()によって要求されたデータセットまでのプログラムの実行をトリガーします。後でenv.execute()またはcollect()と再度呼び出すと、プログラムは2回実行されます。

実行の副作用に加えて、collect()を使用して後続の変換に値を配布することは、データがクライアントに転送され、後でクラスタに戻されるという欠点もあります。 Flinkは、別の変換への側面入力としてDataSetを出荷する、いわゆるブロードキャスト変数を提供しています。次のようにプログラム内で放送変数を使用して

はなります

DataSet maxValues = result.max(2); 
result 
    .map(new NormAttrs()).withBroadcastSet(maxValues, "maxValues") 
    .writeAsCsv(...); 

NormAttrs関数は次のようになります。

private static class NormAttr extends RichMapFunction<Tuple6<...>, Tuple6<...>> { 

    private Tuple6<...> maxValues; 

    @Override 
    public void open(Configuration config) { 
    maxValues = (Tuple6<...>)getRuntimeContext().getBroadcastVariable("maxValues").get(1); 
    } 

    @Override 
    public PredictedLink map(Tuple6<...> value) throws Exception { 
    value.f2 /= maxValues.f2; 
    return value; 
    } 
} 

あなたはdocumentationで放送変数の詳細情報を見つけることができます。

+0

ありがとうございました! ;) – kaser

関連する問題