2016-12-22 6 views
1

私はスパークから奇妙な行動に直面しています。ここに私のコードがあります:私の直感に対するスパークは仕事を複製しています

object MyJob { 
    def main(args: Array[String]): Unit = { 
     val sc = new SparkContext() 
     val sqlContext = new hive.HiveContext(sc) 

     val query = "<Some Hive Query>" 
     val rawData = sqlContext.sql(query).cache() 

     val aggregatedData = rawData.groupBy("group_key") 
      .agg(
       max("col1").as("max"), 
       min("col2").as("min") 
      ) 

     val redisConfig = new RedisConfig(new RedisEndpoint(sc.getConf)) 
     aggregatedData.foreachPartition { 
      rows => 
       writePartitionToRedis(rows, redisConfig) 
     } 

     aggregatedData.write.parquet(s"/data/output.parquet") 
    } 
} 

スパークスケジューラは、各データシンクのための2つのジョブ(Redisの、HDFS /寄せ木)が得られます。問題は、ハイブのクエリを実行して作業を倍増させている2番目のジョブです。私は両方の書き込み操作がaggregatedDataステージのデータを共有すると仮定しました。何かが間違っているか、それとも期待される動作ですか?

答えて

0

spark:Lazynessという基本的な概念が欠落しています。

RDDにはデータが含まれていません。これはすべて、アクションを呼び出すときに実行される一連の命令です(ディスク/ hdfsにデータを書き込むようなものです)。 RDD(またはデータフレーム)を再利用すると、保存されたデータはなく、アクションを呼び出すたびに評価される必要がある指示を保存するだけです。

RDDを再評価せずにデータを再利用する場合は、.cache()、好ましくはpersistを使用します。 RDDを維持すると、変換の結果を保存することができ、RDDを将来の反復で再評価する必要はありません。

+0

私はキャッシュしていますが、 'rawData'ステップです。そして、私はまだ2番目の仕事でこのステップのいくつかのタスクを参照してください。 2番目のジョブがキャッシュからステージデータを取得しており、不要な作業をしていないことを確認するにはどうすればよいですか? –

+0

はい、あなたは生の生データですが、集計されたデータではありませんので、すべての反復で再評価する必要があります。あなたがスパークアプリケーションのUIに入ると、アプリケーションが行っていることに関するすべてのデータが表示されます。キャッシュするデータがメモリに収まっていることを確認してください。そうでない場合は、 'persist'と適切なストレージレベルを使用して、ディスクにシリアライズまたはキャッシュしてみてください。 – puhlen

関連する問題