無制限のpub/subデータソースからデータ(GPS座標、タイムスタンプ付き)を読み込み、すべてのポイント間の距離を計算する必要があります。私の考えは、1分の窓を言うことができ、サイド入力として、全体のコレクションとParDoを行うことです、私はサイド入力を使用して次のポイントを検索し、ParDo内の距離を計算する。parDoのサイド入力としてunboudedデータソースのfixedWindowを使用しますか?
パイプラインを実行すると、View.asListステップで出力が生成されないことがわかります。また、calcDistanceは出力を生成しません。 FixedWindowコレクションをサイド入力として使用する方法の例はありますか? picture of pipeline
パイプライン:
PCollection<Timepoint> inputWindow = pipeline.apply(PubsubIO.Read.topic(""))
.apply(ParDo.of(new ExtractTimestamps()))
.apply(Window.<Timepoint>into(FixedWindows.of(Duration.standardMinutes(1))));
final PCollectionView<List<Timepoint>> SideInputWindowed = inputWindow.apply(View.<Timepoint>asList());
inputWindow.apply(ParDo.named("Add Timestamp "+teams[i]).of(new AddTimeStampAsKey()))
.apply(ParDo.of(new CalcDistanceTest(SideInputWindowed)).withSideInputs(SideInputWindowed));
パルド:
static class CalcDistance extends DoFn<KV<String,Timepoint>,Timepoint> {
private final PCollectionView<List<Timepoint>> pCollectionView;
public CalcDistance(PCollectionView pCollectionView){
this.pCollectionView = pCollectionView;
}
@Override
public void processElement(ProcessContext c) throws Exception {
LOG.info("starting to calculate distance");
Timepoint input = c.element().getValue();
//doing distance calculation
c.output(input);
}
}
こんにちはルーカス!あなたのヒントをありがとう、私は本当にそれを感謝します。私はあなたが言ったように、それが今働いていることを見ることができ、今サイド入力を使用することができます。私の透かしは止まっていますが、これは別の質問だと思うし、別のスレッドを開きます。 –