2017-06-15 9 views
0

私はPySparkを使っていくつかの大規模な処理を行い、いくつかの結果をMongoDBインスタンスに保存しています。私はmongo-spark-connector_2.11-2.0.0.jarを使ってデータフレームをMongoDBに書き出しています。PySparkでMongoDB例外を処理するには?

df.write.format("com.mongodb.spark.sql.DefaultSource").option("spark.mongodb.output.uri", "mongodb://username:[email protected]:10203/mydb.mycollection").mode("overwrite").save() 

時々私は、このようななどConnectionExceptionMongoCommandException、などの例外を取得します。だから私はそれらの例外を処理したい。だから私はこれらの例外処理スニペットを追加しましたが、私はImportError: No module named com.mongodbを取得します。

try: 
    df.write.format("com.mongodb.spark.sql.DefaultSource").option("spark.mongodb.output.uri", "mongodb://username:[email protected]:10203/mydb.mycollection").mode("overwrite").save() 
except MongoCommandException: 
    err_code = MongoCommandException.getErrorCode() 
    if err_code == int(16): 
     print "Request size is too large to write to Mongo" 

だから、あなたの誰もが原因PySparkは、Java jarを利用している実行スタックにmongo-spark-connector_2.11-2.0.0.jar

答えて

1

を使用してPySparkで例外を処理する方法を私を助けることができ、何を使っている/見ることは実際にはJavaのですとしょうかん。 これは、PySpark内からcom.mongodbライブラリにアクセスできない理由です。あなたはしかし、何ができるか

py4jライブラリ

from py4j.protocol import Py4JJavaError 

try: 
    df.write.format("com.mongodb.spark.sql.DefaultSource") 
      .option("spark.mongodb.output.uri", "mongodb://username:[email protected]:10203/mydb.mycollection") 
      .mode("overwrite").save() 
except Py4JJavaError, ex: 
    print(ex.java_exception.toString()) 
    # analyse error stack and handle as needed. 

から例外をキャッチして処理するために利用可能な例外を参照してくださいMongoDB Java MongoException classのすべての直接のサブクラスを参照してください。

+0

興味深い。ありがとうございました! –