3

DSTREAMがunionの2種類を提供ストリーミングスパークでの労働組合の2種類の異なるあります:は、任意の

StreamingContext.union(Dstreams) 

Dstream.union(anotherDstream) 

をだから私は、特に並列処理性能で、異なっているかを知りたいです。

答えて

2

2つの操作のソースコードを見ると、DStreamを入力とし、もう1つをリストとする以外は違いはありません。

StreamingContext

def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = withScope { 
    new UnionDStream[T](streams.toArray) 
} 

Dstream

したがって
def union(that: DStream[T]): DStream[T] = ssc.withScope { 
    new UnionDStream[T](Array(this, that)) 
} 

、あなたが使用するものを持っていたすべきパフォーマンスの向上はありません、あなたの好みに依存します。あなたが結合するストリームのリストを持っている場合、StreamingConextのメソッドはコードを少し簡略化します。したがって、この場合は望ましいかもしれません。

+0

答えを投稿する前に、私は自分の準備ができていましたが、どこかに行かなければならなかったので、とにかく私の投稿をすることにしました。どうぞご覧ください。 – gsamaras

0

DStreamは2種類のユニオンを提供しています」という申し立ては正しくありません。

refには、異なる署名、より具体的には、ユニオン操作を提供する異なるクラスが記載されています。

StreamingContext.union(* dstreams)

同じ型、同じスライド持続時間の複数DStreamsから統一DSTREAMを作成します。このDSTREAMと別のDSTREAMの統一データによって

DStream.union(その他)

戻り新しいDSTREAM。 パラメータ:other - このDStreamと同じ間隔(つまり、slideDuration)を持つ別のDStream。

後にはSpark User Listで議論された:「組合関数は単に両方の要素を持つDSTREAMを返します。これは、我々がRDDSに労働組合を呼び出したときと同じ動作です。」。


StreamingContextのソースコード:

def union(self, *dstreams): 
    ... 
    first = dstreams[0] 
    jrest = [d._jdstream for d in dstreams[1:]] 
    return DStream(self._jssc.union(first._jdstream, jrest), self, first._jrdd_deserializer) 

DStreamのソースコード:

def union(self, other): 
    return self.transformWith(lambda a, b: a.union(b), other, True) 

あなたは他のtransformWithを使用しながら、最初は、(予想通り)、再帰を使用していることがわかります、これは同じクラスで定義され、各RDDを変換します。


覚えておくべき事は、場合によってはデータ受信がシステムのボトルネックになっていること、そして、データ受信処理を並列化を検討すると良いでしょうLevel of Parallelism in Data Receiving、です。

この結果、の方法を複数に適用することが推奨され、コードをきれいに保ちながら、これを簡単に行う方法が提供されました。 IMHO、パフォーマンスに違いはありません。