2017-05-02 1 views
1

無制限のpub/subデータソースからデータ(GPS座標、タイムスタンプ付き)を読み込み、すべてのポイント間の距離を計算する必要があります。私の考えは、1分の窓を言うことができ、サイド入力として、全体のコレクションとParDoを行うことです、私はサイド入力を使用して次のポイントを検索し、ParDo内の距離を計算する。parDoのサイド入力としてunboudedデータソースのfixedWindowを使用しますか?

パイプラインを実行すると、View.asListステップで出力が生成されないことがわかります。また、calcDistanceは出力を生成しません。 FixedWindowコレクションをサイド入力として使用する方法の例はありますか? picture of pipeline

パイプライン:

PCollection<Timepoint> inputWindow = pipeline.apply(PubsubIO.Read.topic("")) 
       .apply(ParDo.of(new ExtractTimestamps())) 
       .apply(Window.<Timepoint>into(FixedWindows.of(Duration.standardMinutes(1)))); 

final PCollectionView<List<Timepoint>> SideInputWindowed = inputWindow.apply(View.<Timepoint>asList()); 

inputWindow.apply(ParDo.named("Add Timestamp "+teams[i]).of(new AddTimeStampAsKey())) 
       .apply(ParDo.of(new CalcDistanceTest(SideInputWindowed)).withSideInputs(SideInputWindowed)); 

パルド:

static class CalcDistance extends DoFn<KV<String,Timepoint>,Timepoint> { 
    private final PCollectionView<List<Timepoint>> pCollectionView; 

    public CalcDistance(PCollectionView pCollectionView){ 
     this.pCollectionView = pCollectionView; 
    } 

    @Override 
    public void processElement(ProcessContext c) throws Exception { 
     LOG.info("starting to calculate distance"); 
     Timepoint input = c.element().getValue(); 
     //doing distance calculation 
     c.output(input); 
    } 
} 

答えて

0

全体的な問題は、あなたのユースケースのため以来のpubsubからのその属性を読み取る際に要素のタイムスタンプは、データフローで知られていないということですデータ。

Pubsubから読み込むときは、タイムスタンプラベルがdiscussed hereのレコードを取り込むことをお勧めします。

最後に、GameStats exampleは、サイド入力を使用してスパムユーザーを検索します。あなたのケースでは、ウィンドウごとにglobalMeanScoreを計算する代わりに、すべてのタイムポイントをサイド入力に配置するだけです。

+0

こんにちはルーカス!あなたのヒントをありがとう、私は本当にそれを感謝します。私はあなたが言ったように、それが今働いていることを見ることができ、今サイド入力を使用することができます。私の透かしは止まっていますが、これは別の質問だと思うし、別のスレッドを開きます。 –

関連する問題