2016-12-28 16 views
1

私のパイプラインは出力データfileをGCSに保存します。 このファイルを圧縮したいと思います。 TextIOは圧縮されたファイルを解凍していますが、 ですが、ファイルを圧縮していないと思います。 出力ファイルをどのように圧縮できますか?Dataflow Java SDKで出力ファイルを圧縮する方法は?

答えて

1

これは、現在、データフローのためのオープンfeature requestである、しかし仕事はすでにビームで行われています。 Dataflow 2.0がリリースされると(Beamに基づく)、これは正式にサポートされるべきです。つまり、私はFileBasedSinkクラスを拡張し、Beamのこの機能に関するJeff Payneの作業を利用して圧縮GZIPファイルを作成できました。私は私のカルマレベルでのポストあたり2つのリンクに限定だから、これはビームに合併しますPRに

aStringPCollection.apply(Write.to(new GZIPSink("gs://path/sharded-filename", StringUtf8Coder.of()));

+0

リンク:

public class GZIPSink<T> extends FileBasedSink<T> { private final Coder<T> coder; GZIPSink(String baseOutputFilename, Coder<T> coder) { super(baseOutputFilename, ".gz"); this.coder = coder; } @Override public FileBasedWriteOperation createWriteOperation(PipelineOptions pipelineOptions) { return new GZIPWriteOperation(this, coder); } static class GZIPWriteOperation<T> extends FileBasedSink.FileBasedWriteOperation<T> { private final Coder<T> coder; private GZIPWriteOperation(GZIPSink<T> sink, Coder<T> coder) { super(sink); this.coder = coder; } @Override public FileBasedWriter createWriter(PipelineOptions pipelineOptions) throws Exception { return new GZIPBasedWriter(this, coder); } } static class GZIPBasedWriter<T> extends FileBasedSink.FileBasedWriter <T> { private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); private final Coder<T> coder; private GZIPOutputStream out; public GZIPBasedWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) { super(writeOperation); this.mimeType = MimeTypes.BINARY; this.coder = coder; } @Override protected void prepareWrite(WritableByteChannel channel) throws Exception { out = new GZIPOutputStream(Channels.newOutputStream(channel), true) {{ def.setLevel(def.BEST_COMPRESSION); }}; } @Override public void write(T value) throws Exception { coder.encode(value, out, Coder.Context.OUTER); out.write(NEWLINE); } @Override public void writeFooter() throws IOException { out.finish(); } } } 

は、そして、実際に書き込みを行うには。 https://github.com/apache/beam/commit/b7b68e6fb1aafb6b4160e5dcea022bf6c802e33f – Thang

1

TextIOは、圧縮ファイルの読み取りのみをサポートしています。圧縮されたファイルの書き込みはサポートしていません。

https://cloud.google.com/dataflow/model/text-io#reading-from-compressed-text-files

TextIOは現在、圧縮されたファイルへの書き込みをサポートしていません。

さらに詳しい情報:

関連する問題