2017-06-12 12 views
0

私のHDFSには、約350のcsvファイルのコレクションがあります。各ファイルのサイズは数KBから250Mbまでです。これらのcsvファイルの値をRECORDというテーブルに挿入する必要があります。挿入中に、他のテーブルも参照する必要があります(PARAMETERとFRAME_RATE)。私はこの仕事を達成するためにこの次のクエリを持っています。私の小さなのPoC研究で大量のcsvファイルからテーブルにレコードを挿入する

-- create external table for the csv files in hdfs 
    CREATE EXTERNAL TABLE TEMP_CSV( 
    FRAME_RANK BIGINT, 
    FRATE BIGINT, 
    SOURCE STRING, 
    PARAM STRING, 
    RECORDEDVALUE STRING 
    )   
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ';'  
location '/user/bala/output' 
TBLPROPERTIES ("skip.header.line.count"="2"); 


-- Now insert fresh values into T_RECORD 
INSERT OVERWRITE TABLE RECORD 
PARTITION(SESSION) 
SELECT DISTINCT   
    TEMP_CSV.F_FRAME_RANK,       
    PARAMETER.K_ID, 
    FRAME_RATE.K_ID, 
    CAST(TEMP_CSV.RECORDEDVALUE as FLOAT),   
    split(reverse(split(reverse(TEMP_CSV.INPUT__FILE__NAME),"/")[0]), "[.]")[0] AS SESSION   
    FROM TEMP_CSV , PARAMETER, FRAME_RATE 
    WHERE PARAMETER.NAME = TEMP_CSV.PARAM AND FRAME_RATE.FRATE = TEMP_CSV.FRATE; 

私は350個のファイルを処理するとき、私は約50 CSVファイルを持っていたし、このクエリが成功したしかし

下記の構成で500secondsについて
Hive-on-spark 
spark standalon 
6 nodes in the cluster 
4 cores per node/16gb RAM 
spark.executor.memory 2g 

にRECORDテーブルにレコードを埋め、エグゼキュータのJavaヒープ・スペース・エラーで照会が失敗しました。だから、私はexecutor.memoryを4gに増やしました。失敗しました。私はexecutor.memoryを6gに増やしました。失敗しました。最後に、私はspark.executor.memoryを12gに増やしました。成功。しかし、それは約2時間30分かかりました。 spark.executor.memoryを12gに増やすと、ノードごとに1つのエグゼキュータが発生するため、エグゼキュータは6つしかありません。

私executor.memoryが6グラムだったときに、これは障害発生時のログで、エグゼキュータで

****** 
****** 
2017-06-12 11:59:09,988 Stage-1_0: 101/101 Finished Stage-2_0: 12/12 Fini shed Stage-3_0: 0(+12,-2)/12 
2017-06-12 11:59:12,997 Stage-1_0: 101/101 Finished Stage-2_0: 12/12 Finished Stage-3_0: 0(+12,-2)/12 
2017-06-12 11:59:16,004 Stage-1_0: 101/101 Finished Stage-2_0: 12/12 Finished Stage-3_0: 0(+12,-2)/12 
2017-06-12 11:59:19,012 Stage-1_0: 101/101 Finished Stage-2_0: 12/12 Finished Stage-3_0: 0(+12,-2)/12 
***** 
***** 

、これはエラーログです

