2017-12-27 26 views
1

私のクエリが何百万行も返された場合、JdbcIOがどのようにクエリを並列に実行するかを知りたいと思います。 私はhttps://issues.apache.org/jira/browse/BEAM-2803と関連するプルリクエストを参照しました。私はそれを完全に理解できませんでした。Apache Beamを使用してデータベースからバルクデータを読み取る

ReadAllexpandの方法はParDoを使用します。したがって、データベースへの複数の接続を作成して、データを並行して読み込みますか?データソース内のDBに作成できる接続数を制限すると、接続制限に固執しますか?

これはどのように扱われるのか教えてください。JdbcIO?上記のコードはReadFnがパルドで適用されていることを示して

.apply(
      ParDo.of(
       new ReadFn<>(
        getDataSourceConfiguration(), 
        getQuery(), 
        getParameterSetter(), 
        getRowMapper()))) 

:私は2.2.0

アップデートを使用しています。私は、ReadFnは並行して実行されると思います。私の前提が正しいとすれば、一度に限られた数の接続しか確立できないDBから読み取るのに、どのようにしてreadAll()メソッドを使用しますか? Balu

答えて

0

ReadAllメソッドを使用すると、多くの複数のクエリを持っている場合を扱う

感謝。各文字列がクエリである文字列のPCollectionとしてクエリを格納できます。次に、各項目は単一のParDoで別のクエリとして処理されます。

これは、パラレル化をクエリの数に制限するため、少数のクエリではうまく機能しません。しかし、あなたは多くを持っている場合、それははるかに速くpreformします。これは、ほとんどのReadAll呼び出しの場合です。

コードから、セットアップ機能で作業者ごとに接続が行われているようです。これには、ワーカー数とクエリ数に応じていくつかのクエリが含まれます。

クエリ制限はどこに設定されていますか? ReadAllの有無にかかわらず同様に動作するはずです。

は、より多くの情報のためにJIRAを参照してください:https://issues.apache.org/jira/browse/BEAM-2706

私はjdbcIO非常に精通していないですが、彼らはJIRAで提案されているバージョンを実装するようにそれはそう。 PCollectionが何かの可能性があり、その後、PCollectionの要素に応じてクエリを変更するためのコールバック。これにより、PCollection内の各アイテムはクエリを表すことができますが、各要素として新しいクエリを持つことにより、少し柔軟になります。

+0

ララ、コメントをありがとう。しかし、私の質問は、単一のクエリがDBから何百万もの行をロードする場合に特有です。このようなデータを並行して読み込む実装はありません。ここではどのように並列性を達成できますか? – Balu

0

次のようにデータソースを作成しました。

ComboPooledDataSource cpds = new ComboPooledDataSource(); 
    cpds.setDriverClass("com.mysql.jdbc.Driver"); // loads the jdbc driver 
    cpds.setJdbcUrl("jdbc:mysql://<IP>:3306/employees"); 
    cpds.setUser("root"); 
    cpds.setPassword("root"); 
    cpds.setMaxPoolSize(5); 

このドライバを今すぐ設定する方が良い方法があります。 データベースプールのサイズを5に設定しました。JdbcIOの変換中に、このデータソースを使用して接続を作成しました。 は、パイプラインでは、私は約3万レコードを返すクエリを使用

option.setMaxNumWorkers(5); 
option.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED); 

を設定します。 DB接続を観察しながら、プログラムの実行中に接続数が徐々に増加していました。特定のインスタンスで最大5つの接続を使用しました。 データベースから大量のデータをロードするには、JdbcIO trnsformationを実行しているときに、DBへの接続数を制限する方法があります。

ComboPoolDataSource

<dependency> 
     <groupId>c3p0</groupId> 
     <artifactId>c3p0</artifactId> 
     <version>0.9.1.2</version> 
    </dependency> 

用のMaven依存**私はここで何かを逃した場合は答えを修正すること自由に感じなさい。*

関連する問題