2016-07-05 9 views
1

Googleのクラウドデータフローを使用してストリームデータから指定された単語を含むテキストを検索したいと考えています。グーグルクラウドのデータフローで別のパイプライン値でパイプラインを検索するには

詳しくは、以下の2つのストリームを扱います。

  • ストリームA:ストリームの要素が "単語" である
  • ストリームB:ストリームの要素が "テキスト" です。各テキストは「単語」で構成されています。このテキストはストリームAに「単語」を持つかもしれません。

多くの「テキスト」がストリームBに頻繁に流れ込みます。一方、時には「単語」が流れAに流れ込む。

"word"がストリームAに流れ込むと、5分前に "word"を持ってストリームBに流れ込む "text"を検索したいと思います。

time stream A : stream B 
00:01 -   this is an apple 
00:02 -   this is an orange 
00:03 -   I have an apple 
00:04 apple      <= "this is an apple" and "I have an apple" are found 
00:05 this       <= "this is an apple" and "this is an orange" are found 

私はGoogleのクラウドデータフローでテキストを検索することができますか?

+0

マッチの有効期限はありますか?それ以降は見続けることはありませんか? –

+0

「ワード」がストリームAに流れ込むたびにチェックしたいので、明示的にジョブを停止するまでチェックを続けたいと思います。 – tamagohan2

+0

ストリームAの全体のサイズには制限がありますか?すべての試合、または要素ごとに試合を探していますか? –

答えて

0

私があなたの質問を正しく理解していれば、あなたが欲しいものを得るための方法はいくつかあります。私は2つのバリエーションについて説明します。

私のコード例の基本的な考え方は、内部結合とSlidingWindowsを5分使用することです。 ParDoのサイド入力またはCoGroupByKeyを使用して、データサイズに応じて結合を実装できます。ここで

は、あなたの入力やウィンドウの設定方法です:

PCollection<String> streamA = ...; 
PCollection<String> streamB = ...; 

PCollection<String> windowedStreamA = streamA.apply(
    Window.into(
     SlidingWindows.of(Duration.standardMinutes(5)).every(...))); 

PCollection<String> windowedStreamB = streamB.apply(
    Window.into(
     SlidingWindows.of(Duration.standardMinutes(5)).every(...))); 

あなたはあなたの仕様&パフォーマンスのニーズを満たすために窓や期間の大きさを調整することもできます。

ここでは、サイド入力を使用して結合を行う方法の概要を示します。これにより、streamAの各要素について、streamBの5分のウィンドウ全体が繰り返されるため、ウィンドウが大きくなるとパフォーマンスが低下します。ここで

PCollectionView<Iterable<String>> streamBview = streamB.apply(View.asIterable()); 

PCollection<String> matches = windowedStreamA.apply(
    ParDo.of(new DoFn<String, String>() { 
     @Override void processElement(ProcessContext context) { 
     for (String text : context.sideInput()) { 
      if (split(text).contains(context.element())) { 
      context.output(text); 
      } 
     } 
     } 
    }); 

は、テキストを事前に分割し、そのキーワードを含む行で各キーワードを接合することでCoGroupByKeyでこれを行う方法のスケッチです。 SDKに付属のTfIdfサンプルにも同様のロジックがあります。

PCollection<KV<String, Void>> keyedStreamA = windowedStreamA.apply(
    MapElements 
     .via(word -> KV.of(word, null)) 
     .withOutputType(new TypeDescriptor<KV<String, Void>>() {})); 

PCollection<KV<String, String>> keyedStreamB = windowedStreamB.apply(
    FlatMapElements 
     .via(text -> split(text).forEach(word --> KV.of(word, text)) 
     .withOutputType(new TypeDescriptor<KV<String, String>>() {})); 


TupleTag<Void> tagA = new TupleTag<Void>() {}; 
TupleTag<String> tagB = new TupleTag<String>() {}; 

KeyedPCollectionTuple coGbkInput = KeyedPCollectionTuple 
    .of(tagA, keyedStreamA) 
    .and(tagB, keyedStreamB); 

PCollection<String> matches = coGbkInput 
    .apply(CoGroupByKey.create()) 
    .apply(FlatMapElements 
     .via(result -> result.getAll(tagB)) 
     .withOutputType(new TypeDescriptor<String>())); 

最適な方法はデータによって異なります。直前の5分間よりも多くの試合を行うことができれば、スライディングウインドウを拡大して期間を長くすることで、ウインドウ内のデータの複製量を調整できます。トリガーを使用して、出力が生成されたときに調整することもできます。

+0

お返事ありがとうございます。あなたの答えは、私が探していたことです。ありがとうございました!! – tamagohan2

関連する問題