2017-12-14 7 views
1

私のプログラムの流れは次のようなものです:

1.寄木細工のファイルからデータフレームに40億行(〜700GB)のデータを読み込みます。使用されるパーティションサイズは2296

2.クリーニングして25億行を除外します。

3.残りの15億行をパイプラインモデルと訓練モデルを使用して変換します。モデルは、ロジスティック回帰モデルを使用して訓練され、0または1を予測し、データの30%が変換されたデータフレームから除外されます。

4.上記データフレームは約1 TBの別のデータセットと結合左外側される(また、寄木細工ファイルから読み取る。)パーティションのサイズは4000

5.

等約100のMBの別のデータセットと、それに参加しています joined_data = data1.join(broadcast(small_dataset_100MB), data1.field == small_dataset_100MB.field, "left_outer")

6は、上記データフレームは、次いでcols_to_selectリスト内の10列の合計があります〜2000

exploded_data = joined_data.withColumn('field', explode('field_list'))

7集計が行われ

aggregate = exploded_data.groupBy(*cols_to_select)\ .agg(F.countDistinct(exploded_data.field1).alias('distincts'), F.count("*").alias('count_all'))

の要因に分解されます。

8.最後に、アクションaggregate.count()が実行されます。最後のタスクでPysparkジョブが滞っています

問題は、3番目の最後のカウントステージ(200個のタスク)がタスク199で永久に停止することです。 4コアと56エグゼキュータを割り当てたにもかかわらず、その数は1コアと1エグゼキュータを使用してジョブを実行します。私はサイズを40億行から7億行に分解してみました。これは1/6部分で、4時間かかりました。私はこのプロセスを高速化する方法のいくつかの助けに本当に感謝していますありがとう

答えて

0

巨大なデータセットに結合された歪んだデータのため、操作が最終タスクで止まっていました。 2つのデータフレームを結合していた鍵は大きく歪んでいました。この問題は、データフレームから歪んだデータを取り除いて解決されました。偏りのあるデータを含める必要がある場合は、反復ブロードキャスト結合(https://github.com/godatadriven/iterative-broadcast-join)を使用できます。詳細はこの有益なビデオをご覧くださいhttps://www.youtube.com/watch?v=6zg7NTw-kTQ

関連する問題