2016-11-28 2 views
3

私はSparkでETLジョブを持っています。これはMySQLにも接続してデータを取得します。次のように歴史的に、私はそれをやってきた:私はtmp_usersテーブルを使用するたびに、複数のワーカーノード、このスケーリングしかしスパークETLジョブは一度だけmysqlを実行します

hiveContext.read().jdbc(
    dbProperties.getProperty("myDbInfo"), 
    "(SELECT id, name FROM users) r", 
    new Properties()).registerTempTable("tmp_users"); 

Row[] res = hiveContext.sql("SELECT " 
    + " u.name, " 
    + " SUM(s.revenue) AS revenue " 
    + "FROM " 
    + " stats s " 
    + " INNER JOIN tmp_users u " 
    + "  ON u.id = s.user_id 
    + "GROUP BY " 
    + " u.name " 
    + "ORDER BY " 
    + " revenue DESC 
    + "LIMIT 10").collect(); 

String ids = ""; 
// now grab me some info for users that are in tmp_user_stats 
for (i = 0; i < res.length; i++) { 
    s += (!s.equals("") ? "," : "") + res[i](0); 
} 

hiveContext.jdbc(
dbProperties.getProperty("myDbInfo"), 
"(SELECT name, surname, home_address FROM users WHERE id IN ("+ids+")) r", 
new Properties()).registerTempTable("tmp_users_prises"); 

、それはクエリを実行し、それが実行されます(少なくとも)ノードごとに一度これは、私たちのDB管理者がナイフでオフィスを回って走っているところです。

これを処理する最善の方法は何ですか? 3つのマシンのようにジョブを実行し、3つのクエリに制限して、他のノードがHadoopにデータを書き込んでそれを使うことはできますか?

本質的には、コメントで示唆されているように、私はMySQL側からデータを準備してHadoopにインポートできるETLジョブの外でクエリを実行できます。ただし、それ以降のクエリがある可能性があります。これは、のSparkおよびJDBC接続設定の解決策を示唆しています。

私はまだそれが仕事をすることはまだ分かっていませんが、少なくともより合理的なソリューションを提供するので、私はSqoopソリューションを受け入れるでしょう。何か見つかったら、もう一度編集します。あなたがデータをキャッシュすることができます

答えて

1

:最初のリード後

val initialDF = hiveContext.read().jdbc(
    dbProperties.getProperty("myDbInfo"), 
    "(SELECT id, name FROM users) r", 
    new Properties()) 
initialDF.cache(); 
initialDF.registerTempTable("tmp_users"); 

を、データがメモリ

代替にキャッシュされます(つまり、DBAを傷つけることはありません;))パラメータ--num-mappers=3とSqoopを使用することであり、その後、結果ファイルをSparkにインポート

関連する問題