私はスパークから奇妙な行動に直面しています。ここに私のコードがあります:私の直感に対するスパークは仕事を複製しています
object MyJob {
def main(args: Array[String]): Unit = {
val sc = new SparkContext()
val sqlContext = new hive.HiveContext(sc)
val query = "<Some Hive Query>"
val rawData = sqlContext.sql(query).cache()
val aggregatedData = rawData.groupBy("group_key")
.agg(
max("col1").as("max"),
min("col2").as("min")
)
val redisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))
aggregatedData.foreachPartition {
rows =>
writePartitionToRedis(rows, redisConfig)
}
aggregatedData.write.parquet(s"/data/output.parquet")
}
}
スパークスケジューラは、各データシンクのための2つのジョブ(Redisの、HDFS /寄せ木)が得られます。問題は、ハイブのクエリを実行して作業を倍増させている2番目のジョブです。私は両方の書き込み操作がaggregatedData
ステージのデータを共有すると仮定しました。何かが間違っているか、それとも期待される動作ですか?
私はキャッシュしていますが、 'rawData'ステップです。そして、私はまだ2番目の仕事でこのステップのいくつかのタスクを参照してください。 2番目のジョブがキャッシュからステージデータを取得しており、不要な作業をしていないことを確認するにはどうすればよいですか? –
はい、あなたは生の生データですが、集計されたデータではありませんので、すべての反復で再評価する必要があります。あなたがスパークアプリケーションのUIに入ると、アプリケーションが行っていることに関するすべてのデータが表示されます。キャッシュするデータがメモリに収まっていることを確認してください。そうでない場合は、 'persist'と適切なストレージレベルを使用して、ディスクにシリアライズまたはキャッシュしてみてください。 – puhlen