0

私はKafkaとPythonで構造化ストリーミングを開始しようとしています。 要件:SparkでKafka(JSON形式)のストリーミングデータを処理し(変換を実行する)、データベースに格納する必要があります。Pythonを使ったSpark構造化ストリーム

私は

data = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe","test").load() 

、同じようカフカから読み取るためspark.readStreamを使用する予定ですJSON形式のデータのように、 {"a": 120.56, "b": 143.6865998138807, "name": "niks", "time": "2012-12-01 00:00:09"}

を持っている私は、参考のためにthis linkを呼ばなく、解析する方法を取得していませんJSONデータ。私はこれを試しました、

data = data.selectExpr("CAST(a AS FLOAT)","CAST(b as FLOAT)", "CAST(name as STRING)", "CAST(time as STRING)").as[(Float, Float, String, String)] 

しかし、それは動作しないように見えます。

spark構造化ストリーミングでPythonで作業したことがある人は、サンプルの例やリンクを続けることができますか?

、使用

schema = StructType([ 
    StructField("a", DoubleType()), 
    StructField("b", DoubleType()), 
    StructField("name", StringType()), 
    StructField("time", TimestampType())]) 

inData = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe","test").load() 
data = inData.select(from_json(col("value").cast("string"), schema)) 
query = data.writeStream.outputMode("Append").format("console").start() 

プログラムが実行されますが、

+-----------------------------------+ 
|jsontostruct(CAST(value AS STRING))| 
+-----------------------------------+ 
|    [null,null,null,2...| 
|    [null,null,null,2...| 
+-----------------------------------+ 

17/04/07 19:23:15 INFO StreamExecution: Streaming query made progress: { 
    "id" : "8e2355cb-0fd3-4233-89d8-34a855256b1e", 
    "runId" : "9fc462e0-385a-4b05-97ed-8093dc6ef37b", 
    "name" : null, 
    "timestamp" : "2017-04-07T19:23:15.013Z", 
    "numInputRows" : 2, 
    "inputRowsPerSecond" : 125.0, 
    "processedRowsPerSecond" : 12.269938650306749, 
    "durationMs" : { 
    "addBatch" : 112, 
    "getBatch" : 8, 
    "getOffset" : 2, 
    "queryPlanning" : 4, 
    "triggerExecution" : 163, 
    "walCommit" : 26 
    }, 
    "eventTime" : { 
    "watermark" : "1970-01-01T00:00:00.000Z" 
    }, 
    "stateOperators" : [ ], 
    "sources" : [ { 
    "description" : "KafkaSource[Subscribe[test]]", 
    "startOffset" : { 
     "test" : { 
     "0" : 366 
     } 
    }, 
    "endOffset" : { 
     "test" : { 
     "0" : 368 
     } 
    }, 
    "numInputRows" : 2, 
    "inputRowsPerSecond" : 125.0, 
    "processedRowsPerSecond" : 12.269938650306749 
    } ], 
    "sink" : { 
    "description" : "[email protected]" 
    } 
} 

が、私はここで何かを見逃していました、と私は、コンソール上の値を取得しています。

答えて

0

あなたは、スキーマとfrom_jsonを使用することができ、次のいずれか

from pyspark.sql.functions import col, from_json 
from pyspark.sql.types import * 

schema = StructType([ 
    StructField("a", DoubleType()), 
    StructField("b", DoubleType()), 
    StructField("name", StringType()), 
    StructField("time", TimestampType())]) 

data.select(from_json(col("value").cast("string"), schema)) 

またはget_json_objectと文字列として個々のフィールドを取得:

from pyspark.sql.functions import get_json_object 

data.select([ 
    get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c) 
    for c in ["a", "b", "name", "time"]]) 

cast彼らは後に、ニーズに応じて。

関連する問題