2017-12-18 10 views
0

私はApache Sparkを使って、Elasticsearchクラスタに格納しているデータの解析エンジンを作ることができるかどうかを試してきました。私は、RDDサイズ(つまり数百万レコード)があれば、最も簡単な操作でさえも1分以上かかることが分かっています。Apache Spark Map Job遅い

package es_spark; 

import java.util.Map; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; 

public class Main { 

    public static void main (String[] pArgs) { 

     SparkConf conf = new SparkConf().setAppName("Simple Application"); 
     conf.set("es.nodes", pArgs[0]); 

     JavaSparkContext sc = new JavaSparkContext(conf); 

     long start = System.currentTimeMillis(); 
     JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(sc, "test3"); 
     long numES = esRDD.count(); 
     long loadStop = System.currentTimeMillis(); 

     JavaRDD<Integer> dummyRDD = esRDD.map(pair -> {return 1;}); 
     long numDummy = dummyRDD.count(); 
     long mapStop = System.currentTimeMillis(); 

     System.out.println("ES Count: " + numES); 
     System.out.println("ES Partitions: " + esRDD.getNumPartitions()); 

     System.out.println("Dummy Count: " + numDummy); 
     System.out.println("Dummy Partitions: " + dummyRDD.getNumPartitions()); 

     System.out.println("Data Load Took: " + (loadStop - start) + "ms"); 
     System.out.println("Dummy Map Took: " + (mapStop - loadStop) + "ms"); 

     sc.stop(); 
     sc.close(); 
    } 
} 

私は3人の奴隷、それぞれ14個のコアを持つとRAMの49.0ギガバイトでスパーククラスタ上でこれを実行しようとしました:

は例えば、私はこの単純なテストプログラムを作りました。次のコマンドで:

./bin/spark-submit --class es_spark.Main --master spark://<master_ip>:7077 ~/es_spark-0.0.1.jar <elasticsearch_main_ip> 

出力は次のとおりです。

ES Count: 8140270 
ES Partitions: 80 
Dummy Count: 8140270 
Dummy Partitions: 80 
Data Load Took: 108059ms 
Dummy Map Took: 104128ms 

それは8+万レコードにダミーのマップジョブを実行するため1.5+分かかります。地図の仕事は何もしないので、私はこのパフォーマンスが驚くほど低いことがわかります。私は何か間違っているのですか、これはSparkの通常のパフォーマンスですか?

私はまた、--executor-memory--executor-coresを大きく変えずに試しました。

答えて

0

マップジョブが何もしないので、このパフォーマンスは驚くほど低くなります。

マップジョブは何もしません。 Elastic検索から完全なデータセットを取得する必要があります。データはキャッシュされないので、各アクションごとに1回、2回発生します。この時間には初期化時間も含まれます。

全体的に、あなたは計測:ESクエリの

  • 時間を。
  • SparkクラスタとESの間のネットワーク遅延。

など、いくつかの二次的なもの:エグゼキュータのJVMの完全な初期化の

  • 時間。
  • Probablly GC休止時間。
+0

私はまた、カウントの直後にesRDDのキャッシュを呼び出そうとしました: 長いnumES = esRDD.count(); esRDD.cache(); これは、Elastic Searchからのダブルフェッチを防止しませんか? 地図のタイミングはあまり変わっていません。 – dubbervt

+0

上記のサンプルコードをesRDD.count()の前にesRDD.cache()を呼び出すように変更しました。残念ながら、地図のパフォーマンスはそれほど助けになりませんでした。 ダミーマップTook:113298ms – dubbervt

+0

私はesRDDのカウントを取り出して、** complete **テストプログラムを約1.5分で実行したので、何かが起きているかもしれません。だから地図の操作時間はほとんど何もない。 – dubbervt

0

OOMの障害や重大なGCが表示されたり、ボトルネックとしてディスクに流出したりしない限り、エグゼキュータのメモリを変更する価値はありません。変更するときは、spark.memory.fractionも減らす必要があります。あなたの仕事のために、それは非常に役立つことはほとんどありません。

Sparkの起動コストがあるため、データの負荷が小さいほど効率が悪いです。スタートアップを1分よりもはるかに短くすることができるはずですが、リアルタイム分析ではなく、非常に大きなバッチロードではまだまだ実用的です。

RDDの代わりにDataFrame APIを使用することをお勧めします。上の簡単な例の操作では、それは重要ではありませんが、より複雑になるにつれてパフォーマンスの最適化の恩恵を受ける可能性が高くなります。

sql.read.format("es").load("test3")

遅い原因を解決するには、Spark UIをご覧ください。あなたは実際に並列性を得ていましたか?すべての仕事はほぼ同じ時間で実行されましたか?別の考えられる原因としては、クラスタとESサーバー間のネットワークの問題があります。

+0

私は特にElasticsearchの読み込み時間には関係しません。負荷のタイミング情報は無視できます。私は、何もしていないマップジョブが負荷の後に実行するのに1.5分かかるという事実にもっと関心があります。 – dubbervt

関連する問題