カスタム実装のためにコーダー(StringUtf8Coder)をスワップアウトする方法を理解しようとしています。下位互換性のあるコーダーを使用した変換のコーダーを変更する
import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.DelegateCoder;
import com.google.common.base.Charsets;
import org.xerial.snappy.Snappy;
import java.io.IOException;
public class CompressedByteArrayCoder extends DelegateCoder<String, byte[]> {
private static String decompressSnappy(byte[] input) throws IOException {
if (input == null) {
throw new CoderException("null input is not accepted");
}
if (Snappy.isValidCompressedBuffer(input)) {
return Snappy.uncompressString(input);
}
return new String(input, Charsets.UTF_8);
}
private static byte[] compressSnappy(String input) throws IOException {
return Snappy.compress(input);
}
public static CompressedByteArrayCoder of() {
return new CompressedByteArrayCoder(ByteArrayCoder.of(), CompressedByteArrayCoder::compressSnappy, CompressedByteArrayCoder::decompressSnappy);
}
private CompressedByteArrayCoder(Coder<byte[]> coder, CodingFunction<String, byte[]> toFn, CodingFunction<byte[], String> fromFn) {
super(coder, toFn, fromFn);
}
}
私はStringUtf8Coder(PubSubIO.Readのデフォルト)をスワップアウトする方法を把握しようとしています:私はコーダを実装している
はてきぱきと圧縮された文字列を処理する機能を追加しますデータフローパイプラインの更新が失敗することはありません。
私は、2つのコーダーが互換性があることをデータフローサービスランナーに伝える方法を理解しようとしています。