2017-10-13 10 views
0

ソケットストリームから各レコードを取得しようとしています。レコードを行の文字列データ型にしたい。 Pythonでコードを書くには?ありがとう!socketTextStreamから文字列形式でレコードを取得する方法

モデル= pipeline.PipelineModel.read()。負荷(model_path)

SC = spark.sparkContext SSC = StreamingContext(SC、1)

線= ssc.socketTextStream(sys.argvの[ 1]、INT(sys.argvの[2]))

場合(ライン)なしておりません。 lines.foreachRDD(ラムダRDD:rdd.foreach(processRecord))

DEF processRecord(レコード):

print("test") 
... 

答えて

0
from __future__ import print_function 
import sys 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 


if __name__ == "__main__": 
    sc = SparkContext(appName="Demo") 
    ssc = StreamingContext(sc, 1) 

    #record = ssc.socketTextStream("localhost", 9999) 
    record = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) 
    # print out each single word 
    record.flatMap(lambda line: line.split(" ")).pprint() 

    # start streaming 
    ssc.start() 
    # stop when the socket we are listening is dead 
    ssc.awaitTermination() 

ありがとうございました。

+0

レコードは文字列型ではありません – icecream

+0

私はそこにコードを追加しました。 plsが私のコードに何が間違っているかを確認します。ありがとう! – icecream

関連する問題