2016-03-21 1 views
0

カスタム実装のためにコーダー(StringUtf8Coder)をスワップアウトする方法を理解しようとしています。下位互換性のあるコーダーを使用した変換のコーダーを変更する

import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder; 
import com.google.cloud.dataflow.sdk.coders.Coder; 
import com.google.cloud.dataflow.sdk.coders.CoderException; 
import com.google.cloud.dataflow.sdk.coders.DelegateCoder; 
import com.google.common.base.Charsets; 
import org.xerial.snappy.Snappy; 

import java.io.IOException; 

public class CompressedByteArrayCoder extends DelegateCoder<String, byte[]> { 

    private static String decompressSnappy(byte[] input) throws IOException { 
     if (input == null) { 
      throw new CoderException("null input is not accepted"); 
     } 
     if (Snappy.isValidCompressedBuffer(input)) { 
      return Snappy.uncompressString(input); 
     } 
     return new String(input, Charsets.UTF_8); 
    } 

    private static byte[] compressSnappy(String input) throws IOException { 
     return Snappy.compress(input); 
    } 

    public static CompressedByteArrayCoder of() { 
     return new CompressedByteArrayCoder(ByteArrayCoder.of(), CompressedByteArrayCoder::compressSnappy, CompressedByteArrayCoder::decompressSnappy); 
    } 

    private CompressedByteArrayCoder(Coder<byte[]> coder, CodingFunction<String, byte[]> toFn, CodingFunction<byte[], String> fromFn) { 
     super(coder, toFn, fromFn); 
    } 
} 

私はStringUtf8Coder(PubSubIO.Readのデフォルト)をスワップアウトする方法を把握しようとしています:私はコーダを実装している

はてきぱきと圧縮された文字列を処理する機能を追加しますデータフローパイプラインの更新が失敗することはありません。

私は、2つのコーダーが互換性があることをデータフローサービスランナーに伝える方法を理解しようとしています。

答えて

1

残念ながら、現時点でGoogle Cloud Dataflowサービス上で実行中のパイプラインを更新すると、PCollectionのコーダーを変更することはできません。この場合、パイプラインを新しいデータフロージョブとして送信する必要があります。

詳しくは、Updating an Existing Pipelineを参照してください。詳しくは、互換性チェックに関するセクションを参照してください。

これは私たちが今後解決するかもしれないものです。更新については、ドキュメントを確認してください。