2017-05-17 10 views
2

私はpySparkで.saveAsTable()とハイブテーブル(寄木細工)にスパークデータフレームを保存しようとするが、以下のようにメモリの問題に走り続けるのです:メモリ割り当ての問題は、テーブルハイブに

org.apache.hadoop.hive.ql.metadata.HiveException: parquet.hadoop.MemoryManager$1: 
New Memory allocation 1034931 bytes is smaller than the minimum allocation size of 1048576 bytes. 

最初の数字(1034931)は、通常、異なる実行で変化し続けます。私は2番目の数字(1048576)が1024^2であることを認識していますが、私はそれがここで何を意味しているのかほとんど考えていません。

私のプロジェクトのいくつかのもの(まったく大きなデータフレームを持つもの)と全く同じテクニックを使用しており、問題なく動作しています。ここでは、基本的にプロセスと構成の構造をコピーして貼り付けていますが、メモリの問題に取り組んでいます!それは私が行方不明の何か自明でなければならない。 (これは正常に動作している場合〜10列と〜30万行を、より多くのことができる)

スパークデータフレーム構造を有している(のはsdfそれを呼びましょう):

+----------+----------+----------+---------------+---------------+ 
| col_a_str| col_b_num| col_c_num|partition_d_str|partition_e_str| 
+----------+----------+----------+---------------+---------------+ 
|val_a1_str|val_b1_num|val_c1_num|  val_d1_str|  val_e1_str| 
|val_a2_str|val_b2_num|val_c2_num|  val_d2_str|  val_e2_str| 
|  ...|  ...|  ...|   ...|   ...| 
+----------+----------+----------+---------------+---------------+ 

ハイブのテーブルは次のように作成されました:

sqlContext.sql(""" 
        CREATE TABLE IF NOT EXISTS my_hive_table (
         col_a_str string, 
         col_b_num double, 
         col_c_num double 
        ) 
        PARTITIONED BY (partition_d_str string, 
            partition_e_str string) 
        STORED AS PARQUETFILE 
       """) 

このテーブルにデータを挿入する試みは、次のコマンドである:

sdf.write \ 
    .mode('append') \ 
    .partitionBy('partition_d_str', 'partition_e_str') \ 
    .saveAsTable('my_hive_table') 

スパーク/ハイブの設定は、このようなものです:私は小さな塊、テーブルやデータフレームを再作成するデータフレームを分割し、メモリを増やし、.partitionBy('partition_d_str', 'partition_e_str').partitionBy(['partition_d_str', 'partition_e_str'])に変更しようとしたが、何も動いていないようにみえ

spark_conf = pyspark.SparkConf() 
spark_conf.setAppName('my_project') 

spark_conf.set('spark.executor.memory', '16g') 
spark_conf.set('spark.python.worker.memory', '8g') 
spark_conf.set('spark.yarn.executor.memoryOverhead', '15000') 
spark_conf.set('spark.dynamicAllocation.maxExecutors', '64') 
spark_conf.set('spark.executor.cores', '4') 

sc = pyspark.SparkContext(conf=spark_conf) 

sqlContext = pyspark.sql.HiveContext(sc) 
sqlContext.setConf('hive.exec.dynamic.partition', 'true') 
sqlContext.setConf('hive.exec.max.dynamic.partitions', '5000') 
sqlContext.setConf('hive.exec.dynamic.partition.mode', 'nonstrict') 
sqlContext.setConf('hive.exec.compress.output', 'true') 

。オンラインでも解決策は見つけられません。何がメモリエラーの原因になっていますか(私はどこから来ているのか完全に理解していません)、Hiveテーブルに書き込むようにコードを変更するにはどうすればよいですか?ありがとう。

+1

最小ページサイズ、つまり読み書き単位の最小単位は、デフォルトで1048576になるプロパティ 'parquet.page.size'によって定義されます。書き込みしようとしているデータがこのしきい値を下回っている可能性があります。それはなぜスローエラーがありますか?これは私の推測です... [これをチェックしてください](https://github.com/Parquet/parquet-mr/blob/fa8957d7939b59e8d391fa17000b34e865de015d/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java# L64) – Pushkr

+0

リンクありがとうございます。 'parquet.page.size'と' parquet.block.size'の設定だけでなく、私のデータのサイズを増やすだけで、あなたの提案を試しましたが、運はありません。同じエラー:( – vk1011

答えて

2

私は、.saveAsTable()を投げていたNULL可能フィールドでパーティショニングしていました。私はスパークデータフレームにRDDに変換されたとき、私は提供されたスキーマは、このように生成されました:partition_e_strので

from pyspark.sql.types import * 

# Define schema 
my_schema = StructType(
        [StructField('col_a_str', StringType(), False), 
        StructField('col_b_num', DoubleType(), True), 
        StructField('col_c_num', DoubleType(), True), 
        StructField('partition_d_str', StringType(), False), 
        StructField('partition_e_str', StringType(), True)]) 

# Convert RDD to Spark DataFrame 
sdf = sqlContext.createDataFrame(my_rdd, schema=my_schema) 

(そのStructFieldための第三引数)nullable=Trueとして宣言されたに書き込むとき、それは問題を持っていましたそれはパーティション化フィールドの1つとして使用されていたため、ハイブテーブル。私はそれを:

# Define schema 
my_schema = StructType(
        [StructField('col_a_str', StringType(), False), 
        StructField('col_b_num', DoubleType(), True), 
        StructField('col_c_num', DoubleType(), True), 
        StructField('partition_d_str', StringType(), False), 
        StructField('partition_e_str', StringType(), False)]) 

に変更しました。

レッスン:パーティションフィールドがヌル入力可能でないことを確認してください。