2017-12-05 9 views
0

私は、それぞれスキーマS1とS2を持つ2つのソケットストリームS1とS2を受け取りました。PythonによるSparkストリーミング:特定の属性に関する2つのストリームの結合

スパークストリーミングを使用して、属性 "a"に関してS1とS2を結合したいと思います。以下は私のコードです:

sc = SparkContext("local[3]", "StreamJoin") 
    ssc = StreamingContext(sc, 1) 

    S1 = ssc.socketTextStream("localhost", 9999) 
    S2 = ssc.socketTextStream("localhost", 8085) 

    # Create windowed stream 
    wS1 = S1.window(10) 
    wS2 = S2.window(1) 

    wS1.flatMap(lambda line: line.split(",")).pprint() 
    wS2.flatMap(lambda line: line.split(",")).pprint() 

    # Perform join 
    joinedStream = wS1.join(wS2) 

    joinedStream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x))) 

    ssc.start()    
    ssc.awaitTermination() 

S1とS2の両方がコンマで区切られています。

ただし、上記のコードは完全な行に関して結合を実行しますが、

私は、特定の属性(この場合は属性 'a')に関して2つのストリームを結合することに興味があります。どのように私はこれを達成することができる?

ありがとうございます!

答えて

0

スパークで動作するウェイジョインは、キーに基づいてrdd行を結合します。キーは、行[0]の値です。あなたはこうすることができます:

wS1.flatMap(lambda line: line.split(",")).map(lambda x: (x[0], x)).pprint() 
wS2.flatMap(lambda line: line.split(",")).map(lambda x: (x[0], x)).pprint() 

そして、スプリットリストの最初の要素に基づいて結合が行われます。

ドキュメント参照:

https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=join#pyspark.RDD.join

関連する問題