答えて

2

ここでは、Java SDK(BEAM 2.1.0)を使用している場合の例を示します。

PipelineOptions options = PipelineOptionsFactory.fromArgs(args) 
                .withValidation() 
                .as(PipelineOptions.class); 

Pipeline pipeline = Pipeline.create(options); 

pipeline.begin() 
       .apply("PubsubIO",PubsubIO.readStrings() 
        .withTimestampAttribute("timestamp") 
        .fromSubscription("projects/YOUR-PROJECT/subscriptions/YOUR-SUBSCRIPTION")) 
       .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30L)))) 
       .apply(TextIO.write().to("gs://YOUR-BUCKET").withWindowedWrites()); 

あなたはSDKがTextIO.Write.expand(PCollection入力)で「拡大」の方法を模索することによってファイル命名に使用するデフォルト値を見ることができます。具体的には、DefaultFilenamePolicy.javaを見てください。

+2

setStreamingの呼び出しは不要です。さらに重要なことは、withWindowedWritesを使用するためにいくつかのウィンドウを追加する必要があることです。 – jkff

+1

私はStreamingOptionsを読んだだけです。パイプラインに無制限のPCollectionが含まれていると、デフォルト値はtrueになっています。 また、コードスニペットをウィンドウロジックを含むように更新し、PipelineOptionsで冗長なsetStreaming構成を削除しました。 –

+1

ありがとうございました@NalseezDukeは私のために働いた! – rish0097

関連する問題