2016-05-16 5 views
2

S3から数多くのCSVファイル(それぞれ〜63MB @ 63MB)にアクセスする必要がある場合、Sparkアプリケーションが失敗してSpark RDDにパイプされます。 CSVを分割する実際のプロセスはうまくいくようですが、S3NativeFileSystemへの余分な関数呼び出しでエラーが発生し、ジョブがクラッシュするようです。開始するにS3NativeFileSystemがAWS EMR 4.6.0でPysparkアプリケーションを終了しています

、次は私のPySparkアプリケーションです:

from pyspark import SparkContext 
sc = SparkContext("local", "Simple App") 
from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 
import time 

startTime = float(time.time()) 

dataPath = 's3://PATHTODIRECTORY/' 
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "MYKEY") 
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "MYSECRETKEY") 

def buildSchemaDF(tableName, columnList): 
    currentRDD = sc.textFile(dataPath + tableName).map(lambda line: line.split("|")) 
    currentDF = currentRDD.toDF(columnList) 
    return currentDF 

loadStartTime = float(time.time()) 
lineitemDF = buildSchemaDF('lineitem*', ['l_orderkey','l_partkey','l_suppkey','l_linenumber','l_quantity','l_extendedprice','l_discount','l_tax','l_returnflag','l_linestatus','l_shipdate','l_commitdate','l_receiptdate','l_shipinstruct','l_shipmode','l_comment']) 
lineitemDF.registerTempTable("lineitem") 
loadTimeElapsed = float(time.time()) - loadStartTime 

queryStartTime = float(time.time()) 

qstr = """ 
    SELECT 
     lineitem.l_returnflag, 
     lineitem.l_linestatus, 
     sum(l_quantity) as sum_qty, 
     sum(l_extendedprice) as sum_base_price, 
     sum(l_discount) as sum_disc, 
     sum(l_tax) as sum_tax, 
     avg(l_quantity) as avg_qty, 
     avg(l_extendedprice) as avg_price, 
     avg(l_discount) as avg_disc, 
     count(l_orderkey) as count_order 
    FROM 
     lineitem 
    WHERE 
     l_shipdate <= '19981001' 
    GROUP BY 
     l_returnflag, 
     l_linestatus 
    ORDER BY 
     l_returnflag, 
     l_linestatus 
    """ 
tpch1DF = sqlContext.sql(qstr) 

queryTimeElapsed = float(time.time()) - queryStartTime 
totalTimeElapsed = float(time.time()) - startTime 

tpch1DF.show() 

queryResults = [qstr, loadTimeElapsed, queryTimeElapsed, totalTimeElapsed] 
distData = sc.parallelize(queryResults) 
distData.saveAsTextFile(dataPath + 'queryResults.csv') 

print 'Load Time: ' + str(loadTimeElapsed) 
print 'Query Time: ' + str(queryTimeElapsed) 
print 'Total Time: ' + str(totalTimeElapsed) 

私は、次のAWS CLIコマンドを使用して、スパークEMRクラスタをスピンアップすることから始めステップバイステップでそれを取るために(キャリッジリターンは読みやすくするために追加):

aws emr create-cluster --name "Big TPCH Spark cluster2" --release-label emr-4.6.0 
--applications Name=Spark --ec2-attributes KeyName=blazing-test-aws 
--log-uri s3://aws-logs-132950491118-us-west-2/elasticmapreduce/j-1WZ39GFS3IX49/ 
--instance-type m3.2xlarge --instance-count 6 --use-default-roles 

EMRクラスタは、私はその後、「/home/hadoop/pysparkApp.py」でのマスターノード上に私のPysparkアプリケーションをコピープロビジョニング終了した後。それをコピーすると、spark-submitのステップを追加することができます。

aws emr add-steps --cluster-id j-1DQJ8BDL1394N --steps 
Type=spark,Name=SparkTPCHTests,Args=[--deploy-mode,cluster,- 
conf,spark.yarn.submit.waitAppCompletion=true,--num-executors,5,--executor 
cores,5,--executor memory,20g,/home/hadoop/tpchSpark.py] 
,ActionOnFailure=CONTINUE 

は、今私は、前述のCSVのほんの数は、最終的な結果が生成されるファイルの上に、この手順を実行する場合、しかし、スクリプトがまだ失敗していると主張するだろう。

私はそれがS3NativeFileSystemへの余分な呼び出しに関連していると思いますが、私は確信していません。これらは、私がその結論に導く私が得ている糸ログメッセージです。

