2017-05-19 17 views
0

私はSparkプロジェクトに取り組んでおり、アマゾンのクラスタでアプリをエスケープしようとしています。小さなファイルでもパフォーマンスは非常に遅いです。私は解決策を望んでいない、経験豊富な遅さのための考えられる理由についてのちょうど意見。AmazonのクラスタでPySparkが非常に遅い

spark = SparkSession.builder.appName("appName").getOrCreate() 
sc = spark.sparkContext 

rec= sc.textFile(sys.argv[1]) 
# rec= sc.parallelize(records.collect()) 

a= rec.map(lambda line: line.split("\t")) 
      .filter(lambda x: int(x[6])>=4) 
      .map(lambda x: (x[1],[x[2], x[6]])) 
a=a.join(a) 
    .filter(lambda (x,(a,b)): a[0]<b[0]) 
    .map(lambda (x,(b,a)): ((a[0],b[0]),x)) 
    .groupByKey() 
    .filter(lambda (x,y): len(set(y))>2) 
    .sortBy(lambda a: a[0]) 
    .saveAsTextFile(sys.argv[2]) 
+1

*非常に遅い*と*小さい*の定義は何ですか? – mtoto

+0

300MBの場合は150秒。 Javaでは、操作は多かれ少なかれ60秒です – RamsesXVII

+0

ファイルをPandasで読み込み、それをSpark RDDにプッシュすることを検討してください。 – Henry

答えて

0

このコードシーケンスは、あなたの最も深い悲しみを引き起こしている可能性があります

records = sc.textFile(sys.argv[1]) 
rec= sc.parallelize(records.collect()) 

なぜ?あなたはスパークコンテキストのテキストファイル関数を介してRDDとしてファイルを読み込んでいます。そして、それはクラスタ全体にマテリアライズされます。次に、キッカーがあります。records.collect()を呼び出すと、クラスタにすべてのデータを送信するように指示します。 (どのマシンがジョブを起動したとしても)ドライバに渡し、最後に、ローカルに収集されたリストからRDDを再構築します。 rec RDDの代わりにRDDレコードを使用してください

編集: aを自分自身にクロス結合しているようです。それは意図されているのですか?

キーでグループ化すると、すべてのデータを強制的にシャッフルし、キーで縮小します。

SparkContextの使用を中止します。下位互換性のためにそこにあります。あなたのsparksessionの.read.option("delimiter", "\t").csv(file path)を使って、RDDの代わりにDataFrameを作成し、タブ区切りの行を一般的なRowオブジェクトに解析し、DataFrame APIを使用して非常に優れたパフォーマンスを得ることができます(タングステンとCatalystによる)。これはPySparkを使用しているので、Spark 2.xをDataFramesとPythonで使用すると、パフォーマンスがJVMのスカラと一致するため、PythonでRDDを使用すると、Pythonインタプリタが作業を行うため、Scalaの方がはるかに高速です)。

+0

ありがとうございます。とにかく私は(rec = sc.parallelizeを削除した後で)ほとんどの時間がオペレーションに費やされていることがわかりました。 .sortBy(lambda a:a [0]) .saveAsTextFile(sys.argv [2]) 実行の時間を改善する? – RamsesXVII

+0

遅れて実行されているので、セーブコールは常にハングアップしているように見えますが、ほとんどありません。 – Garren

+0

aとのクロス結合が必要です。私は "idPlane、idUser"の構造体にデータを持っており、私は同じ面を取るユーザの対が必要です:idPlane、idUser、idUser。とにかく私がコマンドを削除した場合 .sortBy(lambda a:a [0]) .saveAsTextFile(sys.argv [2]) 実行時間が80%減少!私は本当にこれを避けて同じ出力を得ることができるのかどうか分かりません。問題はこれらのコマンドにリンクされている必要があります! – RamsesXVII

0

よくPySparkまたはSparkは、大量のデータを処理する必要がある場合に使用します。小さなデータでは遅いことが予想されます。理由は次のとおりです。

  1. 実行中のエグゼキュータの数に応じてPySparkでは、JVMを最初に起動する必要があります。さらに、PySparkの場合、呼び出される必要があるPythonのサブプロセスを通じて追加のオーバーヘッドが作成されます。 PySpark Internals
  2. 第2の理由は、データシャッフルによるものです。あなたのデータはネットワーク上でシャッフルされるかもしれません。ローカルの場合、データは同じノード上で計算されます。データの配布のために、スケジューラはまずデータを配置する必要がある場所を特定し、次にそれをどのように処理するかを把握する必要があります。

PySpark/Sparkは、「大きなデータ」で何かをする必要がある場合にのみ輝きます!スパークは遅いと言われているので、最初は非常に多くの人が非常に失望しているのを見ましたが、ごくわずかなデータ量でしか使用していませんでした。お役に立てれば!一人で

関連する問題