私は、6つの入力DStreamを作成する次のコードを作成します。これは、直接aproachを使用してKafkaから6パーティションのトピックを読み込みます。ストリームに対して同じグループIDを指定しても、 6回。私は3 DStreamsを作成した場合、私は、データが3回繰り返さなど....私は間違ってここで何をスパークストリーミング。 Kafkaから並行して読み込みを繰り返すとデータが返される
numStreams = 6
kafkaStreams = [KafkaUtils.createDirectStream(ssc, ["send6partitions"], {
"metadata.broker.list": brokers,
"fetch.message.max.bytes": "20971520",
"spark.streaming.blockInterval" : "2000ms",
"group.id" : "the-same"},
valueDecoder = decodeValue, keyDecoder = decode_key) for _ in range (numStreams)]
kvs = ssc.union(*kafkaStreams)
をやってる?取得しますか
@Doctorトピックごとに1つのDStreamでアプローチを試みましたか?それは今あなたのために働いていますか? –