1
PubSubから受け取ったメッセージを、Apache BeamのTextIOを使用してGCSのテキストファイルに書き込む方法はありますか? withWindowedWrites()やwithFilenamePolicy()のようないくつかのメソッドを見たが、ドキュメンテーションでそれを見つけることができなかった。Apache Beamを使用してGCSにストリーミングデータを書き込む
PubSubから受け取ったメッセージを、Apache BeamのTextIOを使用してGCSのテキストファイルに書き込む方法はありますか? withWindowedWrites()やwithFilenamePolicy()のようないくつかのメソッドを見たが、ドキュメンテーションでそれを見つけることができなかった。Apache Beamを使用してGCSにストリーミングデータを書き込む
ここでは、Java SDK(BEAM 2.1.0)を使用している場合の例を示します。
PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
pipeline.begin()
.apply("PubsubIO",PubsubIO.readStrings()
.withTimestampAttribute("timestamp")
.fromSubscription("projects/YOUR-PROJECT/subscriptions/YOUR-SUBSCRIPTION"))
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30L))))
.apply(TextIO.write().to("gs://YOUR-BUCKET").withWindowedWrites());
あなたはSDKがTextIO.Write.expand(PCollection入力)で「拡大」の方法を模索することによってファイル命名に使用するデフォルト値を見ることができます。具体的には、DefaultFilenamePolicy.javaを見てください。
setStreamingの呼び出しは不要です。さらに重要なことは、withWindowedWritesを使用するためにいくつかのウィンドウを追加する必要があることです。 – jkff
私はStreamingOptionsを読んだだけです。パイプラインに無制限のPCollectionが含まれていると、デフォルト値はtrueになっています。 また、コードスニペットをウィンドウロジックを含むように更新し、PipelineOptionsで冗長なsetStreaming構成を削除しました。 –
ありがとうございました@NalseezDukeは私のために働いた! – rish0097