0
私は、それぞれスキーマS1とS2を持つ2つのソケットストリームS1とS2を受け取りました。PythonによるSparkストリーミング:特定の属性に関する2つのストリームの結合
スパークストリーミングを使用して、属性 "a"に関してS1とS2を結合したいと思います。以下は私のコードです:
sc = SparkContext("local[3]", "StreamJoin")
ssc = StreamingContext(sc, 1)
S1 = ssc.socketTextStream("localhost", 9999)
S2 = ssc.socketTextStream("localhost", 8085)
# Create windowed stream
wS1 = S1.window(10)
wS2 = S2.window(1)
wS1.flatMap(lambda line: line.split(",")).pprint()
wS2.flatMap(lambda line: line.split(",")).pprint()
# Perform join
joinedStream = wS1.join(wS2)
joinedStream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x)))
ssc.start()
ssc.awaitTermination()
S1とS2の両方がコンマで区切られています。
ただし、上記のコードは完全な行に関して結合を実行しますが、
私は、特定の属性(この場合は属性 'a')に関して2つのストリームを結合することに興味があります。どのように私はこれを達成することができる?
ありがとうございます!