0
は私が複数のカフカを受け入れることができpysparkプログラムそうのような設定で述べたストリームを持っていますPyspark複数カフカストリーム上書きする変数
aaaaa
aaaaa
aaaaa
...
:STREAM1は次の着信内容を持っている場合
from configobj import ConfigObj
config = ConfigObj("my.conf")
for i, j in conf.iteritems():
stream = KafkaUtils.createStream(ssc, j['server'], "consumer_%s" % (i), {j['topic']: 1}).cache()
stream.pprint()
は今言いますとストリーム2は、以下の内容があります。
PPRINT機能を使用してbbbbb
bbbbb
bbbbb
...
、私は次のような出力を見て期待していた。
-----------------------------
2017-09-13 16:54:32
-----------------------------
aaaaa
aaaaa
aaaaa
...
-----------------------------
2017-09-13 16:54:32
-----------------------------
bbbbb
bbbbb
bbbbb
...
しかし、私は次の出力を参照してください。
-----------------------------
2017-09-13 16:54:32
-----------------------------
bbbbb
bbbbb
bbbbb
...
-----------------------------
2017-09-13 16:54:32
-----------------------------
bbbbb
bbbbb
bbbbb
...
を私はforループの2回目の反復の後に読み込まれた後に、変数stream
を読み込んでいる遅延ロードまたは何かがあるように見えます。誰も私にこれを達成する方法を知らせることができますので、forループで2つの別々のストリームを処理することができます。
ありがとうございます!
おかげ張。しかし、私はストリームの和集合を必要としません、代わりにいくつかのロジックに基づいて両方のストリームに異なるmap reduce関数を適用したいと思います。私は別々にストリームが必要です。 –