上では動作しません、登録コーダは動作しません。登録コーダーは、ApacheビームSDKでデータフロー
私は、BigQueryののTableSchema
でSimpleFunction
を使用したいが、それはシリアル化する必要があります。 私はCodeRegistry
にTableSchemaCoder
を追加しますが、使用していないようです。
どうすれば解決できますか?
// Coder
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.json.JSONArray;
import org.json.JSONObject;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
public class TableSchemaCoder extends AtomicCoder<TableSchema> {
public static class FieldSchema {
private String name;
private String type;
private String mode;
public FieldSchema(String name, String type, String mode) {
this.name = name;
this.type = type;
this.mode = mode;
}
/* setter/getter */
}
private final StringUtf8Coder stringCoder = StringUtf8Coder.of();
@Override
public TableSchema decode(InputStream inStream) throws IOException {
return new SchemaBuilder().build(stringCoder.decode(inStream));
}
@Override
public void encode(TableSchema value, OutputStream outStream) throws IOException {
List<JSONObject> fields = new ArrayList<>();
for (TableFieldSchema s : value.getFields()) {
fields.add(new JSONObject(new FieldSchema(s.getName(), s.getType(), s.getMode())));
}
String json = new JSONArray(fields).toString();
stringCoder.encode(json, outStream);
}
}
// Pipeline
// ...
CodeRegistry cr = pipeline.getCodeRegistry
cr.registerCoderForClass(TableSchema.class, TableSchemaCoder())
// ...
TableSchema schema = getSchema()
pipeline.apply(MapElements.via(RowParser(schema)))
エラーメッセージ:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize [email protected]
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
at org.apache.beam.sdk.transforms.MapElements.expand(MapElements.java:118)
at org.apache.beam.sdk.transforms.MapElements.expand(MapElements.java:30)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)
Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.model.TableSchema
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
... 9 more
チェックアウトBigQueryIO自体が閉鎖にTableSchemaを保存するために使用してこのパターン:https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/ SRC /メイン/ javaの/組織/ apacheの/ビーム/ SDK/IO/GCP/BigQueryの/ BigQuerySourceBase.java#L199 – jkff
素敵な答えをありがとう!私は分かりません > Aコーダは、パイプライン内のデータをエンコードするためにのみ使用されます。 私は理解しました。私はそれを試してみます。 – nownabe