2017-12-10 6 views
0

groupByKey()の後、私は自分のRDDを手に入れました。(0, [a list of name]) ユースケース:s3のファイルに名前のリストを書きます。このRDDは一行だけですので、私は直接foreach()spark foreach(keyValue)、値がメモリの爆発を引き起こすかどうか?

コードを使用している:私の質問は機能write_to_s3()lines = keyValue[1]である

def write_to_s3(keyValue): 
    lines = keyValue[1] 
    tmp_file = ... 
    with open(tmp_file, w+) as f: 
     for line in lines: 
      f.write(line + '\n') 
    # upload tmp_file to s3 
    # remove tmp_file 

myRDD.foreach(write_to_s3) 

、それはライン(リスト)ため、メモリブローアップさせることが可能です大きすぎますか?

答えて

0

私の質問は関数write_to_s3()、lines = keyValue [1]にあります。行(リスト)が大きすぎるためにメモリが壊れる可能性はありますか?

いいえ、しかし、groupByKeyだけでもそれを行う可能性が高いです。言い換えると、キーのデータを大きくするとコードが失敗する可能性があります。

DataFrameWriterpartitionByをマージして出力することをお勧めします。それが可能しようとしない場合には

df_before_group_by_key.toDF(["key", "value"]).write.partitionBy("key").text("some_file") 

は:

df_before_group_by_key.repartitionAndSortWithinPartitions(...).mapPartitions(write_to_s2) 

def write_to_s3(iterator): 
    ... # 1. check the first key 
    ... # 2. open new file 
    ... # 3. write until you encounter new one 
    ... # 4. upload 
    ... # 5. go back to 2, repeat until iterator is empty 
+0

あなたは私にそれがラインからメモリブローアップ(リスト)を引き起こすことはありません理由を教えてもらえ大きすぎますか? – caden

関連する問題