2016-07-27 27 views
0

私はCSVファイルをロードしており、カスタムマップ機能を使用してすべての行をPOJOに変換しています。私のプログラムのロジックでは、POJOごとに0からnまでの一意のIDが必要です(nは総行番号です)。私の質問は、変換関数を使ってすべてのPOJOに一意のID(例えば最初の行番号)を割り当てることができますか?理想的な方法は、UDFでIterableを取得し、入力タプルを繰り返しながら変数をインクリメントし、最後に対応するPOJOを出力することです。私のコードは現在次のようになっています:Apache Flink - 一意のIDを入力に割り当てます。

DataSet<MyType> input = env.readCsvFile("/path/file.csv") 
    .includeFields("1111") 
    .types(String.class, Double.class, Double.class,Double.class) 
    .map(new ParseData()); 

ここで、ParseDataはタプルをMyType POJOに変換します。

このタスクを達成するためのベストプラクティスはありますか?

答えて

2

分散システムでコードを実行すると、ParseData関数の並列インスタンスが互いに独立して実行されるというのは難しい部分です。

ParseDataのローカルIDカウンタを使用して一意のIDを割り当てることはできます。重複を避けるためのトリックは、正しい初期化とカウンタの増分です。 4つの並列性があると仮定すると、という4つのインスタンスが得られます(PD1 ... PD4と呼ぶ)。次のIDの割り当てを行うだろう:

PD1: 0, 4, 8, 12, ... 
PD2: 1, 5, 9, 13, ... 
PD3, 2, 6, 10, 14, ... 
PD4: 3, 7, 11, 15, ... 

あなたが異なる値(詳細は下記)と並行インスタンスを初期化することによって、これを実現して、並列処理(すなわち、ID += parallelism)によって、各インスタンスでカウントをインクリメントすることができます。

Flinkでは、並列関数のインスタンス化されたすべてが、一意の番号(いわゆるタスクインデックス)を自動的に取得します。この番号を使用してIDカウンターを初期化することができます。タスクインデックスはRuntimeContext.getIndexOfThisSubtask()で取得できます。またRuntimeContextParseDataを実装し、open()getRuntimeContext()を呼び出すためにRichMapFunctionを使用取得するにはRuntimeContext.getNumberOfParallelSubtasks()

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RuntimeContext.html

を介して操作/機能の並列処理を受けることができます。このような

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RichFunction.html

何か(関連する方法を示しのみ):

class ParseDate extends RichMapFunction { 
    private long parallelism; 
    private long idCounter; 

    public void open(Configuration parameters) { 
     RuntimeContext ctx = getRuntimeContext(); 
     parallelism = ctx.getNumberOfParallelSubtasks(); 
     idCounter = ctx.getIndexOfThisSubtask(); 
    } 

    public OutputDataType map(InputDataType value) { 
     OutputDataType output = new OutputDataType(); 
     output.setID(idCounter); 
     idCounter += parallelism; 
     // further processing 
     return output; 
    } 
} 
+0

おかげで、私のために働きました。私は 'public void open(Configuration parameters)'を追加しなければなりませんでした。しかし、このように最後のIDは連続していません(実行ごとに別々に割り当てられます)が、これは各インスタンスに割り当てられた要素の数と関係があります。 –

+0

私の答えでは、開いている方法を修正しました - それを指摘してくれてありがとう。 はい、データが均等に分散されないと、グローバルな共有状態が必要になるため、非常に難しい連続したIDを取得できないことがあります。私はあなたの質問でこの細部を見落としました。 –

関連する問題