2016-01-15 6 views
15

foreachRDDに入力するたびに mongodb RDDを作成します。しかし私は、シリアル化の問題があります。スパークストリーミング:foreachRDD私のmongo RDDを更新

mydstream 
    .foreachRDD(rdd => { 
     val mongoClient = MongoClient("localhost", 27017) 
     val db = mongoClient(mongoDatabase) 
     val coll = db(mongoCollection) 
     // ssc is my StreamingContext 
     val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) }) 

これは私にエラーを与える:

object not serializable (class: org.apache.spark.streaming.StreamingContext, value: [email protected]) 

任意のアイデア?

+0

'SparkContext'はシリアライズ可能ではないため、変換メソッドやアクションメソッド内では使用できません。ドライバクラスでのみ使用する必要があります。 – Shankar

+0

foreachRDDメソッドの中でリストをrddに変換する理由は何ですか? – Shankar

答えて

7

あなたはSparkContextまたはSparkStreamingContext(RDDがDSTREAMの場合)のいずれかを返すrdd.contextを使用しようとするかもしれません。

mydstream foreachRDD { rdd => { 
     val mongoClient = MongoClient("localhost", 27017) 
     val db = mongoClient(mongoDatabase) 
     val coll = db(mongoCollection) 
     val modelsRDDRaw = rdd.context.parallelize(coll.find().toList) }) 

実際には、RDDにも.sparkContextメソッドがあるようです。私は正直なところ違いを知らないのでしょうか、別名かもしれません(?)。

2

「直列化できない」オブジェクトがある場合は、foreachPartitionに渡す必要がありますので、処理を実行する前に各ノードのデータベースに接続する必要があります。

mydstream.foreachRDD(rdd => { 
     rdd.foreachPartition{ 
      val mongoClient = MongoClient("localhost", 27017) 
      val db = mongoClient(mongoDatabase) 
      val coll = db(mongoCollection) 
      // ssc is my StreamingContext 
      val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) }}) 
+0

これは動作しません。なぜなら、sscは直列化できないからです。 –

+0

rss.foreachPartitionの前にforeachRDD内でsscを作成することができます。 'val ssc = StreamingContext.getOrCreate(checkpointdirectory、functionToCreateContext _)' – Rami

関連する問題