1つのスパークストリームジョブ内から2つのカフカトピックにどのように接続できますか?トピックは一度に。一度に2つのカフカトピックに接続することはできますが、一度に1つしか処理しないでください
トピック#1から処理する機能が必要で、ある条件に基づいて処理トピック#2に切り替えます。
スパークでどうすればいいですか?
1つのスパークストリームジョブ内から2つのカフカトピックにどのように接続できますか?トピックは一度に。一度に2つのカフカトピックに接続することはできますが、一度に1つしか処理しないでください
トピック#1から処理する機能が必要で、ある条件に基づいて処理トピック#2に切り替えます。
スパークでどうすればいいですか?
私は、2つのストリーミングコンテキストオブジェクトを作成し、トピックを変更する必要があるときにストリームを正常に停止して開始できると信じています。テストのために
私はqueueStream
data = 'abcdefgh'
rddQueue1 = map(lambda x: sc.parallelize(x), zip(*[iter(data)] * 2))
rddQueue2 = map(lambda x: sc.parallelize(x), zip(*[iter(data.upper())] * 2))
s1, s2 = ssc.queueStream(rddQueue1), ssc.queueStream(rddQueue2)
s3 = s1.transformWith(lambda t, x, y: x if int(str(t)[-1]) % 2 else y, s2)
使用し、KafkaUtilsも()
をDStream.transformWithをサポートしています