2017-08-19 7 views
3

queueStream機能の機能は何ですか?私の理解によると、それは入ってくるDStreamをキューに入れるキューです。そうであれば、ノードが多いクラスタでどのように処理されるのか。各ノードにはqueueStreamがあり、DStreamはクラスタ内のすべてのノードに分割されていますか?このqueueStreamはクラスタ設定でどのように機能しますか?SparkStreamingでのqueueStreamの機能と有効化?

私は[Spark Streaming documentation] [https://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources]で以下の説明を読んでいますが、完全に理解できませんでした。それを理解するのを助けてください。

ストリームとしてキューRDDSの:テストデータとスパークストリーミングアプリケーションをテストするための、一つはまたstreamingContext.queueStream(queueOfRDDs)を使用して、RDDSのキューに基づいてDSTREAMを作成することができます。キューにプッシュされた各RDDは、DStream内のデータのバッチとして処理され、ストリームのように処理されます。上記のコードの部分が異なるノード上のパーティションに関して、スパーク・ストリーミング・コンテキストで実行されますどのように

val myQueueRDD= scala.collection.mutable.Queue[RDD[MyObject]]() 
val myStream= ssc.queueStream(myQueueRDD) 

for(count <- 1 to 100) { 
     val randomData= generateData() //Generated random data 
     val rdd= ssc.sparkContext.parallelize(randomData) //Creates the rdd of the random data. 
     myQueueRDD+= rdd //Addes data to queue. 
} 

myStream.foreachRDD(rdd => rdd.mapPartitions(data => evaluate(data))) 

答えて

2

QueueInputDStreamは、テスト用です。それは標準scala.collection.mutable.Queueを使用して、受信バッチを模倣するRDDsを格納します。

は、各ノードがこのqueueStreamを持って行い、DSTREAMは、クラスタ内のすべてのノード間で分配さ

号ありキューのコピーは1つだけであり、すべてのデータ分布がRDDsによって処理されます。 computeロジックは、現在のキューのdequeueoneAtATimetrueに設定されている)またはuniononeAtATimefalseに設定されている)が各ティックで非常に単純です。これは一般にDStreamsに適用されます。各ストリームはデータ配信メカニズムを提供する一連のRDDです。

APIはまだInputDStreamのAPIに従っていますが、概念的には、ローカルのコレクションであり、そこからすべてbatchDurationという要素を取ります。

+1

あなたは私に追いついています! ;-) – eliasah

+0

はい。キューが1つしかない場合、RDD配信がどのように行われるかは、明確にしたい部分です。キューがドライバ内にのみ存在する場合は、すべてのノードでどのように並列処理が行われるかをパーティション化して計算しますか? –

+0

あなたは疑問を明確にする質問の編集をチェックアウトできますか?user6910411 –

関連する問題