2017-02-17 2 views
0

データフローストリーミングパイプラインのストリームとして出力してDoFn内のファイルを読み込むのPubSubに送信 - >データフローストリーミング読むPubSubのIからファイルイベント/ O - > DoFnのUn-gzipはデータフローは、ライン

static class CustomDoFn extends DoFn<String, String>{ 

@Override 
public void processElement(ProcessContext c) throws Exception { 
    String gcsPath = c.element(); 
    Open ReadChannel with GCS 
    Get Stream from Channel 
    while((line = stream.ReadLine()) != null){ 
     c.output(line) // Is this good way to read and send line down the pipeline? 
    } 
} 

//に-なるパイプライン

pipeline.apply(PubSubIO.Read()). 
      apply(ParDO.of(new CustomDoFn())). 
      apply(new CustomTX()). 
      apply(BigQueryIO.Write()); 

疑問は以下のとおりです。
1.それは正しい方法であり、 DoFnのループで出力を生成しますか?
2. Dofnの内部でFileBasedSource.FileBasedReaderをどのように使用できますか?

+0

こんにちは、私はあなたの質問を理解していることを確認したいだけです。ファイルから読み込むストリーミングパイプラインを作成しますか?また、Pub/Subを使用してファイル名を受け取り、それらを読みたいと思っていますか?あなたのファイルサイズは非常に大きいですか?ファイルを読み込んで各行を出力する方法の1つの問題は、ファイル全体が実際に出力される前にメモリに読み込まれなければならないことです。大きなファイルの場合、これはうまく動作せず、OOMでも可能です。 –

答えて

0

現在、動的ファイル名(パイプライン構築時に指定されていないファイル名)でFileBasedSourceを使用する方法はありません。 Apache Beam 2.0(https://issues.apache.org/jira/browse/BEAM-65)の今後の改良により、この機能は有効になりますが、まだ使用する準備はできていません。あなたのアウトラインされたアプローチは、Alex Amatoが指摘しているように、大きなファイルに対してメモリ制約がありますが、そうでなければ機能するパイプラインを作り出すはずです。

+0

このユースケースに対してより具体的なJIRAが提出されました。これを実装できるように近づいています。https://issues.apache.org/jira/browse/BEAM-2511 TextIOは、ファイル名 – jkff