0

ローカルマシンからパイプラインを実行すると、クラウドSQLインスタンスにあるテーブルを更新できます。しかし、DataflowRunnerを使ってこれを動かすと、同じことが以下の例外で失敗します。google sqlインスタンスにアクセスするデータフローでビームクラスを実行するにはどうすればよいですか?

私のEclipseから接続するには、 .create("com.mysql.jdbc.Driver", "jdbc:mysql://<ip of sql instance > :3306/mydb")というデータソース設定を作成しました。

Dataflowランナーを実行している間、同じiが .create("com.mysql.jdbc.GoogleDriver", "jdbc:google:mysql://<project-id>:<instance-name>/my-db")に変更されました。

  1. インスタンスのゾーン情報にプレフィックスを付ける必要がありますか?

私はこれを実行したときに私が得る例外は以下のとおりである:

Jun 22, 2017 6:53:58 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process 
INFO: 2017-06-22T13:23:51.583Z: (840be37ab35d3d0d): Starting 2 workers in us-central1-f... 
Jun 22, 2017 6:53:58 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process 
INFO: 2017-06-22T13:23:51.634Z: (dabfae1dc9365d10): Executing operation JdbcIO.Read/Create.Values/Read(CreateSource)+JdbcIO.Read/ParDo(Read)+JdbcIO.Read/ParDo(Anonymous)+JdbcIO.Read/GroupByKey/Reify+JdbcIO.Read/GroupByKey/Write 
Jun 22, 2017 6:54:49 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process 
INFO: 2017-06-22T13:24:44.762Z: (21395b94f8bf7f61): Workers have started successfully. 

SEVERE: 2017-06-22T13:25:30.214Z: (3b988386f963503e): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot load JDBC driver class 'com.mysql.jdbc.GoogleDriver' 
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:289) 
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:261) 
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:55) 
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:43) 
    at com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78) 
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:152) 
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:272) 
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244) 
    at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:125) 
    at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:105) 
    at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:92) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot load JDBC driver class 'com.mysql.jdbc.GoogleDriver' 
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36) 
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn$auxiliary$M7MKjX9p.invokeSetup(Unknown Source) 
    at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:65) 
    at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:47) 
    at com.google.cloud.dataflow.worker.runners.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:100) 
    at com.google.cloud.dataflow.worker.runners.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:70) 
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:365) 
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:278) 
    ... 14 more 

この問題を解決するために、任意のヘルプは本当に感謝しています。これは、データフロージョブとしてビームパイプラインを実行する私の最初の試みです。

PipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); 

    ((DataflowPipelineOptions) options).setNumWorkers(2); 
    ((DataflowPipelineOptions)options).setProject("xxxxx"); 
    ((DataflowPipelineOptions)options).setStagingLocation("gs://xxxx/staging"); 
    ((DataflowPipelineOptions)options).setRunner(DataflowRunner.class); 
    ((DataflowPipelineOptions)options).setStreaming(false); 
    options.setTempLocation("gs://xxxx/tempbucket"); 
    options.setJobName("sqlpipeline"); 
PCollection<Account> collection = dataflowPipeline.apply(JdbcIO.<Account>read() 
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration 
        .create("com.mysql.jdbc.GoogleDriver", "jdbc:google:mysql://project-id:testdb/db") 
        .withUsername("root").withPassword("root")) 
      .withQuery(
        "select account_id,account_parent,account_description,account_type,account_rollup,Custom_Members from account") 
      .withCoder(AvroCoder.of(Account.class)).withStatementPreparator(new JdbcIO.StatementPreparator() { 
       public void setParameters(PreparedStatement preparedStatement) throws Exception { 
        preparedStatement.setFetchSize(1); 
        preparedStatement.setFetchDirection(ResultSet.FETCH_FORWARD); 

       } 
      }).withRowMapper(new JdbcIO.RowMapper<Account>() { 
       public Account mapRow(ResultSet resultSet) throws Exception { 
        Account account = new Account(); 
        account.setAccount_id(resultSet.getInt("account_id")); 
        account.setAccount_parent(resultSet.getInt("account_parent")); 
        account.setAccount_description(resultSet.getString("account_description")); 
        account.setAccount_type(resultSet.getString("account_type")); 
        account.setAccount_rollup("account_rollup"); 
        account.setCustom_Members("Custom_Members"); 
        return account; 
       } 
      })); 
+0

おかげパブロに役立つことを願っています。 (Y) – Balu

答えて

1

com.google.cloud.sql/mysql-socket-factory maven依存関係を正しく取得しましたか?あなたがクラスを読み込めないように見えます。

https://cloud.google.com/appengine/docs/standard/java/cloud-sql/#Java_Connect_to_your_database

+0

私は以下のバージョンを使用しています:mysql-socket-factory - 1.0.2。私のコードでは古いバージョンでした。しかし、1.0.2にアップデートしても、同じエラーが出ます。 "JDBCドライバクラス 'com.mysql.jdbc.GoogleDriver'"をロードできません。 – Balu

+0

こんにちはBalu、com.mysql.jdbc.GoogleDriverクラスのjarファイルがあるかどうかをビルド出力で確認できますか? また、競合があるかどうかを確認するために、mavenツリーを印刷してください。 https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree。 html http://maven.apache.org/plugins/maven-dependency-plugin/tree-mojo.html –

0

こんにちは、私はGoogleのドライバーが

だから、App Engineの展開のために支援しているので、それは、これはどのような私のパイプライン構成で行くように、それは「はcom.mysql.jdbc.Driver」と上を移動する方が良いと思います

PCollection < KV < Double, Double >> exchangeRates = p.apply(JdbcIO. < KV < Double, Double >> read() 
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", "jdbc:mysql://ip:3306/dbname?user=root&password=root&useUnicode=true&characterEncoding=UTF-8") 
      ) 
    .withQuery(
     "SELECT PERIOD_YEAR, PERIOD_YEAR FROM SALE") 
    .withCoder(KvCoder.of(DoubleCoder.of(), DoubleCoder.of())) 
    .withRowMapper(new JdbcIO.RowMapper < KV < Double, Double >>() { 
     @Override 
     public KV<Double, Double> mapRow(java.sql.ResultSet resultSet) throws Exception { 
     LOG.info(resultSet.getDouble(1)+ "Came"); 
      return KV.of(resultSet.getDouble(1), resultSet.getDouble(2)); 
     } 
})); 

似ている、それは私のために完全に正常に動作しますが、それは再フォーマットのため

関連する問題