2017-08-22 10 views
0

私はソケットから入ってくるイベントを処理してから、イベントデータをウィンドウにして集約しようとしています。私は窓ガラスでぶつかった。 DataFrameのスキーマを指定しても、列に変換されないようです。ウィンドウと集約pyspark DataFrame

import sys 
from pyspark.sql.types import StructType, StringType, TimestampType, FloatType, IntegerType, StructField 

from pyspark.sql import SparkSession 
import pyspark.sql.functions as F 


if __name__ == "__main__": 
    # our data currently looks like this (tab separated). 
    # -SYMBOL DATE   PRICE TICKVOL BID   ASK 
    # NQU7 2017-05-28T15:00:00 5800.50 12  5800.50  5800.50 
    # NQU7 2017-05-28T15:00:00 5800.50 1  5800.50  5800.50 
    # NQU7 2017-05-28T15:00:00 5800.50 5  5800.50  5800.50 
    # NQU7 2017-05-28T15:00:00 5800.50 1  5800.50  5800.50 

    if len(sys.argv) != 3: 
     # print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr) 
     exit(-1) 

    spark = SparkSession \ 
     .builder \ 
     .appName("StructuredTickStream") \ 
     .getOrCreate() 
    sc = spark.sparkContext 
    sc.setLogLevel('WARN') 

    # Read all the csv files written atomically in a directory 
    tickSchema = StructType([ 
     StructField("symbol", StringType(), True), 
     StructField("dt", TimestampType(), True), 
     StructField("price", FloatType(), True), 
     StructField("tickvol", IntegerType(), True), 
     StructField("bid", FloatType(), True), 
     StructField("ask", FloatType(), True) 
    ]) 

    events_df = spark \ 
     .readStream \ 
     .option("sep", "\t") \ 
     .option("host", sys.argv[1]) \ 
     .option("port", sys.argv[2]) \ 
     .format("socket") \ 
     .schema(tickSchema) \ 
     .load() 

    events_df.printSchema() 
    print("columns = ", events_df.columns) 

    ohlc_df = events_df \ 
     .groupby(F.window("dt", "5 minutes", "1 minutes")) \ 
     .agg(
      F.first('price').alias('open'), 
      F.max('price').alias('high'), 
      F.min('price').alias('low'), 
      F.last('price').alias('close') 
     ) \ 
     .collect() 


    query = ohlc_df \ 
     .writeStream \ 
     .outputMode("complete") \ 
     .format("console") \ 
     .start() 

    query.awaitTermination() 

print("columns = ", events_df.columns)の出力は['value']され、そしてプロセスは、以下のトレースで失敗します。

pyspark.sql.utils.AnalysisException: "cannot resolve '`dt`' given input columns: [value];;\n'Aggregate [timewindow('dt, 300000000, 60000000, 0)], [timewindow('dt, 300000000, 60000000, 0) AS window#3, first('price, false) AS open#7, max('price) AS high#9, min('price) AS low#11, last('price, false) AS close#13]\n+- StreamingRelation DataSource([email protected],socket,List(),Some(StructType(StructField(symbol,StringType,true), StructField(dt,TimestampType,true), StructField(price,FloatType,true), StructField(tickvol,IntegerType,true), StructField(bid,FloatType,true), StructField(ask,FloatType,true))),List(),None,Map(sep -> \t, host -> localhost, port -> 9999),None), textSocket, [value#0]\n" 

私が間違ってやっている任意のアイデア?

答えて

-1

データフレームには1つの列valueしか存在しません。ここでは、このevents_dfから列dtにアクセスしようとしています。これが問題の主な理由です。はっきり声明以下

は、それはあなたはそれが1つの列のみでDFを作成しているのはなぜこの

events_df = spark \ 
    .readStream \ 
    .option("sep", "\t") \ 
    .option("host", sys.argv[1]) \ 
    .option("port", sys.argv[2]) \ 
    .format("socket") \ 
    .schema(tickSchema) \ 
    .load() 

を検査する必要がある単一の列にvalue

print("columns = ", events_df.columns) 

を持って示しています。

関連する問題