Apache Beam - BigQueryIO with the Apex runner

I am trying to write data to a BigQuery table from Apache Beam using the Apex/Spark runner. However, I get an exception when I run the program with the Apex runner.
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("Id").setType("STRING"));
fields.add(new TableFieldSchema().setName("row").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
PCollection<TableRow> data = pipeline
    .apply("ReadLines", TextIO.read().from(inputDir + "data.txt"))
    .apply(ParDo.of(new ExtractDataFn()));
data.apply(BigQueryIO.writeTableRows()
    .to("my-project:output.output_table")
    .withSchema(schema)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
static class ExtractDataFn extends DoFn<String, TableRow> {
    private static final long serialVersionUID = 1L;

    // Builds one TableRow per input line: "Id" is characters 1..4 of the line, "row" is the full line.
    @ProcessElement
    public void processElement(ProcessContext c) {
        if (c.element() != null) {
            TableRow row = new TableRow().set("Id", c.element().substring(1, 5)).set("row", c.element());
            c.output(row);
        }
    }
}
After compiling and running the program above with the command mvn compile exec:java ... --runner=ApexRunner" -Papex-runner, I get the following exception:
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1398)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:974)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:525)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:460)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)
Could you help me figure out what the problem is?
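A diagnostic sketch that may help narrow this down. The descriptor in the trace, (ZLjava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V, decodes to checkArgument(boolean, String, Object, Object). A NoSuchMethodError on it usually means the Guava jar found at run time is older than the one BigQueryIO was compiled against; as far as I know this overload only exists in Guava 20.0 and later. The class ClasspathProbe and its helper methods below are hypothetical names introduced for illustration, not part of Beam or Guava. The sketch prints which jar a class was loaded from and whether it has the expected overload.

```java
import java.security.CodeSource;

public class ClasspathProbe {

    /** Returns the jar or directory a class was loaded from, or a placeholder if unknown. */
    public static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        if (src == null || src.getLocation() == null) {
            return "(bootstrap or unknown)";
        }
        return src.getLocation().toString();
    }

    /** Returns true if the class has a public method with exactly these parameter types. */
    public static boolean hasMethod(Class<?> cls, String name, Class<?>... params) {
        try {
            cls.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the Guava class named in the stack trace; another class name can be passed instead.
        String className = args.length > 0 ? args[0] : "com.google.common.base.Preconditions";
        try {
            Class<?> cls = Class.forName(className);
            System.out.println("Loaded from: " + locationOf(cls));
            System.out.println("Has checkArgument(boolean, String, Object, Object): "
                    + hasMethod(cls, "checkArgument",
                            boolean.class, String.class, Object.class, Object.class));
        } catch (ClassNotFoundException e) {
            System.out.println(className + " is not on the classpath");
        }
    }
}
```

For the result to be meaningful, the probe has to run on the same classpath as the pipeline, for example from the same Maven module with the same runner profile enabled. If it reports an older Guava jar, the runner's dependencies are a likely source of the conflict, though that is an assumption to verify rather than a confirmed diagnosis.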