2017-09-13 8 views
0

は私が複数のカフカを受け入れることができpysparkプログラムそうのような設定で述べたストリームを持っていますPyspark複数カフカストリーム上書きする変数

aaaaa 
aaaaa 
aaaaa 
... 
:STREAM1は次の着信内容を持っている場合
from configobj import ConfigObj 
config = ConfigObj("my.conf") 
for i, j in conf.iteritems(): 
    stream = KafkaUtils.createStream(ssc, j['server'], "consumer_%s" % (i), {j['topic']: 1}).cache() 
    stream.pprint() 

は今言いますとストリーム2は、以下の内容があります。

PPRINT機能を使用して
bbbbb 
bbbbb 
bbbbb 
... 

、私は次のような出力を見て期待していた。

----------------------------- 
2017-09-13 16:54:32 
----------------------------- 
aaaaa 
aaaaa 
aaaaa 
... 

----------------------------- 
2017-09-13 16:54:32 
----------------------------- 
bbbbb 
bbbbb 
bbbbb 
... 

しかし、私は次の出力を参照してください。

----------------------------- 
2017-09-13 16:54:32 
----------------------------- 
bbbbb 
bbbbb 
bbbbb 
... 

----------------------------- 
2017-09-13 16:54:32 
----------------------------- 
bbbbb 
bbbbb 
bbbbb 
... 

を私はforループの2回目の反復の後に読み込まれた後に、変数streamを読み込んでいる遅延ロードまたは何かがあるように見えます。誰も私にこれを達成する方法を知らせることができますので、forループで2つの別々のストリームを処理することができます。

ありがとうございます!

答えて

0

ような何か:ご返信用

streams = [] 
config = ConfigObj("my.conf") 
for i, j in conf.iteritems(): 
    stream = KafkaUtils.createStream(ssc, j['server'], "consumer_%s" % (i), {j['topic']: 1}).cache() 
    streams.append(stream) 

all_topic = ssc.union(*streams)  
all_topic.pprint() 
+0

おかげ張。しかし、私はストリームの和集合を必要としません、代わりにいくつかのロジックに基づいて両方のストリームに異なるmap reduce関数を適用したいと思います。私は別々にストリームが必要です。 –

関連する問題