2016-11-22 10 views
3

私は、6つの入力DStreamを作成する次のコードを作成します。これは、直接aproachを使用してKafkaから6パーティションのトピックを読み込みます。ストリームに対して同じグループIDを指定しても、 6回。私は3 DStreamsを作成した場合、私は、データが3回繰り返さなど....私は間違ってここで何をスパークストリーミング。 Kafkaから並行して読み込みを繰り返すとデータが返される

numStreams = 6 
kafkaStreams = [KafkaUtils.createDirectStream(ssc, ["send6partitions"], { 
    "metadata.broker.list": brokers, 
    "fetch.message.max.bytes": "20971520", 
    "spark.streaming.blockInterval" : "2000ms", 
    "group.id" : "the-same"}, 
    valueDecoder = decodeValue, keyDecoder = decode_key) for _ in range (numStreams)] 

kvs = ssc.union(*kafkaStreams) 

をやってる?取得しますか

答えて

1

直接アプローチでは、1つのトピックから多くのDStreamを作成しません。 documentationから

簡体並列処理:カフカは や労働組合、それらを複数の入力ストリームを作成する必要はありません。 directStreamを使用すると、Spark Streamingは、 個のRDDパーティションを作成し、消費するカフカパーティションを作成します。これは、すべて のKafkaからのすべてのデータを並行して読み込みます。そのため、KafkaパーティションとRDDパーティションの間には1対1のマッピング があります。これは理解しやすく、 に調整されています。

だから1つのDSTREAMを作成し、スパーク全てカフカパーティションを:)使用します

+0

@Doctorトピックごとに1つのDStreamでアプローチを試みましたか?それは今あなたのために働いていますか? –

1

私はPythonに精通していませんが、Spark Scalaのダイレクトストリームはオフセットをコミットしません。したがって、読んだメッセージのオフセットをコミットせずにストリームをn回開いた場合、消費者は最初から開始します。

Pythonで同じ場合は、n個のストリームを開始する必要はありません。 1つのストリームを開始すると、Sparkはexecutor/tasks自体へのパーティションの分配を処理します。

1

基本的にはカフカのトピックでは、DSTREAMを作成し、これまでload.Byのデフォルトを共有することにより、複数の受信機/消費者のためのより高速な配信を行うために小分けされています受信者スレッド(Javaスレッド)によって並列にDstreamパーティションにデータを送信し、各Kafkaトピックパーティションからデータを受信します。 1つのトピックに対して6つのDstreamを作成している場合は、同じトピックの6つの受信者を意味しています。各レシーバーはすべてのフィードを1回取得するので、各フィードは6回ずつ取得されます。

関連する問題