Mesosクラスタで実行されているSpark 2.1.1ジョブがあります。 Spark UIは32人の実行中のエグゼキュータを表示し、RDD.getNumPartitionsは28個のパーティションを表示しています。 しかし、1人(ランダム)のエグゼキュータだけが仕事をしており、他のすべては完了しているとマークされています。 executorコード(stdout)にdebugステートメントを追加し、それらのステートメントを実行しているエグゼキュータは1人だけです。パイプライン全体は次のように構成されています: idsのリストを取得 - >各IDのJSONデータをダウンロード - > JSONデータを解析 - > S3に保存sparkが単一のエグゼキュータ上で複数のパーティションを持つジョブを実行しています
stage 1: val ids=session.sparkContext.textFile(path).repartition(28) -> RDD[String]
//ids.getNumPartitions shows 28
stage 2: val json=ids.mapPartitions { keys =>
val urlBuilder ...
val buffer ....
keys map { key =>
val url=urlBuilder.createUrl(id) //java.net.URL
val json=url.openStream() ... //download text to buffer, close stream
(id,json.toString)
}
} -> RDD[Tuple2[String,String]]
stage 3: val output = json flatMap { t =>
val values = ... //parse JSON, get values from JSON or empty sequence if not found
values map { value => (t._1, value) }
} -> RDD[Tuple2[String,String]]
stage 4: output.saveAsTextFile("s3://...")
これらスパークバイナリ用のコンフィグ設定されている: --driverメモリ32グラム--conf spark.driver.cores = 4 --executorメモリの4G --conf spark.cores.max = 128 - conf spark.executor.cores = 4
1つのエグゼキュータだけで実行されているステージは2番目のエグゼクティブです。 手順1で明示的にパーティション数(repartition(28))を指定しました。 誰もこのような行動を見たことがありますか? おかげで、
M
は、私は他の方法(トラヴィスからの提案を参照)を行って、働いていた100に(ステップ1の後に)パーティションの数を増加させた溶液を終えた仕事数分でしかし、副作用があります - 今ではS3の部分ファイルが100個あります。
ジョブを開始するために使用するコードとコマンドを提供できますか? –