EC2に1マスターと2スレーブを持つSpark StandAloneクラスタを実行しています。クラスタが動作しています。私はs3からデータをロードするpythonアプリケーションを持っています。以下のコードは次のとおりです。Pyspark - データフレームforeach関数が複数のワーカー/並列化で動作しない
spark = SparkSession.builder.appName("Example").getOrCreate()
df = spark.read.csv("s3n://bucket-name/file-name.csv", header=True, mode="DROPMALFORMED")
それから私は、DFの行のそれぞれにいくつかの作業を行うにはDFに.foreach(func)
を適用:
def test_func(row):
row = modify(row)
row.save() # just an example
df.foreach(test_func)
私は書類上で読んでいると、彼らは.foreach()
がすでにあると言います分散/並列処理に最適化されています。しかし、TEST_FUNCだけ1つのノード上で実行され、以下のログを参照してください。(タスク3が.foreach(test_func)
ある)
INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, 1xx.xxx.xxx.xx2, partition 0, PROCESS_LOCAL, 17460 bytes)
INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 3 on executor id: 0 hostname: 1xx.xxx.xxx.xx2
クラスタ内の複数のノード/労働者にこのtest_func
を配布するとにかくはありますか?ヘルプは本当に感謝しています。前もって感謝します。
****** ****** UPDATE
私はデータをぶつけてきたが、そこに1人の労働者に割り当てられた唯一の1つのタスクがまだあり、それが機能を実行するために多くの時間を要します。 これは私がもう一つは、私は--executor-memory 5G
を設定しても、ですが、労働者が唯一の1Gbラムを使用したアプリケーション
./bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 --master spark://ip-xxx-xxx-xxx-xxx.us-west-2.compute.internal:7077 examples/src/main/python/test.py --executor-memory 5G --deploy-mode cluster
を実行する方法です。誰も私にこれを手伝ってもらえますか?私は数日間このことに固執しています。事前にどうもありがとうございました。 @LostInOverflowから
このコードでは、単一のタスクについては説明しません。もっと多くのデータが不足している可能性もあります。 –
こんにちはLostInOverflow、ご意見ありがとうございます。この単一のタスクが完了するまでに3分かかります。分散できる場合は速くする必要がありますか?とにかく私は大きなデータセットを試して、違いがあるかどうかを調べます。 – Leo