2016-08-11 9 views
0

私はSparkで放送変数について学んでいたので、私はそれを利用しようとしました。私はスパークシェル(バージョン1.6.0)を使用しています。続いては、私のコードです:私は、スパーク・シェルを実行し、火花シェルが起動されたときに作成されますデフォルトSparkContext SCを使用するときにうまく働いたオブジェクトは直列化できませんorg.apache.spark.SparkContext

scala> val pageurls = sc.parallelize(List(("www.google.com","Google"),("www.yahoo.com","Yahoo")) 
pageurls: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27 
    scala> val pageCounts = sc.parallelize(List(("www.google.com",90),("www.yahoo.com",10))) 
    pageCounts: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:27 
    scala> val pageMaps = pageurls.collectAsMap 
    pageMaps: scala.collection.Map[String,String] = Map(www.yahoo.com -> Yahoo, www.google.com -> Google) 
    scala> val bMaps = sc.broadcast(pageMaps) 
    bMaps: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,String]] = Broadcast(2) 
    scala> bMaps.value 
    res0: scala.collection.Map[String,String] = Map(www.yahoo.com -> Yahoo, www.google.com -> Google) 
    scala> val newRdd = pageCounts.map{ 
| case (url,count) => (url,bMaps.value(url),count)} 
    newRdd: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[2] at map at <console>:35 
    scala> newRdd.collect 
    res1: Array[(String, String, Int)] = Array((www.google.com,Google,90), (www.yahoo.com,Yahoo,10)) 

コード。しかし、自分のSparkContextを作成して、同じコードシーケンスを実行しようとしました。 org.apache.spark.SparkException:私自身のコンテキストを作成する前に、私はデフォルトでは私はこのようなSparkContextを作成して、放送変数を使用すると、私は次の例外を取得しsc.stop

sc.stop 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
val conf = new SparkConf().setMaster("local").setAppName("MyApp") 
val sc = new SparkContext(conf) 

を使用してSparkContextを作成停止しますタスクによって引き起こさ

直列化可能ではない:java.io.NotSerializableException:org.apache.spark.SparkConf

私はこれらのエラーを取得しないように、なぜそれがそのように起こるんし、私が何をしなければなら何かI?行方不明?

答えて

0

spark-shellを起動すると、spark-shellはsparkcontext [sc]を作成します。 1つのjvmには1つのスパークシェルのみがあります。同じjvmで別のスパークシェルを作成しようとしています。 sparkConfのバージョンがあなたの上にいるようですが、sparkConfは直列化できないクラスの例外をスローしています。この例外の使用を避けるために:

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
val conf = new SparkConf() 
conf.setAppName("MyApp") 
conf.set("spark.driver.allowMultipleContexts", "true") 
conf.setMaster("local") 
val sc = new SparkContext(conf) 

参考文献: A] Multiple SparkContext detected in the same JVM

B] https://issues.apache.org/jira/browse/SPARK-2243

編集 解決方法1: は、放送変数のための機能を作成し、シェルからそれを呼び出す:

sc.stop 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
val conf = new SparkConf().setMaster("local").setAppName("MyApp") 
val sc = new SparkContext(conf) 
val pageurls = sc.parallelize(List(("www.google.com","Google"), ("www.yahoo.com","Yahoo"))) 
val pageCounts = sc.parallelize(List(("www.google.com",90),("www.yahoo.com",10))) 
val pageMaps = pageurls.collectAsMap 
object Test{ 
def bVar(sc:SparkContext, pageMaps: scala.collection.Map[String, String]) = { 
    val bMaps = sc.broadcast(pageMaps) 
    bMaps.value 
    val newRdd = pageCounts.map{case (url,count) => (url,bMaps.value(url),count)} 
    newRdd.collect 
}} 
val result = Test.bVar(sc, pageMaps) 
result: Array[(String, String, Int)] = Array((www.google.com,Google,90), (www.yahoo.com,Yahoo,10)) 

参照:​​

解決策2:シェルから関数を使用しないと主張する場合は、sparkcontextとsparkconfを一時的に作成してください。

sc.stop 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
@transient val conf = new SparkConf().setMaster("local").setAppName("MyApp") 
@transient val sc = new SparkContext(conf) 
val pageurls = sc.parallelize(List(("www.google.com","Google"), ("www.yahoo.com","Yahoo"))) 
val pageCounts = sc.parallelize(List(("www.google.com",90),("www.yahoo.com",10))) 
val pageMaps = pageurls.collectAsMap 
val bMaps = sc.broadcast(pageMaps) 
bMaps.value 
val newRdd = pageCounts.map{case (url,count) => (url,bMaps.value(url),count)} 
newRdd.collect 
res3: Array[(String, String, Int)] = Array((www.google.com,Google,90), (www.yahoo.com,Yahoo,10)) 

参考:Should I leave the variable as transient?

http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/

+0

私は私自身のコンテキストを作成する前に、私は火花(sc.stop)によって作成されたデフォルトコンテキストを停止することを言及するのを忘れてしまいました。 – user2430771

+0

OKay。既存のsparkconteaxtを停止する前と新しいsparkcontextを作成した後で、println(sc.getConf.toDebugString)の出力をポストしてください。 – hadooper

関連する問題