2016-11-19 2 views
2

foreachPartitionの中でSparkContextとSQLContextを使用したいが、シリアル化エラーのためにできない。 ..私は、両方のオブジェクトが直列化可能でないことを知っているが、私はforeachPartitionがスパークコンテキストとSQLContextの両方が利用可能なマスター、上で実行されていることを考えforeachPartition内でSQLContextとSparkContextを使用する方法

表記:これは私の現在のコードが

`msg -> Map[String,String]` 
`result -> Iterable[Seq[Row]]` 

ある (UtilsDMは、extends Serializableというオブジェクトです)。失敗したコード部分はval schema =...から始まり、をDataFrameに書き込んで、それをParquetに保存します。私がコードを編成した方法が非効率なのかもしれませんが、私はここにあなたの推薦をしたいと思います。ありがとう。

// Here I am creating df from parquet file on S3 
val exists = FileSystem.get(new URI("s3n://" + bucketNameCode), sc.hadoopConfiguration).exists(new Path("s3n://" + bucketNameCode + "/" + pathToSentMessages)) 
var df: DataFrame = null 
if (exists) { 
    df = sqlContext 
    .read.parquet("s3n://bucket/pathToParquetFile") 
} 
UtilsDM.setDF(df) 

// Here I process myDStream 
myDStream.foreachRDD(rdd => { 
    rdd.foreachPartition{iter => 
    val r = new RedisClient(UtilsDM.getHost, UtilsDM.getPort) 
    val producer = UtilsDM.createProducer 
    var df = UtilsDM.getDF 
    val result = iter.map{ msg => 
     // ... 
     Seq(msg("key"),msg("value")) 
    } 

    // HERE I WANT TO WRITE result TO S3, BUT IT FAILS 
    val schema = StructType(
        StructField("key", StringType, true) :: 
        StructField("value", StringType, true) 

    result.foreach { row => 
     val rdd = sc.makeRDD(row) 
     val df2 = sqlContext.createDataFrame(rdd, schema) 

     // If the parquet file is not created, then create it 
     var df_final: DataFrame = null 
     if (df != null) { 
      df_final = df.unionAll(df2) 
     } else { 
      df_final = df2 
     } 
     df_final.write.parquet("s3n://bucket/pathToSentMessages) 
} 
    } 
}) 

EDIT:

私はスパーク1.6.2とScalaの2.10.6を使用しています。

+0

を動作することがわかりましたか? – mrsrinivas

+0

@MRSrinivas:Spark 1.6.2とScala 2.10.6を使用しています。それを言いたくて申し訳ありません。 – duckertito

答えて

2

これはできません。 SparkContext,SQLContextおよびSparkSessionは、ドライバでのみ使用できます。あなたはforeachRDDのトップレベルでsqlContextを使用することができます。

myDStream.foreachRDD(rdd => { 
    val df = sqlContext.createDataFrame(rdd, schema) 
    ... 
}) 

あなたは、変換/アクションでそれを使用することはできません。

myDStream.foreachRDD(rdd => { 
    rdd.foreach { 
     val df = sqlContext.createDataFrame(...) 
     ... 
    } 
}) 

あなたはおそらく相当します:

myDStream.foreachRDD(rdd => { 
    val foo = rdd.mapPartitions(iter => doSomethingWithRedisClient(iter)) 
    val df = sqlContext.createDataFrame(foo, schema) 
    df.write.parquet("s3n://bucket/pathToSentMessages) 
}) 
+0

ありがとう、ありがとう。つまり、 'foreachPartition'の代わりに' resultPartitions'を返すために 'mapPartitions'を使うべきですか?それをどうやって行うのか教えてください。 – duckertito

+0

このようなはずですか?それは私のためにコンパイルされません: 'myDStream.foreachRDD(rdd => {val finalResult = rdd.mapPartitions(iter => val r = new RedisClient(UtilsDM.getHost、UtilsDM.getPort)val result = iter.map { > ...}))}) ' – duckertito

+0

私はあなたのコードを完全に理解していませんが、' RDD.foreach'、 'RDD.map'、' RDD.mapPartitions'などの中でsqlContextを使うことはできません。 –

0

私は、ループ内の既存のSparkContextを(私は事前にsparkContext SCを作成していると仮定)を使用すると、その火花のバージョンを使用している、すなわち

// this works 
stream.foreachRDD(_ => { 
    // update rdd 
    .... = SparkContext.getOrCreate().parallelize(...) 
}) 

// this doesn't work - throws a SparkContext not serializable error 
stream.foreachRDD(_ => { 
    // update rdd 
    .... = sc.parallelize(...) 
}) 
関連する問題