2017-10-09 16 views
4

分散処理環境では、 "part-000"などの "part"ファイル名を使用するのが一般的ですが、個々の出力ファイル名をに変更することもできますウィンドウのファイル名)はApache Beam?Apache Beamは出力用のカスタムファイル名をサポートしていますか?

これを行うには、ウィンドウの名前を割り当てたり、ウィンドウの内容に基づいてファイル名を推測する必要があります。私はそのようなアプローチが可能かどうかを知りたい。溶液は、ストリーミングまたはバッチされるべきか否かの

、ストリーミングモードの例はい

答えて

1

好ましいです。 documentation of TextIOあたり:

あなたはファイル名の生成方法をよりよく制御したい場合は、デフォルトのポリシーで許可よりもJKFFにより示唆されるように、カスタムFilenamePolicyもTextIO.Write.to(FilenamePolicy)

+0

いくつかのサンプルコードを教えてください。このアプローチを試してみると、私はClassCastExceptionを持っています... –

+0

あなたのコードとあなたが持っているエラーの完全なスタックトレースを含めてください。 – jkff

5

はいを​​使用して設定することができますTextIO.write.to(FilenamePolicy)を使用してこれを実現できます。

例は以下の通りです:

あなたが使用することができ、特定のローカルファイルに出力を書き込みたい場合は:。

lines.apply(TextIO.write()」(へ/パス/​​ /にfile.txt ")));

以下は接頭辞linkを使用して出力を書き込む簡単な方法です。この例はGoogleストレージ用ですが、代わりにローカル/ s3パスを使用できます。

public class MinimalWordCountJava8 { 

    public static void main(String[] args) { 
    PipelineOptions options = PipelineOptionsFactory.create(); 
    // In order to run your pipeline, you need to make following runner specific changes: 
    // 
    // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner 
    // or FlinkRunner. 
    // CHANGE 2/3: Specify runner-required options. 
    // For BlockingDataflowRunner, set project and temp location as follows: 
    // DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); 
    // dataflowOptions.setRunner(BlockingDataflowRunner.class); 
    // dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE"); 
    // dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY"); 
    // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions} 
    // for more details. 
    // options.as(FlinkPipelineOptions.class) 
    //  .setRunner(FlinkRunner.class); 

    Pipeline p = Pipeline.create(options); 

    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*")) 
    .apply(FlatMapElements 
     .into(TypeDescriptors.strings()) 
     .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+")))) 
    .apply(Filter.by((String word) -> !word.isEmpty())) 
    .apply(Count.<String>perElement()) 
    .apply(MapElements 
     .into(TypeDescriptors.strings()) 
     .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())) 
    // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to. 
    .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); 

    p.run().waitUntilFinish(); 
    } 
} 

This example codeあなたに出力を書き込むことでより多くの制御を与える:

/** 
    * A {@link FilenamePolicy} produces a base file name for a write based on metadata about the data 
    * being written. This always includes the shard number and the total number of shards. For 
    * windowed writes, it also includes the window and pane index (a sequence number assigned to each 
    * trigger firing). 
    */ 
    protected static class PerWindowFiles extends FilenamePolicy { 

    private final ResourceId prefix; 

    public PerWindowFiles(ResourceId prefix) { 
     this.prefix = prefix; 
    } 

    public String filenamePrefixForWindow(IntervalWindow window) { 
     String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename(); 
     return String.format(
      "%s-%s-%s", filePrefix, formatter.print(window.start()), formatter.print(window.end())); 
    } 

    @Override 
    public ResourceId windowedFilename(int shardNumber, 
             int numShards, 
             BoundedWindow window, 
             PaneInfo paneInfo, 
             OutputFileHints outputFileHints) { 
     IntervalWindow intervalWindow = (IntervalWindow) window; 
     String filename = 
      String.format(
       "%s-%s-of-%s%s", 
       filenamePrefixForWindow(intervalWindow), 
       shardNumber, 
       numShards, 
       outputFileHints.getSuggestedFilenameSuffix()); 
     return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE); 
    } 

    @Override 
    public ResourceId unwindowedFilename(
     int shardNumber, int numShards, OutputFileHints outputFileHints) { 
     throw new UnsupportedOperationException("Unsupported."); 
    } 
    } 

    @Override 
    public PDone expand(PCollection<InputT> teamAndScore) { 
    if (windowed) { 
     teamAndScore 
      .apply("ConvertToRow", ParDo.of(new BuildRowFn())) 
      .apply(new WriteToText.WriteOneFilePerWindow(filenamePrefix)); 
    } else { 
     teamAndScore 
      .apply("ConvertToRow", ParDo.of(new BuildRowFn())) 
      .apply(TextIO.write().to(filenamePrefix)); 
    } 
    return PDone.in(teamAndScore.getPipeline()); 
    } 
+0

ありがとう!これはリリース2.1.0にはないようです。そして、2.2.0はまだ出ていません:https://issues.apache.org/jira/projects/BEAM/versions/12341044 –

+0

私はこれを完全にうまく動作するローカル環境でビームバージョン2.1.0でテストしました。 –

+1

2.1.0のドキュメントに従って、これが利用可能です:https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/TextIO.html –

0

これはビーム2.1.0で完全に有効な例です。あなたのデータ(PCollectionなど)を呼び出すことができます

import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; 
import org.apache.beam.sdk.io.TextIO; 
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; 
import org.apache.beam.sdk.io.fs.ResourceId; 
import org.apache.beam.sdk.transforms.display.DisplayData; 

@SuppressWarnings("serial") 
public class FilePolicyExample { 

    public static void main(String[] args) { 
     FilenamePolicy policy = new WindowedFilenamePolicy("somePrefix"); 

     //data 
     data.apply(TextIO.write().to("your_DIRECTORY") 
      .withFilenamePolicy(policy) 
      .withWindowedWrites() 
      .withNumShards(4)); 

    } 

    private static class WindowedFilenamePolicy extends FilenamePolicy { 

     final String outputFilePrefix; 

     WindowedFilenamePolicy(String outputFilePrefix) { 
      this.outputFilePrefix = outputFilePrefix; 
     } 

     @Override 
     public ResourceId windowedFilename(
       ResourceId outputDirectory, WindowedContext input, String extension) { 
      String filename = String.format(
        "%s-%s-%s-of-%s-pane-%s%s%s", 
        outputFilePrefix, 
        input.getWindow(), 
        input.getShardNumber(), 
        input.getNumShards() - 1, 
        input.getPaneInfo().getIndex(), 
        input.getPaneInfo().isLast() ? "-final" : "", 
        extension); 
      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); 
     } 

     @Override 
     public ResourceId unwindowedFilename(
       ResourceId outputDirectory, Context input, String extension) { 
      throw new UnsupportedOperationException("Expecting windowed outputs only"); 
     } 

     @Override 
     public void populateDisplayData(DisplayData.Builder builder) { 
      builder.add(DisplayData.item("fileNamePrefix", outputFilePrefix) 
        .withLabel("File Name Prefix")); 
     } 
    } 
} 
関連する問題