2017-09-08 5 views
0

Mesosクラスタで実行されているSpark 2.1.1ジョブがあります。 Spark UIは32人の実行中のエグゼキュータを表示し、RDD.getNumPartitionsは28個のパーティションを表示しています。 しかし、1人(ランダム)のエグゼキュータだけが仕事をしており、他のすべては完了しているとマークされています。 executorコード(stdout)にdebugステートメントを追加し、それらのステートメントを実行しているエグゼキュータは1人だけです。パイプライン全体は次のように構成されています: idsのリストを取得 - >各IDのJSONデータをダウンロード - > JSONデータを解析 - > S3に保存sparkが単一のエグゼキュータ上で複数のパーティションを持つジョブを実行しています

stage 1: val ids=session.sparkContext.textFile(path).repartition(28) -> RDD[String] 

//ids.getNumPartitions shows 28 
stage 2: val json=ids.mapPartitions { keys => 
    val urlBuilder ... 
    val buffer .... 
    keys map { key => 
    val url=urlBuilder.createUrl(id) //java.net.URL 
    val json=url.openStream() ... //download text to buffer, close stream 
    (id,json.toString) 
    } 
} -> RDD[Tuple2[String,String]] 

stage 3: val output = json flatMap { t => 
    val values = ... //parse JSON, get values from JSON or empty sequence if not found 
    values map { value => (t._1, value) } 
} -> RDD[Tuple2[String,String]] 

stage 4: output.saveAsTextFile("s3://...") 

これらスパークバイナリ用のコンフィグ設定されている: --driverメモリ32グラム--conf spark.driver.cores = 4 --executorメモリの4G --conf spark.cores.max = 128 - conf spark.executor.cores = 4

1つのエグゼキュータだけで実行されているステージは2番目のエグゼクティブです。 手順1で明示的にパーティション数(repartition(28))を指定しました。 誰もこのような行動を見たことがありますか? おかげで、

M

は、私は他の方法(トラヴィスからの提案を参照)を行って、働いていた100に(ステップ1の後に)パーティションの数を増加させた溶液を終えた仕事数分でしかし、副作用があります - 今ではS3の部分ファイルが100個あります。

+0

ジョブを開始するために使用するコードとコマンドを提供できますか? –

答えて

0

「IDのリストを取得する」の後に.repartition()ステージが行われていることを確認してください。

最初に28個のパーティションを持つ空のセットを生成してから、そのIDのリストを1つのパーティションに取得しているようです。提供

EDITは、サンプルコードの後:

は、各タスクが(すなわち、数秒以内)すぐに完了していることは可能ですか?私は、たとえ何千もの未解決のタスクがあっても、短い時間でタスクが完了したときにエグゼキュータをアイドル状態にするようにタスクをスケジュールしないことを目の当たりにしています。そうであれば、各タスクを少し長くするために必要なパーティション数は少なくて済むかもしれません。時には、タスク・スケジューラーを起動して、エグゼキュータをアイドル状態にするためにさらに多くのタスクをスケジュールするのに十分です。

+0

私は別の方法を行って、パーティションの数を(ステップ1の後で)100に増やしました。それはうまくいって、仕事は数分で完了しました。しかし、副作用があります - 今ではS3の部分ファイルが100個あります。 – user7606438

関連する問題