2017-07-31 6 views
-1
def map[U: ClassTag](f: T => U): RDD[U] = withScope { 
    val cleanF = sc.clean(f) 
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) 
    } 

このコードスニペットは、spark 2.2ソースコードからのものです。私はスカラでは専門家ではないので、誰でもこのコードをプログラム上の観点から説明できますか?マップの後に角括弧が何をしているのか分かりません。また、https://www.tutorialspoint.com/scala/scala_functions.htmを参照すると、スカラ関数は '='の後に波カッコを入れる必要がありますが、このコードスニペットで '='記号の後にwithScopeという名前の関数があるのはなぜですか?スカラのマップ機能を理解するための説明が必要

答えて

1

実際には、「=」の後ろにスカラ関数には括弧は使用できません。例えば。

def func():Int = 1

だからwithScope {}は戻り型RDD [U]との関数であり、map関数はwithScope機能を実行することであると考えることができます。

のはwithScopeのソースコードを見てみましょう:

private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body) 

は、それはここで機能だ、参照してください。のは、上で行ってみましょう:

private[spark] def withScope[T](
    sc: SparkContext, 
    name: String, 
    allowNesting: Boolean, 
    ignoreParent: Boolean)(body: => T): T = { 
    // Save the old scope to restore it later 
    val scopeKey = SparkContext.RDD_SCOPE_KEY 
    val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY 
    val oldScopeJson = sc.getLocalProperty(scopeKey) 
    val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson) 
    val oldNoOverride = sc.getLocalProperty(noOverrideKey) 
    try { 
    if (ignoreParent) { 
     // Ignore all parent settings and scopes and start afresh with our own root scope 
     sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson) 
    } else if (sc.getLocalProperty(noOverrideKey) == null) { 
     // Otherwise, set the scope only if the higher level caller allows us to do so 
     sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson) 
    } 
    // Optionally disallow the child body to override our scope 
    if (!allowNesting) { 
     sc.setLocalProperty(noOverrideKey, "true") 
    } 
    body 
    } finally { 
    // Remember to restore any state that was modified before exiting 
    sc.setLocalProperty(scopeKey, oldScopeJson) 
    sc.setLocalProperty(noOverrideKey, oldNoOverride) 
    } 
} 

最後に、それはこの場合INT、bodyパラメータを実行し、体が

{ 
    val cleanF = sc.clean(f) 
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) 
} 

に等しい:

private[spark] def withScope[T](
    sc: SparkContext, 
    allowNesting: Boolean = false)(body: => T): T = { 
    val ourMethodName = "withScope" 
    val callerMethodName = Thread.currentThread.getStackTrace() 
    .dropWhile(_.getMethodName != ourMethodName) 
    .find(_.getMethodName != ourMethodName) 
    .map(_.getMethodName) 
    .getOrElse { 
     // Log a warning just in case, but this should almost certainly never happen 
     logWarning("No valid method name for this RDD operation scope!") 
     "N/A" 
    } 
    withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body) 
} 

は終わりwithScopeを続けてみましょう結論として、withScopeはClosureであり、引数として関数を取り、最初にコード自体を実行し、引数を実行します。

関連する問題