0

は、構造化されたストリーミングを開始するためにkafkaに来るjsonを解析するための私の最初の試みの助けを必要とします。スパークストリーミングのネストされたjsonをkafka上でフラットなデータフレームに変換する方法はありますか?

私は入って来るjsonを変換し、後で処理するためにフラットなデータフレームに変換するのに苦労しています。

マイ入力JSONは

[ 
    { "siteId": "30:47:47:BE:16:8F", "siteData": 
     [ 
      { "dataseries": "trend-255", "values": 
       [ 
        {"ts": 1502715600, "value": 35.74 }, 
        {"ts": 1502715660, "value": 35.65 }, 
        {"ts": 1502715720, "value": 35.58 }, 
        {"ts": 1502715780, "value": 35.55 } 
       ] 
      }, 
      { "dataseries": "trend-256", "values": 
       [ 
        {"ts": 1502715840, "value": 18.45 }, 
        {"ts": 1502715900, "value": 18.35 }, 
        {"ts": 1502715960, "value": 18.32 } 
       ] 
      } 
     ] 
    }, 
    { "siteId": "30:47:47:BE:16:FF", "siteData": 
     [ 
      { "dataseries": "trend-255", "values": 
       [ 
        {"ts": 1502715600, "value": 35.74 }, 
        {"ts": 1502715660, "value": 35.65 }, 
        {"ts": 1502715720, "value": 35.58 }, 
        {"ts": 1502715780, "value": 35.55 } 
       ] 
      }, 
      { "dataseries": "trend-256", "values": 
       [ 
        {"ts": 1502715840, "value": 18.45 }, 
        {"ts": 1502715900, "value": 18.35 }, 
        {"ts": 1502715960, "value": 18.32 } 
       ] 
      } 
     ] 
    } 
] 

スパークスキーマは私の非常に単純なコードがある

data1_spark_schema = ArrayType(
StructType([ 
    StructField("siteId", StringType(), False), 
    StructField("siteData", ArrayType(StructType([ 
    StructField("dataseries", StringType(), False), 
    StructField("values", ArrayType(StructType([ 
     StructField("ts", IntegerType(), False), 
     StructField("value", StringType(), False) 
    ]), False), False) 
    ]), False), False) 
]), False 
) 

です:

from pyspark.sql import SparkSession 
from pyspark.sql.functions import * 

from config.general import kafka_instance 
from config.general import topic 
from schemas.schema import data1_spark_schema 

spark = SparkSession \ 
      .builder \ 
      .appName("Structured_BMS_Feed") \ 
      .getOrCreate() 

stream = spark \ 
     .readStream \ 
     .format("kafka") \ 
     .option("kafka.bootstrap.servers", kafka_instance) \ 
     .option("subscribe", topic) \ 
     .option("startingOffsets", "latest") \ 
     .option("max.poll.records", 100) \ 
     .option("failOnDataLoss", False) \ 
     .load() 

stream_records = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as bms_data1") \ 
         .select(from_json("bms_data1", data1_spark_schema).alias("bms_data1")) 

sites = stream_records.select(explode("bms_data1").alias("site")) \ 
         .select("site.*") 

sites.printSchema() 

stream_debug = sites.writeStream \ 
          .outputMode("append") \ 
          .format("console") \ 
          .option("numRows", 20) \ 
          .option("truncate", False) \ 
          .start() 


stream_debug.awaitTermination() 

私は、スキーマは次のように印刷され、このコードIを実行すると:

root 
|-- siteId: string (nullable = false) 
|-- siteData: array (nullable = false) 
| |-- element: struct (containsNull = false) 
| | |-- dataseries: string (nullable = false) 
| | |-- values: array (nullable = false) 
| | | |-- element: struct (containsNull = false) 
| | | | |-- ts: integer (nullable = false) 
| | | | |-- value: string (nullable = false) 

このスキーマは、ネストされたjsonの代わりにフラットなデータフレーム内のすべてのフィールドを取得する方法で使用できますか?だから、すべてのtsと値のために、それは私の親dataseriesとサイトIDと1つの行を与える必要があります。

+0

は、重複質問だ:[これを参照してください]:(https://stackoverflow.com/questions/35027966/elegant-json-flatten-in-spark) –

答えて

0

私自身の質問に答える。私は、以下の行を使用して、それを平らにするために管理:

sites_flat = stream_records.select(explode("bms_data1").alias("site")) \ 
          .select("site.siteId", explode("site.siteData").alias("siteData")) \ 
          .select("siteId", "siteData.dataseries", explode("siteData.values").alias("values")) \ 
          .select("siteId", "dataseries", "values.*") 
関連する問題