私のプログラムの流れは次のようなものです:
1.寄木細工のファイルからデータフレームに40億行(〜700GB)のデータを読み込みます。使用されるパーティションサイズは2296
2.クリーニングして25億行を除外します。
3.残りの15億行をパイプラインモデルと訓練モデルを使用して変換します。モデルは、ロジスティック回帰モデルを使用して訓練され、0または1を予測し、データの30%が変換されたデータフレームから除外されます。
4.上記データフレームは約1 TBの別のデータセットと結合左外側される(また、寄木細工ファイルから読み取る。)パーティションのサイズは4000
5.
等約100のMBの別のデータセットと、それに参加しています joined_data = data1.join(broadcast(small_dataset_100MB), data1.field == small_dataset_100MB.field, "left_outer")
6は、上記データフレームは、次いでcols_to_select
リスト内の10列の合計があります〜2000 exploded_data = joined_data.withColumn('field', explode('field_list'))
7集計が行われaggregate = exploded_data.groupBy(*cols_to_select)\ .agg(F.countDistinct(exploded_data.field1).alias('distincts'), F.count("*").alias('count_all'))
の要因に分解されます。
8.最後に、アクションaggregate.count()
が実行されます。最後のタスクでPysparkジョブが滞っています
問題は、3番目の最後のカウントステージ(200個のタスク)がタスク199で永久に停止することです。 4コアと56エグゼキュータを割り当てたにもかかわらず、その数は1コアと1エグゼキュータを使用してジョブを実行します。私はサイズを40億行から7億行に分解してみました。これは1/6部分で、4時間かかりました。私はこのプロセスを高速化する方法のいくつかの助けに本当に感謝していますありがとう