2016-12-06 9 views
0

EC2に1マスターと2スレーブを持つSpark StandAloneクラスタを実行しています。クラスタが動作しています。私はs3からデータをロードするpythonアプリケーションを持っています。以下のコードは次のとおりです。Pyspark - データフレームforeach関数が複数のワーカー/並列化で動作しない

spark = SparkSession.builder.appName("Example").getOrCreate() 
df = spark.read.csv("s3n://bucket-name/file-name.csv", header=True, mode="DROPMALFORMED") 

それから私は、DFの行のそれぞれにいくつかの作業を行うにはDFに.foreach(func)を適用:

def test_func(row): 
    row = modify(row) 
    row.save() # just an example 

df.foreach(test_func) 

私は書類上で読んでいると、彼らは.foreach()がすでにあると言います分散/並列処理に最適化されています。しかし、TEST_FUNCだけ1つのノード上で実行され、以下のログを参照してください。(タスク3が.foreach(test_func)ある)

INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks 
INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, 1xx.xxx.xxx.xx2, partition 0, PROCESS_LOCAL, 17460 bytes) 
INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 3 on executor id: 0 hostname: 1xx.xxx.xxx.xx2 

クラスタ内の複数のノード/労働者にこのtest_funcを配布するとにかくはありますか?ヘルプは本当に感謝しています。前もって感謝します。

****** ****** UPDATE

私はデータをぶつけてきたが、そこに1人の労働者に割り当てられた唯一の1つのタスクがまだあり、それが機能を実行するために多くの時間を要します。 これは私がもう一つは、私は--executor-memory 5Gを設定しても、ですが、労働者が唯一の1Gbラムを使用したアプリケーション

./bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 --master spark://ip-xxx-xxx-xxx-xxx.us-west-2.compute.internal:7077 examples/src/main/python/test.py --executor-memory 5G --deploy-mode cluster 

を実行する方法です。誰も私にこれを手伝ってもらえますか?私は数日間このことに固執しています。事前にどうもありがとうございました。 @LostInOverflowから

+1

このコードでは、単一のタスクについては説明しません。もっと多くのデータが不足している可能性もあります。 –

+0

こんにちはLostInOverflow、ご意見ありがとうございます。この単一のタスクが完了するまでに3分かかります。分散できる場合は速くする必要がありますか?とにかく私は大きなデータセットを試して、違いがあるかどうかを調べます。 – Leo

答えて

0

それが本当にそうであるならば、このコードは、単一のタスクを説明していません。 データが不足している可能性があります。

これは正しいです。データを数十万レコードに増やした後、タスクは分割され、すべてのエグゼキュータに割り当てられます。