2017-10-25 9 views
1

上では動作しません、登録コーダは動作しません。登録コーダーは、ApacheビームSDKでデータフロー

私は、BigQueryののTableSchemaSimpleFunctionを使用したいが、それはシリアル化する必要があります。 私はCodeRegistryTableSchemaCoderを追加しますが、使用していないようです。

どうすれば解決できますか?

// Coder 

import com.google.api.services.bigquery.model.TableFieldSchema; 
import com.google.api.services.bigquery.model.TableSchema; 
import org.apache.beam.sdk.coders.AtomicCoder; 
import org.apache.beam.sdk.coders.StringUtf8Coder; 
import org.json.JSONArray; 
import org.json.JSONObject; 

import java.io.IOException; 
import java.io.InputStream; 
import java.io.OutputStream; 
import java.util.ArrayList; 
import java.util.List; 

public class TableSchemaCoder extends AtomicCoder<TableSchema> { 
    public static class FieldSchema { 
     private String name; 
     private String type; 
     private String mode; 

     public FieldSchema(String name, String type, String mode) { 
      this.name = name; 
      this.type = type; 
      this.mode = mode; 
     } 

     /* setter/getter */ 
    } 

    private final StringUtf8Coder stringCoder = StringUtf8Coder.of(); 

    @Override 
    public TableSchema decode(InputStream inStream) throws IOException { 
     return new SchemaBuilder().build(stringCoder.decode(inStream)); 
    } 

    @Override 
    public void encode(TableSchema value, OutputStream outStream) throws IOException { 
     List<JSONObject> fields = new ArrayList<>(); 
     for (TableFieldSchema s : value.getFields()) { 
      fields.add(new JSONObject(new FieldSchema(s.getName(), s.getType(), s.getMode()))); 
     } 
     String json = new JSONArray(fields).toString(); 
     stringCoder.encode(json, outStream); 
    } 
} 



// Pipeline 

// ... 

CodeRegistry cr = pipeline.getCodeRegistry 
cr.registerCoderForClass(TableSchema.class, TableSchemaCoder()) 

// ... 

TableSchema schema = getSchema() 
pipeline.apply(MapElements.via(RowParser(schema))) 

エラーメッセージ:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize [email protected] 
     at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53) 
     at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90) 
     at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591) 
     at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435) 
     at org.apache.beam.sdk.transforms.MapElements.expand(MapElements.java:118) 
     at org.apache.beam.sdk.transforms.MapElements.expand(MapElements.java:30) 
     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514) 
     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454) 
     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284) 

Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.model.TableSchema 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
     at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49) 
     ... 9 more 

答えて

2

あなたはRowParserのためのコードを共有しませんでしたが、私はそれがフィールドとしてTableSchemaを持って推測しています。 Coderは、パイプライン内のデータをエンコードするためにのみ使用されます。 RowParserなどの関数は、登録されたコーダーを使用しないJavaシリアル化を使用する必要があります。

あなたがテーブルのスキーマを生成している方法に応じて、あなたはいくつかのオプションがあります:

  1. をRowParserは、文字列として、あるいは他のいくつかの、直列化可能なフォーマットでいくつかに保管してもらいます。実際のTableSchemaオブジェクトに一時的なフィールドを持ち、そのフィールドがシリアル化可能な形式から初期化することができます(nullの場合)。

  2. はTableSchemaをシリアライズ避けるRowParserをシリアライズするためのJavaシリアライゼーションフックを実装します。これはおそらく上記と同様です。

  3. 計算スキーマRowParserが使用されて初めて

+0

チェックアウトBigQueryIO自体が閉鎖にTableSchemaを保存するために使用してこのパターン:https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/ SRC /メイン/ javaの/組織/ apacheの/ビーム/ SDK/IO/GCP/BigQueryの/ BigQuerySourceBase.java#L199 – jkff

+0

素敵な答えをありがとう!私は分かりません > Aコーダは、パイプライン内のデータをエンコードするためにのみ使用されます。 私は理解しました。私はそれを試してみます。 – nownabe

関連する問題