17/06/12 11:58:36 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(5,[Lscala.Tuple2;@e65f7b8,BlockManagerId(5, bndligpu04, 54618))] in 1 attempts 
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [50 seconds]. This timeout is controlled by spark.executor.heartbeatInterval 
at  org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) 
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101) 
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:476) 
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:505) 
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:505) 
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:505) 
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1801) 
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:505) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [50 seconds] 
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
at scala.concurrent.Await$.result(package.scala:107) 
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) 
... 14 more 
17/06/12 11:58:36 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 115) 
java.lang.OutOfMemoryError: Java heap space 
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) 
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) 
at org.apache.orc.impl.OutStream.getNewInputBuffer(OutStream.java:109) 
at org.apache.orc.impl.OutStream.write(OutStream.java:130) 
at org.apache.orc.impl.RunLengthIntegerWriterV2.writeDeltaValues(RunLengthIntegerWriterV2.java:238) 
at org.apache.orc.impl.RunLengthIntegerWriterV2.writeValues(RunLengthIntegerWriterV2.java:186) 
at org.apache.orc.impl.RunLengthIntegerWriterV2.write(RunLengthIntegerWriterV2.java:772) 
at org.apache.orc.impl.WriterImpl$IntegerTreeWriter.writeBatch(WriterImpl.java:1039) 
at org.apache.orc.impl.WriterImpl$StructTreeWriter.writeRootBatch(WriterImpl.java:1977) 
at org.apache.orc.impl.WriterImpl.addRowBatch(WriterImpl.java:2759) 
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.flushInternalBatch(WriterImpl.java:277) 
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.addRow(WriterImpl.java:296) 
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.write(OrcOutputFormat.java:103) 
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:743) 
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:837) 
at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:97) 
at org.apache.hadoop.hive.ql.exec.spark.SparkReduceRecordHandler.processKeyValues(SparkReduceRecordHandler.java:309) 
at org.apache.hadoop.hive.ql.exec.spark.SparkReduceRecordHandler.processRow(SparkReduceRecordHandler.java:267) 
at org.apache.hadoop.hive.ql.exec.spark.HiveReduceFunctionResultList.processNextRecord(HiveReduceFunctionResultList.java:49) 
at org.apache.hadoop.hive.ql.exec.spark.HiveReduceFunctionResultList.processNextRecord(HiveReduceFunctionResultList.java:28) 
at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList$ResultIterator.hasNext(HiveBaseFunctionResultList.java:95) 
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$15.apply(AsyncRDDActions.scala:120) 
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$15.apply(AsyncRDDActions.scala:120) 
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992) 
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
17/06/12 11:58:36  ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-1,5,main] 
java.lang.OutOfMemoryError: Java heap space 
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) 
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) 
at org.apache.orc.impl.OutStream.getNewInputBuffer(OutStream.java:109) 
at org.apache.orc.impl.OutStream.write(OutStream.java:130) 
at org.apache.orc.impl.RunLengthIntegerWriterV2.writeDeltaValues(RunLengthIntegerWriterV2.java:238) 

私の質問は以下のとおりです -

  1. クエリを最適化するための有効範囲はありますか?
  2. このチャレンジに対応できるその他のスパーク/ハイブ設定はありますか?
  3. ハイブに処理を指示する方法はありますか?言い換えれば、50個のファイルを別の50個後に伝えることはできますか?

この問題を解決するためのヘルプや情報が役立ちます。もう1つの情報、 'SELECT'ステートメントが動作し、私は色相ブラウザで結果を見ることができました。私が「SELECT」によって収集された情報をINSERTしようとすると、クエリが壊れてしまいます。

答えて

-1

このジョブのエグゼキュータコアを増やすことができます。

executorコアは、実行プログラムが実行できる並行タスクの数です。ワーカーを実行させるためのワーカーコアは「CPUコア」です。

Sparkでは、Sparkアプリケーションがワーカーでのみマシン上で使用できるようにするために、CPUコアの総数を定義するスレーブ起動時のCPUコアの量を設定するオプションがあります。デフォルトは次のとおりです。このようなものになるだろうスパークを開始するために、すべての利用可能なコア

のコマンドを使用します。 ./sbin/start-all.sh --cores 2

それとも、いくつかのログを掘り下げると、テーブル自体、私の後--executor-cores 2

+0

提案していただきありがとうございます。しかし、私はどのように--executor-cores = 2が正しい解決策になるのだろうかと思います。なぜなら、1つのコアで6つのすべての6 GBが使用可能な状態で1つのプロセスしか実行されていないと、それは失敗するからです。 6GBで共有するもう1つのプロセスを実行するともう一度失敗します。右?それにもかかわらず私は試してみることができます。 – Bala

0

で試すことができます次を実行しました

  1. RECORDテーブルの「クラスタリング」を削除しました。以前は、RECORDはバケツ(12個の数字)で、第2段階で12のタスクを作成しました。この数を増やすために、私はバケツを取り除いた。現在、273のタスクが作成されています。私はまだそれの背後にある理由を知らない。しかし、エグゼキュータメモリが4GBの場合、この設定が有効になりました。

  2. 私はspark-on-yarn構成に移行しました。これによりパフォーマンスが向上しました。今、私は35メートルでクエリを終了することができます。

まだ、クエリを最適化するための有効範囲または2つがあることがわかります。私は参加しようとします。

関連する問題