同じトピックについて多くの質問がありました。しかし、私はまだGCSへの書き込みに問題があります。私はpubsubから話題を読んで、これをGCSにプッシュしようとしています。私はthis linkを参照しました。しかし、最新のビームパッケージでIOChannelUtilsが見つかりませんでした。無限のコレクションをGCSに書く
PCollection<String> details = pipeline
.apply(PubsubIO.readStrings().fromTopic("/topics/<project>/sampleTopic"));
PCollection<KV<String, String>> keyedStream = details.apply(WithKeys.of(new SerializableFunction<String, String>() {
public String apply(String s) {
return "constant";
}
}));
PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream.apply(Window.<KV<String, String>>into(FixedWindows.of(ONE_MIN)).withAllowedLateness(ONE_DAY)
.triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(10))
.withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(10),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_SECONDS))))
.discardingFiredPanes()).apply(GroupByKey.create());
PCollection<Iterable<String>> windows = keyedWindows.apply(Values.create());
これは、スタックオーバーフローの他の多くの類似したトピックから取りました。今、私は、TextIOはwithWindowedWritesとwithNumShardsで無制限のPCollection書き込みオプションをサポートしていることを理解しています。
REF:Writing to Google Cloud Storage from PubSub using Cloud Dataflow using DoFn
しかし、私はこれを行うべきか、理解していませんでした。
私は次のようにGCSに書き込もうとしています。
スタックオーバーフローのトピックにコメントを追加するのに十分なポイントがありません。そのため、私は別の質問としてそれを上げています。
こんにちは..答えに感謝します。私はちょうど数分前にそれを完了することができました。この回答を私が取ったアプローチで更新します。もう一度ありがとう! – Balu