foreachPartition
の中でSparkContextとSQLContextを使用したいが、シリアル化エラーのためにできない。 ..私は、両方のオブジェクトが直列化可能でないことを知っているが、私はforeachPartition
がスパークコンテキストとSQLContextの両方が利用可能なマスター、上で実行されていることを考えforeachPartition内でSQLContextとSparkContextを使用する方法
表記:これは私の現在のコードが
`msg -> Map[String,String]`
`result -> Iterable[Seq[Row]]`
ある (UtilsDMは、extends Serializable
というオブジェクトです)。失敗したコード部分はval schema =...
から始まり、をDataFrame
に書き込んで、それをParquetに保存します。私がコードを編成した方法が非効率なのかもしれませんが、私はここにあなたの推薦をしたいと思います。ありがとう。
// Here I am creating df from parquet file on S3
val exists = FileSystem.get(new URI("s3n://" + bucketNameCode), sc.hadoopConfiguration).exists(new Path("s3n://" + bucketNameCode + "/" + pathToSentMessages))
var df: DataFrame = null
if (exists) {
df = sqlContext
.read.parquet("s3n://bucket/pathToParquetFile")
}
UtilsDM.setDF(df)
// Here I process myDStream
myDStream.foreachRDD(rdd => {
rdd.foreachPartition{iter =>
val r = new RedisClient(UtilsDM.getHost, UtilsDM.getPort)
val producer = UtilsDM.createProducer
var df = UtilsDM.getDF
val result = iter.map{ msg =>
// ...
Seq(msg("key"),msg("value"))
}
// HERE I WANT TO WRITE result TO S3, BUT IT FAILS
val schema = StructType(
StructField("key", StringType, true) ::
StructField("value", StringType, true)
result.foreach { row =>
val rdd = sc.makeRDD(row)
val df2 = sqlContext.createDataFrame(rdd, schema)
// If the parquet file is not created, then create it
var df_final: DataFrame = null
if (df != null) {
df_final = df.unionAll(df2)
} else {
df_final = df2
}
df_final.write.parquet("s3n://bucket/pathToSentMessages)
}
}
})
EDIT:
私はスパーク1.6.2とScalaの2.10.6を使用しています。
を動作することがわかりましたか? – mrsrinivas
@MRSrinivas:Spark 1.6.2とScala 2.10.6を使用しています。それを言いたくて申し訳ありません。 – duckertito