2016-11-28 9 views
3

私は、200次元を基準にして、非常に重要な個々の用語(〜100k)を持つWord2Vecモデルをトレーニングしています。Spark Word2VecModelが最大保存RPCサイズを超えています

スパークの典型的なW2Vモデル化は、現在、各単語のベクトルを中心に構成されたメモリ使用量を加算します。つまり、numberOfDimensions*sizeof(float)*numberOfWordsです。数学を行うには、上記の大きさの順序は、100MBの、与えるか、または取る。
まだ私のトークナイザで作業していて、最適なベクトルサイズのためにまだベンチングしていると考えると、75k-150k単語と100〜300ディメンションの辞書で計算していますので、 。

これで、このモデルを保存するまですべてが正常です。現在スパークでこのように実現される:

override protected def saveImpl(path: String): Unit = { 
    DefaultParamsWriter.saveMetadata(instance, path, sc) 
    val data = Data(instance.wordVectors.wordIndex, instance.wordVectors.wordVectors.toSeq) 
    val dataPath = new Path(path, "data").toString 
    sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) 
} 

すなわち:1行のデータフレームが作成され、すべてのベクトルの配列に大きなF(L)を含む行。データフレームは寄木細工として保存されます。それは大丈夫です...あなたがそれをエグゼクターに発送しなければならない限り。クラスタモードで実行します。

これは、ジョブを爆破してしまうので、のようなスタックトレースで:再現する

16/11/28 11:29:00 INFO scheduler.DAGScheduler: Job 3 failed: parquet at Word2Vec.scala:311, took 5,208453 s 
16/11/28 11:29:00 ERROR datasources.InsertIntoHadoopFsRelationCommand: Aborting job. 
org.apache.spark.SparkException: Job aborted due to stage failure: 
    Serialized task 32:5 was 204136673 bytes, 
    which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). 
    Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values. 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 

シンプルコード(あなたが火花シェルはできません、それをローカルに、しかし、あなたはクラスタにそれを出荷する必要があります) :

object TestW2V { 

def main(args: Array[String]): Unit = { 
    val spark = SparkSession.builder().appName("TestW2V").getOrCreate() 
    import spark.implicits._ 

    // Alphabet 
    val randomChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTYVWXTZ".toCharArray 
    val random = new java.util.Random() 

    // Dictionnary 
    def makeWord(wordLength: Int): String = new String((0 until wordLength).map(_ => randomChars(random.nextInt(randomChars.length))).toArray) 
    val randomWords = for (wordIndex <- 0 to 100000) // Make approx 100 thousand distinct words 
        yield makeWord(random.nextInt(10)+5) 

    // Corpus (make it fairly non trivial) 
    def makeSentence(numberOfWords: Int): Seq[String] = (0 until numberOfWords).map(_ => randomWords(random.nextInt(randomWords.length))) 
    val allWordsDummySentence = randomWords // all words at least once 
    val randomSentences = for (sentenceIndex <- 0 to 100000) 
         yield makeSentence(random.nextInt(10) +5) 
    val corpus: Seq[Seq[String]] = allWordsDummySentence +: randomSentences 

    // Train a W2V model on the corpus 
    val df = spark.createDataFrame(corpus.map(Tuple1.apply)) 
    import org.apache.spark.ml.feature.Word2Vec 
    val w2v = new Word2Vec().setVectorSize(250).setMinCount(1).setInputCol("_1").setNumPartitions(4) 
    val w2vModel = w2v.fit(df) 
    w2vModel.save("/home/Documents/w2v") 

    spark.stop 
} 
} 

は今...私ははこれがなぜ起こるかを理解するために、私は推測し、内部は十分理解しています。質問は次のとおりです。

  • 私はこの権利をしています(正しい私のAPIの使用法とは?)
  • 私はそれを回避することができますか? spark.mllib.feature.Word2VecModel( "非推奨"のRDDベースの1.xバージョン)は、私自身が適切にパーティション化されたセーブ/ロード実装をローリングすることによって手動で作業できるパブリックコンストラクタを持っています。しかし、新しいspark.ml.feature.Word2VecModelは、私が見ることができる公共のコンストラクタを提供していません。
  • 火花提供者がこのように来た場合:これはバグ/可能性のある改善と考えられますか?このJIRA固定スパークチーム考慮

https://issues.apache.org/jira/browse/SPARK-11994、(1.xのAPIのためです)、私は、彼らが2.0 APIのダブルチェックをした、と私は間違っている:-)何かをやっていると思います。

ローカルモードで実行でき、最終的なタスクのシリアライズを避けることができますが、これは生産レベル(データのアクセシビリティなど)では不可能な一時的な解決策です。または、RPCのサイズを512MBにクラックアップしてください。

PS:上記はSpark 2.0.1およびスパークスタンドアロンクラスタ(ローカルモードでは再生できません)で発生します。
私は通常、この種のメッセージをユーザーメーリングリストに投稿しますが、Spark encourages the use of SOと表示されます...

答えて

0

私はまったく同じ経験をしています。ローカルではうまく動作しますが、クラスタモードでは、提案したようにRPCサイズを512MBにする必要はありません。

つまり、spark.rpc.message.maxSize=512を通過すると私が取得します。

また、保存の実装が疑わしいと思われる場合は、特にrepartition(1)ビットであることに同意します。

関連する問題