分散システムでコードを実行すると、ParseData
関数の並列インスタンスが互いに独立して実行されるというのは難しい部分です。
ParseData
のローカルIDカウンタを使用して一意のIDを割り当てることはできます。重複を避けるためのトリックは、正しい初期化とカウンタの増分です。 4つの並列性があると仮定すると、という4つのインスタンスが得られます(PD1 ... PD4
と呼ぶ)。次のIDの割り当てを行うだろう:
PD1: 0, 4, 8, 12, ...
PD2: 1, 5, 9, 13, ...
PD3, 2, 6, 10, 14, ...
PD4: 3, 7, 11, 15, ...
あなたが異なる値(詳細は下記)と並行インスタンスを初期化することによって、これを実現して、並列処理(すなわち、ID += parallelism
)によって、各インスタンスでカウントをインクリメントすることができます。
Flinkでは、並列関数のインスタンス化されたすべてが、一意の番号(いわゆるタスクインデックス)を自動的に取得します。この番号を使用してIDカウンターを初期化することができます。タスクインデックスはRuntimeContext.getIndexOfThisSubtask()
で取得できます。またRuntimeContext
がParseData
を実装し、open()
でgetRuntimeContext()
を呼び出すためにRichMapFunction
を使用取得するにはRuntimeContext.getNumberOfParallelSubtasks()
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RuntimeContext.html
を介して操作/機能の並列処理を受けることができます。このような
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RichFunction.html
何か(関連する方法を示しのみ):
class ParseDate extends RichMapFunction {
private long parallelism;
private long idCounter;
public void open(Configuration parameters) {
RuntimeContext ctx = getRuntimeContext();
parallelism = ctx.getNumberOfParallelSubtasks();
idCounter = ctx.getIndexOfThisSubtask();
}
public OutputDataType map(InputDataType value) {
OutputDataType output = new OutputDataType();
output.setID(idCounter);
idCounter += parallelism;
// further processing
return output;
}
}
おかげで、私のために働きました。私は 'public void open(Configuration parameters)'を追加しなければなりませんでした。しかし、このように最後のIDは連続していません(実行ごとに別々に割り当てられます)が、これは各インスタンスに割り当てられた要素の数と関係があります。 –
私の答えでは、開いている方法を修正しました - それを指摘してくれてありがとう。 はい、データが均等に分散されないと、グローバルな共有状態が必要になるため、非常に難しい連続したIDを取得できないことがあります。私はあなたの質問でこの細部を見落としました。 –