2017-05-19 11 views
0

Spark構造化ストリーミング内の複数のパーティションに対してJSONをどのように構築しますか。以下の例は、ここでは1つのパーティションに対して貼り付けたものです。あなたの助けに感謝。トピック内の複数のパーティションに対するSpark Structured Streaming

spark.readStream().format("kafka") 
     .option("kafka.bootstrap.servers", "****") 
     .option("subscribePattern", "****.*") 
     .option("startingOffsets", "{\"Topic01\": {\"0\":250, \"1\": -1}}").load(); 

答えて

0

お気に入りのJSONライブラリを使用して文字列を作成できます。ここにjson4sの例があります:

scala> import org.json4s.jackson.Serialization 
import org.json4s.jackson.Serialization 

scala> import org.json4s.NoTypeHints 
import org.json4s.NoTypeHints 

scala> implicit val formats = Serialization.formats(NoTypeHints) 
formats: org.json4s.Formats{val dateFormat: org.json4s.DateFormat; val typeHints: org.json4s.TypeHints} = [email protected] 

scala> val offsets = Map("topic1" -> Map("0" -> 1, "1" -> -1, "2" -> -2), "topic2" -> Map("0" -> 0, "1" -> -1)) 
offsets: scala.collection.immutable.Map[String,scala.collection.immutable.Map[String,Int]] = Map(topic1 -> Map(0 -> 1, 1 -> -1, 2 -> -2), topic2 -> Map(0 -> 0, 1 -> -1)) 

scala> Serialization.write(offsets) 
res0: String = {"topic1":{"0":1,"1":-1,"2":-2},"topic2":{"0":0,"1":-1}} 
関連する問題