0

私は約40列の浮動小数点数を持つ5,000万行の大きなデータセットを持っています。カスタム変換の理由"java.lang.OutOfMemoryError:pyspark collect_list()の実行中に要求された配列のサイズがVMの制限を超えています"

、私は次の擬似コード使用して、pysparkcollect_list()機能を使用して、列ごとに、すべての浮動小数点値を収集しようとしています:列ごとに

for column in columns: 
    set_values(column, df.select(collect_list(column)).first()[0]) 

を、それがcollect_list()機能やセットを実行しますその値を他の内部構造に変換します。

8コアと64GB RAMの2つのホストで、ホストごとに1つのエグゼキュータに最大30 GBと6コアを割り当てる前述のスタンドアロンクラスタを実行しています。実行中に次の例外が発生しています。収集された配列のサイズと関係があります。

java.lang.OutOfMemoryError: Requested array size exceeds VM limit

私はより多くのメモリ、パーティション番号、並列処理、でも、Javaオプション、まだ運を割り当てるなど、spark-defaults.confで複数の設定を試してみました。

collect_list()は、エグゼクティブ/ドライバのリソースに深く関連しているか、またはこれと関係がありません。

この問題を解決するために使用できる設定はありますか?それ以外の場合はcollect()機能を使用する必要がありますか?

答えて

0

collect_listあなたのケースではcollectを呼び出すだけではありません。どちらも大規模なデータセットに対しては非常に悪い考えです。実用性はほとんどありません。

両方ともレコード数に比例した量のメモリを必要とし、collect_listはシャッフルのオーバーヘッドを追加するだけです。

つまり、選択肢がなく、ローカル構造が必要な場合は、selectcollectを使用し、ドライバのメモリを増やしてください。

df.select(column).rdd.map(lambda x: x[0]).collect() 
+0

これらの2つのいずれもなくても、1列あたりのすべての値を収集するための特別な提案はありますか? – geopet

関連する問題