2016-03-26 10 views
-1

私はSparkで初めてです。私は2つのスパークSQlデータフレームを持っています。コードは次のようになりますApache Spark Outer Join長い時間をかけて

val df1 = sparksql.read.format(com.databricks.avro).loadfiles(filespath) 

val df2 = sparksql.sql(select * from hivedb) 

val df3 = df1.join(df2,df1.srcid <=> df2.srcid and df1.srccode <=> df2.srccode, left_outer) 

sqlcontext.cacheastable(""table) 

val insertid = sparkcontext.sql("select * from table where cid is null") 

val updateid = sparkcontext.sql("select * from table where cid is not null") 

この2つのdfをデータベースに保存します。

srcidstccodeなどの約10個のフィールドと、ファーストネーム、ラストネームなどの顧客の詳細などがあります。これらのフィールドは、もともとはoracleテーブルのvarcharであった小さな文字列です。

両方のデータフレームには、約300万レコードがあります。

私たちがジョブを実行すると、Executionerは計算を完了するのに約2時間かかってしまいます。左ブロードキャストジョインとパーティション数の設定など、多くの設定を試しましたが、無駄にしました。

データが小さい(50Kファイル)場合、ジョブは6分で完了します。私はSparkのUIを見て、実行計算が始まるだけです。私たちも、死刑執行12 Gあたりのメモリを設定して、私たちは、Clouderaの5.Xのクラスタと糸でランニングにバンドルされているスパーク1.5.0を使用している20

としてそれをNUMコアを搭載した24人の処刑を与えている

sparkコンテキスト以外のどこで低速をデバッグできますか教えてください。

+0

ここにコード例を追加しました。助けて欲しい。この結合を実行してデータをOracle DBに保存する必要があります –

答えて

0

あなたのdf3の物理的な計画をチェックし、最後にクロス製品がないことを確認することをお勧めします。もう1つは、spark.sql.shuffle.partitionsを確認し、それが低い場合はそれを上げます。約200を使用してください。

非常に重要なタスクで結合を最適化すると、多くのことを分析する必要があります。また、タングステンがオンになっていることを確認してください。 spark.sql.tungsten.enabledをtrueに設定します。

これを確認してお知らせください。私はそれが追加された最適化の完全なリストを持っていたとは思わないので、1.5.0は結合のために少し遅くなります。それにもかかわらず、これらをチェックし、私に知らせてください。

+0

こんにちはSrini .. Tugstenを有効にしました。また、パーティションの数も200です。私はスパークのUIでそれをチェックした。私は物理的な計画を参照して交換を参照してください。それはクロスプロダクトと同じですか?はいの場合は削除する方法...次のコメントで物理的な計画を提供します –

+0

パーティションについてはどうですか? – Srini

+0

私は物理的なプランをコピーしてコピーすることはできません。ここでは長いので、物理的なプランの構文の下に書いてあります –

0

この問題は修正されました。問題は、入力キーにnullがある場合、sparkはCartsian結合を作成するということでした。私たちはキーからヌルを取り除いてこれを修正しました