2016-11-03 6 views
3

Spark 1.6.1を使用してHDFSに書き込んでいます。場合によっては、すべての作業が1つのスレッドで実行されているように見えます。何故ですか?Sparkを使用してHDFSへの読み書きを遅くする

また、パーケットファイルをImpalaに登録するには、parquet.enable.summary-metadataが必要です。

Df.write().partitionBy("COLUMN").parquet(outputFileLocation); 

また、これはすべて実行者の1つのCPUで発生するようです。その後、再び

16/11/03 14:59:20 INFO datasources.DynamicPartitionWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter 
16/11/03 14:59:20 INFO mapred.SparkHadoopMapRedUtil: No need to commit output of task because needsTaskCommit=false: attempt_201611031459_0154_m_000029_0 
16/11/03 15:17:56 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 41.9 GB to disk (3 times so far) 
16/11/03 15:21:05 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks 
16/11/03 15:21:05 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms 
16/11/03 15:21:05 INFO datasources.DynamicPartitionWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter 
16/11/03 15:21:05 INFO codec.CodecConfig: Compression: GZIP 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Dictionary is on 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Validation is off 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0 
16/11/03 15:21:05 INFO parquet.CatalystWriteSupport: Initialized Parquet WriteSupport with Catalyst schema: 

: -

16/11/03 15:21:05 INFO compress.CodecPool: Got brand-new compressor [.gz] 
16/11/03 15:21:05 INFO datasources.DynamicPartitionWriterContainer: Maximum partitions reached, falling back on sorting. 
16/11/03 15:32:37 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 31.8 GB to disk (0 time so far) 
16/11/03 15:45:47 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 31.8 GB to disk (1 time so far) 
16/11/03 15:48:44 INFO datasources.DynamicPartitionWriterContainer: Sorting complete. Writing out partition files one at a time. 
16/11/03 15:48:44 INFO codec.CodecConfig: Compression: GZIP 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Dictionary is on 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Validation is off 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0 
16/11/03 15:48:44 INFO parquet.CatalystWriteSupport: Initialized Parquet WriteSupport with Catalyst schema: 

スキーマ何度も何度も次の行の20倍程度の

について200。

16/11/03 16:15:41 INFO output.FileOutputCommitter: Saved output of task 'attempt_201611031521_0154_m_000040_0' to hdfs://PATH/_temporary/0/task_201611031521_0154_m_000040 
16/11/03 16:15:41 INFO mapred.SparkHadoopMapRedUtil: attempt_201611031521_0154_m_000040_0: Committed 
16/11/03 16:15:41 INFO executor.Executor: Finished task 40.0 in stage 154.0 (TID 8545). 3757 bytes result sent to driver 

更新: parquet.enable.summary-メタデータをfalseに設定する - は、次の行の最後で、その後

16/11/03 15:49:51 INFO hadoop.ColumnChunkPageWriteStore: written 413,231B for [a17bbfb1_2808_11e6_a4e6_77b5e8f92a4f] BINARY: 1,040,100 values, 1,138,534B raw, 412,919B comp, 8 pages, encodings: [RLE, BIT_PACKED, PLAIN_DICTIONARY], dic { 356 entries, 2,848B raw, 356B comp} 

16/11/03 15:48:44 INFO compress.CodecPool: Got brand-new compressor [.gz] 
16/11/03 15:49:50 INFO hadoop.InternalParquetRecordWriter: mem size 135,903,551 > 134,217,728: flushing 1,040,100 records to disk. 
16/11/03 15:49:50 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 89,688,651 

について200。 21.

Df.write().mode(SaveMode.Append).partitionBy("COL").parquet(outputFileLocation); 

から
削減パーティションそれは速度を改善しましたが、まだ完了するまでに時間を要します。

更新: - 問題のほとんどの理由は、複数の左外部結合が非常に小さいデータを書き込み直前に実現するためです。ファイルを開いたままにするAppendモードのため、スピルが発生しています。これは、このモードでは5つのオープンファイルのデフォルトの制限です。 "spark.sql.sources.maxConcurrentWrites"プロパティを使用してこれを増やすことができます。

+0

あなたは、ファイルの読み込み時にコード内で再分割を使用しようとしましたか? –

+0

デフォルトでは、Sparkはワーカーごとに1つのエグゼキュータを割り当てます。必要なエグゼキュータの数を指定できます。あなたはどちらのマスターを使いましたか(ローカル、糸)? – ahars

+0

spark-submit '' - master ''糸クラスタ '' --driver-memory '' 8G ' - ドライバコア' '3' --executor-memory '' 8G '' --driver-コア '' 3 '' - 実行者コア '' 3 '' - ナムエグゼキュータ '' 4 ' – morfious902002

答えて

0

最後に、書き込み部分に達する前にコード内のいくつかの最適化を行った後、書き込み時間が改善されました。シャッフルが4-5Gbを超えていたので、私たちは再パーティションを行うことができませんでした。前回の変更の後、コードを合体から再分割に変更しました。そこでは、エグゼキュータの各CPUに書き込むデータの量を与えることで、すべてのエグゼキュータにデータを配布しました。 ジョブによって作成されたパーケットファイルのサイズが、書き込み前にDataframeを再分割しようとするよりも異なる場合があります。

また、これはあまりにも書き込み性能を支援することができます: -

sc.hadoopConfiguration.set("parquet.enable.dictionary", "false") 
関連する問題