私があなたの質問を正しく理解していれば、あなたが欲しいものを得るための方法はいくつかあります。私は2つのバリエーションについて説明します。
私のコード例の基本的な考え方は、内部結合とSlidingWindows
を5分使用することです。 ParDo
のサイド入力またはCoGroupByKey
を使用して、データサイズに応じて結合を実装できます。ここで
は、あなたの入力やウィンドウの設定方法です:
PCollection<String> streamA = ...;
PCollection<String> streamB = ...;
PCollection<String> windowedStreamA = streamA.apply(
Window.into(
SlidingWindows.of(Duration.standardMinutes(5)).every(...)));
PCollection<String> windowedStreamB = streamB.apply(
Window.into(
SlidingWindows.of(Duration.standardMinutes(5)).every(...)));
あなたはあなたの仕様&パフォーマンスのニーズを満たすために窓や期間の大きさを調整することもできます。
ここでは、サイド入力を使用して結合を行う方法の概要を示します。これにより、streamA
の各要素について、streamB
の5分のウィンドウ全体が繰り返されるため、ウィンドウが大きくなるとパフォーマンスが低下します。ここで
PCollectionView<Iterable<String>> streamBview = streamB.apply(View.asIterable());
PCollection<String> matches = windowedStreamA.apply(
ParDo.of(new DoFn<String, String>() {
@Override void processElement(ProcessContext context) {
for (String text : context.sideInput()) {
if (split(text).contains(context.element())) {
context.output(text);
}
}
}
});
は、テキストを事前に分割し、そのキーワードを含む行で各キーワードを接合することでCoGroupByKey
でこれを行う方法のスケッチです。 SDKに付属のTfIdf
サンプルにも同様のロジックがあります。
PCollection<KV<String, Void>> keyedStreamA = windowedStreamA.apply(
MapElements
.via(word -> KV.of(word, null))
.withOutputType(new TypeDescriptor<KV<String, Void>>() {}));
PCollection<KV<String, String>> keyedStreamB = windowedStreamB.apply(
FlatMapElements
.via(text -> split(text).forEach(word --> KV.of(word, text))
.withOutputType(new TypeDescriptor<KV<String, String>>() {}));
TupleTag<Void> tagA = new TupleTag<Void>() {};
TupleTag<String> tagB = new TupleTag<String>() {};
KeyedPCollectionTuple coGbkInput = KeyedPCollectionTuple
.of(tagA, keyedStreamA)
.and(tagB, keyedStreamB);
PCollection<String> matches = coGbkInput
.apply(CoGroupByKey.create())
.apply(FlatMapElements
.via(result -> result.getAll(tagB))
.withOutputType(new TypeDescriptor<String>()));
最適な方法はデータによって異なります。直前の5分間よりも多くの試合を行うことができれば、スライディングウインドウを拡大して期間を長くすることで、ウインドウ内のデータの複製量を調整できます。トリガーを使用して、出力が生成されたときに調整することもできます。
マッチの有効期限はありますか?それ以降は見続けることはありませんか? –
「ワード」がストリームAに流れ込むたびにチェックしたいので、明示的にジョブを停止するまでチェックを続けたいと思います。 – tamagohan2
ストリームAの全体のサイズには制限がありますか?すべての試合、または要素ごとに試合を探していますか? –