2017-08-04 4 views
3

同じトピックについて多くの質問がありました。しかし、私はまだGCSへの書き込みに問題があります。私はpubsubから話題を読んで、これをGCSにプッシュしようとしています。私はthis linkを参照しました。しかし、最新のビームパッケージでIOChannelUtilsが見つかりませんでした。無限のコレクションをGCSに書く

PCollection<String> details = pipeline 
      .apply(PubsubIO.readStrings().fromTopic("/topics/<project>/sampleTopic")); 

PCollection<KV<String, String>> keyedStream = details.apply(WithKeys.of(new SerializableFunction<String, String>() { 
     public String apply(String s) { 
      return "constant"; 
     } 
    })); 

    PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream.apply(Window.<KV<String, String>>into(FixedWindows.of(ONE_MIN)).withAllowedLateness(ONE_DAY) 
      .triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(10)) 
        .withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(10), 
          AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_SECONDS)))) 
      .discardingFiredPanes()).apply(GroupByKey.create()); 

    PCollection<Iterable<String>> windows = keyedWindows.apply(Values.create()); 

これは、スタックオーバーフローの他の多くの類似したトピックから取りました。今、私は、TextIOはwithWindowedWritesとwithNumShardsで無制限のPCollection書き込みオプションをサポートしていることを理解しています。

REF:Writing to Google Cloud Storage from PubSub using Cloud Dataflow using DoFn

しかし、私はこれを行うべきか、理解していませんでした。

私は次のようにGCSに書き込もうとしています。

スタックオーバーフローのトピックにコメントを追加するのに十分なポイントがありません。そのため、私は別の質問としてそれを上げています。

答えて

2

PCollection<String> streamedDataWindows = streamedData.apply(Window.<String>into(new GlobalWindows()) 
      .triggering(Repeatedly 
        .forever(AfterProcessingTime 
          .pastFirstElementInPane() 
          .plusDelayOf(Duration.standardSeconds(30)) 
         )).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes()); 

streamedDataWindows.apply(TextIO.write().to(CLOUD_STORAGE).withWindowedWrites().withNumShards(1).withFilenamePolicy(new PerWindowFiles())); 


public static class PerWindowFiles extends FileBasedSink.FilenamePolicy { 

public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) { 

// OVERRIDE THE FILE NAME CREATION 
} 

} 

の下に与えられたとして、私はこのようにそれを解決することができますが、私は、ウィンドウを変更することによってこの問題を解決することができ、私はまだここでは、ウィンドウの概念について確認していません。私はそれを見つけるときに私は詳細を追加します。誰かがより理解している場合は、詳細を追加してください。 ありがとう

3

これは、ウィンドウズファイルをGCSに書き込む完全な例を提供するPub/Sub to GCS Pipelineです。

+0

こんにちは..答えに感謝します。私はちょうど数分前にそれを完了することができました。この回答を私が取ったアプローチで更新します。もう一度ありがとう! – Balu

関連する問題