2017-07-28 4 views
0

文字列サンプラーを作成するには、rddの文字列を辞書として使用し、クラスRandomDataGeneratorをorg.apache.spark.mllib.randomパッケージから作成しています。SparkException:このRDDにSparkContextがありません

import org.apache.spark.mllib.random.RandomDataGenerator 
import org.apache.spark.rdd.RDD 
import scala.util.Random 

class StringSampler(var dic: RDD[String], var seed: Long = System.nanoTime) extends RandomDataGenerator[String] { 
    require(dic != null, "Dictionary cannot be null") 
    require(!dic.isEmpty, "Dictionary must contains lines (words)") 
    Random.setSeed(seed) 

    var fraction: Long = 1/dic.count() 

    //return a random line from dictionary 
    override def nextValue(): String = dic.sample(withReplacement = true, fraction).take(1)(0) 

    override def setSeed(newSeed: Long): Unit = Random.setSeed(newSeed) 

    override def copy(): StringSampler = new StringSampler(dic) 

    def setDictionary(newDic: RDD[String]): Unit = { 
     require(newDic != null, "Dictionary cannot be null") 
     require(!newDic.isEmpty, "Dictionary must contains lines (words)") 
     dic = newDic 
     fraction = 1/dic.count() 
    } 
} 

val dictionaryName: String 
val dictionaries: Broadcast[Map[String, RDD[String]]] 
val dictionary: RDD[String] = dictionaries.value(dictionaryName) // dictionary.cache() 
val sampler = new StringSampler(dictionary) 
RandomRDDs.randomRDD(context, sampler, size, numPartitions) 

しかし、私は、文字列のランダムRDDを生成しようとすると、辞書がSparkContextの欠けていると言ってSparkExceptionが発生しました。クラスタノードにコピーするときにsparkが辞書rddのコンテキストを失っているように見えますが、それを修正する方法はわかりません。

StringSamplerに渡す前に辞書をキャッシュしようとしましたが、何も変更されませんでした。 元のSparkContextにリンクすることを考えていましたが、可能かどうかわかりません。誰でもアイデアはありますか?

Caused by: org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases: 
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. 
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758. 

答えて

0

私は、問題はここにあると信じて:

val dictionaries: Broadcast[Map[String, RDD[String]]] 
val dictionary: RDD[String] = dictionaries.value(dictionaryName) 

あなたはRDDを含むものを放送するべきではありません。 RDDはすでに並列化され、クラスタ全体に分散されています。このエラーは、RDDをシリアライズしてデシリアライズしようとしたときに発生します.RDDはコンテキストを失い、とにかく無意味です。

ただ、この操作を行います。

val dictionaries: Map[String, RDD[String]] 
val dictionary: RDD[String] = dictionaries(dictionaryName) 
+0

だから、クラスタ上で、私の辞書を配布するための最良のオプションは何ですか? [String、RDD [String]]、ブロードキャスト[[String、Array [String]]]または別のものですか? – belgacea

+0

少数の辞書(例えば、数百以下)を持っているなら、 'Map [String、RDD [String]]'は大丈夫です。マップキーとディクショナリへの参照のオーバーヘッドはドライバにありますが、ディクショナリの内容はクラスタの周りに分散されます。 それ以上の辞書があれば、おそらく 'RDD [(String、String)]'であり、各タプルは(辞書名、辞書エントリ)の対です。 –

+0

ありがとうございますが、残念ながら私はまだ 'Map [String、RDD [String]]を使って同じ問題を抱えています。 なぜ私はRDDコンテキストを失ってスパークしているのか分かりません。原因は何か? NB:辞書をスカラマップ関数に渡します。 – belgacea

関連する問題