16/05/15 23:18:00 INFO S3NativeFileSystem: Opening 's3://data-set-builder/splitLineItem2/lineitemad' for reading 
16/05/15 23:18:00 INFO latency: StatusCode=[206], ServiceName=[Amazon S3], AWSRequestID=[10BDDE61AE13AFBE], ServiceEndpoint=[https://data-set-builder.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RetryCapacityConsumed=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=2, Client Execute Time=[296.86], HttpRequestTime=[295.801], HttpClientReceiveResponseTime=[293.667], RequestSigningTime=[0.204], CredentialsRequestTime=[0.002], ResponseProcessingTime=[0.34], HttpClientSendRequestTime=[0.337], 
16/05/15 23:18:02 INFO ApplicationMaster: Waiting for spark context initialization ... 

私のように迷ってしまいました:「部分的な結果」(206エラー)が生じ、

16/05/15 23:18:00 INFO HadoopRDD: Input split: s3://data-set-builder/splitLineItem2/lineitemad:0+64901757 
16/05/15 23:18:00 INFO latency: StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[ED8011CE4E1F6F18], ServiceEndpoint=[https://data-set-builder.s3-us-west-2.amazonaws.com], HttpClientPoolLeasedCount=0, RetryCapacityConsumed=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=2, ClientExecuteTime=[77.956], HttpRequestTime=[77.183], HttpClientReceiveResponseTime=[20.028], RequestSigningTime=[0.229], CredentialsRequestTime=[0.003], ResponseProcessingTime=[0.128], HttpClientSendRequestTime=[0.35], 

秒1が正しく実行するように見えるわけではありませんが:最初の呼び出しがうまく動作するように見えます最初のファイルが効果的に応答し、ファイルを分割したように見える場合に、なぜS3NativeFileSystemへの2回目の呼び出しを行っているのかを知ることができます。これは私のEMR構成の製品ですか?私はS3Nativeがファイル制限の問題を抱えていることを知っています。まっすぐなS3呼び出しが最適です。これは私がやろうとしているものですが、私が何をしてもこの呼び出しはそこにあるようです。助けてください!

また、関連性がある場合は、私の糸ログにいくつかのエラーメッセージを追加してください。スパーク構成の優先

1)

16/05/15 23:19:22 ERROR ApplicationMaster: SparkContext did not initialize after waiting for 100000 ms. Please check earlier log output for errors. Failing the application. 
16/05/15 23:19:22 INFO ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Timed out waiting for SparkContext.) 

2)

16/05/15 23:19:22 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/2b/temp_shuffle_3fe2e09e-f8e4-4e5d-ac96-1538bdc3b401 
java.io.FileNotFoundException: /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/2b/temp_shuffle_3fe2e09e-f8e4-4e5d-ac96-1538bdc3b401 (No such file or directory) 
     at java.io.FileOutputStream.open(Native Method) 
     at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
     at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:162) 
     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:226) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 
16/05/15 23:19:22 ERROR BypassMergeSortShuffleWriter: Error while deleting file /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/2b/temp_shuffle_3fe2e09e-f8e4-4e5d-ac96-1538bdc3b401 
16/05/15 23:19:22 WARN TaskMemoryManager: leak 32.3 MB memory from [email protected] 
16/05/15 23:19:22 ERROR Executor: Managed memory leak detected; size = 33816576 bytes, TID = 14 
16/05/15 23:19:22 ERROR Executor: Exception in task 13.0 in stage 1.0 (TID 14) 
java.io.FileNotFoundException: /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/3a/temp_shuffle_b9001fca-bba9-400d-9bc4-c23c002e0aa9 (No such file or directory) 
     at java.io.FileOutputStream.open(Native Method) 
     at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
     at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88) 
     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 

答えて

3

順序は:

SparkContext(コード/アプリケーション)>火花提出>火花デフォルト。confに

だから、物事のカップルは、ここで指すように -

  1. 使用糸クラスタをあなたのスパークでの展開モードとマスターとして提出命令 -

    spark-submit --deploy-mode cluster --master yarn ...

    OR

    spark-submit --master yarn-cluster ...

  2. コード内の "ローカル"文字列をsc = SparkContext("local", "Simple App")行から削除します。 conf = SparkConf().setAppName(appName) sc = SparkContext(conf=conf)を使用してSparkコンテキストを初期化します。

参考 - http://spark.apache.org/docs/latest/programming-guide.html

関連する問題