2つのオプションについて説明します。 1つは、現在のバージョンのDataflow(1.X)で動作するサイド入力を使用し、もう1つは、今後のDataflow(2.X)の一部であるDoFn
内の状態を使用します。データフロー1.X用
ソリューション、ここでの一般的な考え方は、すべての労働者へのシンボル - >名前マッピングを利用できるようにマップ値side-inputを使用することです側入力に
を使用。
このテーブルはグローバルウィンドウ内にある必要があります(何も年齢が変わることはありません)。すべての要素がトリガされる必要があります(または新しい更新が生成されるように頻繁に実行する必要があります)。また、各シンボルの最新の名前を取るためにはいくつかのロジックが必要です。
このソリューションの欠点は、新しいエントリが入るたびにルックアップテーブル全体が再生成され、すぐにすべてのワーカーにプッシュされないことです。むしろ、それぞれは将来的に新しいマッピングを「ある時点で」得るでしょう。ハイレベルで
、このパイプラインは(私はこのコードをテストしていないので、いくつかの種類があるかもしれません)のようなものになります。私たちはここにviewAsMultiMap
を使用していた
PCollection<KV<Symbol, Name>> symbolToNameInput = ...;
final PCollectionView<Map<Symbol, Iterable<Name>>> symbolToNames = symbolToNameInput
.apply(Window.into(GlobalWindows.of())
.triggering(Repeatedly.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(5)))
.accumulatingFiredPanes())
.apply(View.asMultiMap())
注意を。つまり、すべてのシンボルの名前はとなります。すべてとなります。私たちは物事を見るときに、iterableで最新の名前を取る必要があります。ステートAPI
にこのソリューションを使用してデータフロー2.Xについて
が
PCollection<Detail> symbolDetails = ...;
symbolDetails
.apply(ParDo.withSideInputs(symbolToNames).of(new DoFn<Detail, AugmentedDetails>() {
@Override
public void processElement(ProcessContext c) {
Iterable<Name> names = c.sideInput(symbolToNames).get(c.element().symbol());
Name name = chooseName(names);
c.output(augmentDetails(c.element(), name));
}
}));
ソリューションは、今後のデータフロー2.0リリースの一部となる新機能を使用しています。プレビューリリース(現在はDataflow 2.0-beta1)の一部ではありませんが、利用可能な場合はrelease notesをご覧ください。
一般的な考え方では、キー付き状態では、特定のキーに関連付けられた値を保存できます。このケースでは、私たちが見た最新の「名前」の価値を覚えています。
ステートフルDoFn
を実行する前に、各要素を共通の要素型(NameOrDetails
)オブジェクトにラップします。これは次のようになります。
// Convert SymbolToName entries to KV<Symbol, NameOrDetails>
PCollection<KV<Symbol, NameOrDetails>> left = symbolToName
.apply(ParDo.of(new DoFn<SymbolToName, KV<Symbol, NameOrDetails>>() {
@ProcessElement
public void processElement(ProcessContext c) {
SymbolToName e = c.element();
c.output(KV.of(e.getSymbol(), NameOrDetails.name(e.getName())));
}
});
// Convert detailed entries to KV<Symbol, NameOrDetails>
PCollection<KV<Symbol, NameOrDetails>> right = details
.apply(ParDo.of(new DoFn<Details, KV<Symbol, NameOrDetails>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Details e = c.element();
c.output(KV.of(e.getSymobl(), NameOrDetails.details(e)));
}
});
// Flatten the two streams together
PCollectionList.of(left).and(right)
.apply(Flatten.create())
.apply(ParDo.of(new DoFn<KV<Symbol, NameOrDetails>, AugmentedDetails>() {
@StateId("name")
private final StateSpec<ValueState<String>> nameSpec =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
public void processElement(ProcessContext c
@StateId("name") ValueState<String> nameState) {
NameOrValue e = c.element().getValue();
if (e.isName()) {
nameState.write(e.getName());
} else {
String name = nameState.read();
if (name == null) {
// Use symbol if we haven't received a mapping yet.
name = c.element().getKey();
}
c.output(e.getDetails().withName(name));
}
});
詳細データのソースについての詳細は教えてください。特に - それは静的ですか? Per-key?世界的に?それは複数のキーと結合されていますか?合計キーはいくつですか? –
@SamMcVeety濃縮データは動的であり、常に更新されます(キーごとに1時間あたり約1つの新しい要素)。少なくとも何百万という鍵がありますが、おそらく何億というものもあります。 –
ジョインがキーを意識していれば、両方のコレクションの同じキーを持つ要素が同じノードで処理され、適切なスケーリングが可能になることを確認することができます。 –