2017-10-26 14 views
4

私は、タスクのシリアル化がSparkでどのように機能するのかを理解しようとしていますが、私が書いたテストでは混乱している結果が少しあります。Apache Spark RDDタスクのシリアル化の理解

私は複数のノード上で、以下のん(ポストのために簡略化され)、いくつかのテストコードがあります:私はrun()を実行すると、「タスク直列化可能ではない」を除いて、アウトジョブ爆弾、

object TestJob { 
    def run(): Unit = { 
    val rdd = ... 
    val helperObject = new Helper() // Helper does NOT impl Serializable and is a vanilla class 
    rdd.map(element => { 
     helperObject.transform(element) 
    }).collect() 
    } 
} 

helperObjectはシリアライズ可能ではないため、期待通りです。しかし、私が少し変更すると、次のようになります。

trait HelperComponent { 
    val helperObject = new Helper() 
} 

object TestJob extends HelperComponent { 
    def run(): Unit = { 
    val rdd = ... 
    rdd.map(element => { 
     helperObject.transform(element) 
    }).collect() 
    } 
} 

何らかの理由でジョブが正常に実行されます。なぜ誰かがこのことが理解できるように助けてくれますか? Sparkによって正確にシリアル化され、上記の各ケースでワーカーに送信されるものは何ですか?

私はSparkバージョン2.1.1を使用しています。

ありがとうございました!

答えて

2

これがなぜ起こったのか理解してくれる人がいますか?

最初のスニペットでは、helperObjectrunの中で宣言されたローカル変数です。このように、このコードが実行される場所であれば、すべての情報が利用可能になり、そのためにSparks ClosureCleanerがシリアル化しようとしてあなたに叫ぶように、関数によって閉じられます(持ち上げられます)。

2番目のスニペットでは、値はメソッドスコープのローカル変数ではなく、クラスインスタンスの一部です(技術的にはこれはオブジェクト宣言ですが、結局はJVMクラスに変換されます)。 )。

クラスタ内のすべてのワーカーノードにコードを実行するために必要なJARが含まれているため、これはSparkで意味があります。したがって、のためにTestObjectの全体をシリアライズする代わりに、SparkがあなたのワーカーのExecutorプロセスをスピンアップするとき、ClassLoaderを介してローカルにTestObjectをロードし、それ以外のJVMクラスと同じようにインスタンスを作成します分散アプリケーション。

結論として、あなたがタイプインスタンスを宣言した方法の変更のために、クラスがもはやシリアル化されていないため、この爆発が見られない理由があります。

+0

徹底的な返信をありがとう、それは今、多くの意味があります。さらに明確にするために、エグゼクティブが実行する各タスクの大きさや大きさを理解する手助けができますか? 「タスク」という用語は、1つまたは複数のRDD操作(マップ、フィルタなど)を参照するのか、データセットのサブセットですべてのRDD操作(つまり完全な「パイプライン」)を実行することを指していますか? – simonl

+1

@simonlタスクは実行の最小単位です。一般に、 'map'、' filter'などの各操作について、単一のタスクとして考えることができます。しかし実際には、Sparkはこれらの変換を単一のタスク実行に最適化します。スパークはどのようなタスクを一緒に「絡み合わせる」ことができるのかこれは通常、「狭い変換」と呼ばれるもので、「RDD」を返す基本的に怠惰な関数であり、データのシャッフルによる変換である「ワイド変換」に達するまで実行されます。 –

+0

@simonl前のコメントが理にかなっていて、〜500文字のすべてを説明するのは難しいと思います。 –

関連する問題