2017-06-13 21 views
3

私は火花2.11バージョンを使用していますが、私は自分のアプリケーションで唯一の3つの基本的な操作を行っています:パフォーマンスの問題

  1. は、データベースからレコードを取っ:220万
  2. は、ファイルからレコードをチェックする(5 000 )
  3. 含ま書き込みを使用してデータベース(220万)に存在するには、CSV形式のファイル

にレコードを一致しかし、これらの3つの操作のためには、約20分かかります。 SQLで同じ操作を行うと、1分未満で完了します。

私は結果が非常に速くなるが、あまりにも多くの時間を費やしているので、私はsparkを使い始めた。パフォーマンスを向上させる方法

ステップ1:データベースからレコードを取得します。

 Properties connectionProperties = new Properties(); 
     connectionProperties.put("user", "test"); 
     connectionProperties.put("password", "test##"); 
     String query="(SELECT * from items) 
     dataFileContent= spark.read().jdbc("jdbc:oracle:thin:@//172.20.0.11/devad", query,connectionProperties); 

ステップ2:ファイルB(2M)中に存在するファイルAのレコード(5K)チェック使用が含ま

Dataset<Row> NewSet=source.join(target,target.col("ItemIDTarget").contains(source.col("ItemIDSource")),"inner"); 

ステップ3:書き込みは、CSV形式のファイルへの記録を一致

NewSet.repartition(1).select("*") 
     .write().format("com.databricks.spark.csv") 
     .option("delimiter", ",") 
     .option("header", "true") 
     .option("treatEmptyValuesAsNulls", "true") 
     .option("nullValue", "") 
     .save(fileAbsolutePath); 

パフォーマンスを向上させるために、私はキャッシュ設定のようないくつかのことを試しました データのシリアル化

set("spark.serializer","org.apache.spark.serializer.KryoSerializer")), 

シャッフル時間

sqlContext.setConf("spark.sql.shuffle.partitions", "10"), 

データ構造チューニング

-XX:+UseCompressedOops , 

アプローチのいずれもがより良いパフォーマンスが得られていないされていません。

+0

このユースケースにsparkを使用する理由はありますか? 5kのレコードをDBに書き込んで、DB内でSQL結合を発行するのが最も効率的なアプローチになると思います。 – maasg

+0

このクエリをSparkに具体化するのにどれくらいの時間がかかるのですか?「アイテムからのSELECT *」? – maasg

答えて

4

パフォーマンスの向上は、並列性の向上に似ています。

並列性はRDDのパーティション数によって異なります。

データセット/データフレーム/ RDDのパーティション数が多すぎず、パーティション数も非常に少ないことを確認してください。

コードを改善するための推奨事項をご確認ください。私はscalaにもっと慣れているので、私はscalaで提案を提供しています。

ステップ1: numPartitionsを記述することで、データベースとの接続を制御できることを確認してください。

接続数=パーティション数。

以下、私はnum_partitionsに10を割り当てました。これはパフォーマンスを上げるために調整する必要があります。

int num_partitions; 
    num_partitions = 10; 
    Properties connectionProperties = new Properties(); 
    connectionProperties.put("user", "test"); 
    connectionProperties.put("password", "test##"); 
    connectionProperties.put("partitionColumn", "hash_code"); 
    String query = "(SELECT mod(A.id,num_partitions) as hash_code, A.* from items A)"; 
    dataFileContent = spark.read() 
    .jdbc("jdbc:oracle:thin:@//172.20.0.11/devad", 
     dbtable = query, 
     columnName = "hash_code", 
     lowerBound = 0, 
     upperBound = num_partitions, 
     numPartitions = num_partitions, 
     connectionProperties); 

You can check how numPartitions works

ステップ2:

Dataset<Row> NewSet = source.join(target, 
    target.col("ItemIDTarget").contains(source.col("ItemIDSource")), 
    "inner"); 

5kの記録(少量のデータ)は、後述するように、放送が参加使用することができますを持つテーブル/データフレームの1以来。

import org.apache.spark.sql.functions.broadcast 
val joined_df = largeTableDF.join(broadcast(smallTableDF), "key") 

ステップ3: 使用することは、それは完全なシャッフルを避けるように、パーティションの数を減少させる合体します。

NewSet.coalesce(1).select("*") 
     .write().format("com.databricks.spark.csv") 
     .option("delimiter", ",") 
     .option("header", "true") 
     .option("treatEmptyValuesAsNulls", "true") 
     .option("nullValue", "") 
     .save(fileAbsolutePath); 

希望は私の答えです。

関連する問題