私はKafkaとPythonで構造化ストリーミングを開始しようとしています。 要件:SparkでKafka(JSON形式)のストリーミングデータを処理し(変換を実行する)、データベースに格納する必要があります。Pythonを使ったSpark構造化ストリーム
私は
data = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe","test").load()
、同じようカフカから読み取るためspark.readStream
を使用する予定ですJSON形式のデータのように、 {"a": 120.56, "b": 143.6865998138807, "name": "niks", "time": "2012-12-01 00:00:09"}
を持っている私は、参考のためにthis linkを呼ばなく、解析する方法を取得していませんJSONデータ。私はこれを試しました、
data = data.selectExpr("CAST(a AS FLOAT)","CAST(b as FLOAT)", "CAST(name as STRING)", "CAST(time as STRING)").as[(Float, Float, String, String)]
しかし、それは動作しないように見えます。
spark構造化ストリーミングでPythonで作業したことがある人は、サンプルの例やリンクを続けることができますか?
、使用
schema = StructType([
StructField("a", DoubleType()),
StructField("b", DoubleType()),
StructField("name", StringType()),
StructField("time", TimestampType())])
inData = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe","test").load()
data = inData.select(from_json(col("value").cast("string"), schema))
query = data.writeStream.outputMode("Append").format("console").start()
プログラムが実行されますが、
+-----------------------------------+
|jsontostruct(CAST(value AS STRING))|
+-----------------------------------+
| [null,null,null,2...|
| [null,null,null,2...|
+-----------------------------------+
17/04/07 19:23:15 INFO StreamExecution: Streaming query made progress: {
"id" : "8e2355cb-0fd3-4233-89d8-34a855256b1e",
"runId" : "9fc462e0-385a-4b05-97ed-8093dc6ef37b",
"name" : null,
"timestamp" : "2017-04-07T19:23:15.013Z",
"numInputRows" : 2,
"inputRowsPerSecond" : 125.0,
"processedRowsPerSecond" : 12.269938650306749,
"durationMs" : {
"addBatch" : 112,
"getBatch" : 8,
"getOffset" : 2,
"queryPlanning" : 4,
"triggerExecution" : 163,
"walCommit" : 26
},
"eventTime" : {
"watermark" : "1970-01-01T00:00:00.000Z"
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[test]]",
"startOffset" : {
"test" : {
"0" : 366
}
},
"endOffset" : {
"test" : {
"0" : 368
}
},
"numInputRows" : 2,
"inputRowsPerSecond" : 125.0,
"processedRowsPerSecond" : 12.269938650306749
} ],
"sink" : {
"description" : "[email protected]"
}
}
が、私はここで何かを見逃していました、と私は、コンソール上の値を取得しています。