2017-06-22 23 views
2

Apache Beam 2.0(Java)でJdbcIOを使用して、同じプロジェクト内のDataflowからCloud SQLインスタンスに接続するのには苦労しています。私は次のエラーを取得していますデータフロージョブからCloud SQLに接続

java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure 

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.) 
  • データフローサービスアカウントのドキュメントによると*@dataflow-service-producer-prod.iam.gserviceaccount.comは、すべてにアクセスできる必要があります彼は "編集"権限を持っている場合は、同じプロジェクト内のリソース。

  • DirectRunnerで同じデータフロージョブを実行すると、すべて正常に動作します。

これは私が使用しているコードです:

private static String JDBC_URL = "jdbc:mysql://myip:3306/mydb?verifyServerCertificate=false&useSSL=true"; 

PCollection < KV < String, Double >> exchangeRates = p.apply(JdbcIO. < KV < String, Double >> read() 
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", JDBC_URL) 
    .withUsername(JDBC_USER).withPassword(JDBC_PW)) 
.withQuery(
    "SELECT CurrencyCode, ExchangeRate FROM mydb.mytable") 
.withCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of())) 
.withRowMapper(new JdbcIO.RowMapper < KV < String, Double >>() { 
    public KV < String, Double > mapRow(ResultSet resultSet) throws Exception { 
    return KV.of(resultSet.getString(1), resultSet.getDouble(2)); 
    } 
})); 

EDIT:別のデータフロージョブ内のビームの外側で、次のアプローチを使用して

が伝えたDataflowRunnerで正常に動作するようです私はデータベースが問題ではないかもしれないと私に言います。私はこのアプローチは良い仕事かもしれないと思う

java.sql.Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PW); 

答えて

1

は、com.mysql.jdbc.GoogleDriverを試し、そしてここに記載されているMavenの依存関係を使用してください。

https://cloud.google.com/appengine/docs/standard/java/cloud-sql/#Java_Connect_to_your_database

関連質問: Where i find and download this jar file com.mysql.jdbc.GoogleDriver?

+0

ちょっとアレックス・アマート@、残念ながら、両方のMavenの依存関係が追加されているにもかかわらず、 "java.sql.SQLException:JDBCドライバクラス 'com.mysql.jdbc.GoogleDriver'をロードできません"というgcpデータフローでは動作しないようです。 – Jimmy

0

こんにちは、それは私がDB構成方法からwithusernameとパスワードのメソッドを削除し、私のパイプライン構成は

以下のようになりますuはit.Additionalyやったように、私のために働きましたこれが役立つ
PCollection < KV < Double, Double >> exchangeRates = p.apply(JdbcIO. < KV < Double, Double >> read() 
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", "jdbc:mysql://ip:3306/dbname?user=root&password=root&useUnicode=true&characterEncoding=UTF-8") 
      ) 
    .withQuery(
     "SELECT PERIOD_YEAR, PERIOD_YEAR FROM SALE") 
    .withCoder(KvCoder.of(DoubleCoder.of(), DoubleCoder.of())) 
    .withRowMapper(new JdbcIO.RowMapper < KV < Double, Double >>() { 
     @Override 
     public KV<Double, Double> mapRow(java.sql.ResultSet resultSet) throws Exception { 
     LOG.info(resultSet.getDouble(1)+ "Came"); 
      return KV.of(resultSet.getDouble(1), resultSet.getDouble(2)); 
     } 
    })); 

希望

関連する問題