1

以下の問題について私を導く専門知識をお探しください。pyspark ALSの「recommendProductsForUsers」を適用したときのStackOverflowエラー(300GB以上のRAMが利用可能ですが)

背景:

  • 私はGoogleクラウドDataprocクラスタを使用する展開インフラとしてthis example
  • に触発基本PySparkスクリプトと一緒に行く取得しようとしています。私のコードで
  • 礎石は私が

    • ALSを負う機能「recommendProductsForUsersは」モデル

    問題ですべてのユーザーのトップX製品を私に戻っていますhereを文書化しています。列車のスクリプトは円滑に実行され、GCP(容易に100万人以上の顧客)に対応します。

  • しかし、予測を適用すると、すなわち、関数 'PredictAll'または 'recommendationsProductsForUsers'を使用すると、まったくスケールされません。私のスクリプトは小さなデータセット(< 100 Customer 、< 100個の製品)に対してスムーズに実行されます。しかし、ビジネス関連のサイズにそれを持って来るとき、私は(例えば、> 50K顧客および> 10kの製品)、それを拡張するために管理していない私は、取得

  • エラーは以下の通りです:

    16/08/16 14:38:56 WARN org.apache.spark.scheduler.TaskSetManager: 
        Lost task 22.0 in stage 411.0 (TID 15139, 
        productrecommendation-high-w-2.c.main-nova-558.internal): 
        java.lang.StackOverflowError 
         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) 
         at scala.collection.immutable.$colon$colon.readObject(List.scala:362) 
         at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) 
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
         at java.lang.reflect.Method.invoke(Method.java:498) 
         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) 
         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) 
         at scala.collection.immutable.$colon$colon.readObject(List.scala:362) 
    
  • 私はそれを実行しようとするために300 GBクラスター(108 GBの1メインノードと108 GB RAMの2ノード)を取得するまで行った。それは50Kの顧客のために働くが、何のために、より

  • 野望は、私はそれが

    を失敗した> 800K顧客

詳細

コード行を実行することができ、セットアップを持っているではありません

predictions = model.recommendProductsForUsers(10).flatMap(lambda p: p[1]).map(lambda p: (str(p[0]), str(p[1]), float(p[2]))) 
pprint.pprint(predictions.take(10)) 
schema = StructType([StructField("customer", StringType(), True), StructField("sku", StringType(), True), StructField("prediction", FloatType(), True)]) 
dfToSave = sqlContext.createDataFrame(predictions, schema).dropDuplicates() 

どのように進むことをお勧めしますか?私は、スクリプトの最後の部分(つまり、dfToSaveに書き込むとき)の「マージ」の部分がエラーを引き起こすと感じています。この&をバイパスする方法をバイパスする方法はありますか?スタックトレースから

答えて

1

これは物事がANの過程で怠惰な評価されていないときは、深くネストされたオブジェクトで終わるように、基本的には、スパークは、再帰的にRDD系統を表現Spark gives a StackOverflowError when training using ALS

と同じ問題であるように思われます反復的な作業負荷。 sc.setCheckpointDirを呼び出してチェックポイント間隔を調整すると、このRDD系列の長さが緩和されます。

+0

こんにちはデニス、あなたの考えに感謝します。 ALS.trainに実際にチェックポイント間隔のパラメータが設定されているということに私は確信していました。ただし、predictAllやrecommendProductsForUsers関数にはこのパラメータがあります。チェックポイントはどのように機能しますか? –

+0

更新:実装されたチェックポイント機能(チップのおかげでdennis)。ALS.train関数(簡単に100万人以上の顧客)の非常に優れたスケーリングが可能ですが、次のような述語を適用することはできません。 FreditAllまたはrecommendProductsForUsers関数を使用します。これに対する提案はありますか? –

+0

チェックポイントを適用した後、例外がスローされたとき、または変更されたときに 'ObjectInputStream'を含む同じスタックトレースが表示されますか? –

関連する問題