2016-10-17 16 views
0

複数のRabbitMQキューに対してSparkストリーミングを設定しようとしていました。以下に述べるように私は2人の作業員を設定し、各作業員には1つのコアと2GBのメモリが与えられます。だから問題は、このパラメータをconf.set("spark.cores.max","2")にしておけば、ストリーミングはジョブを追加するだけのデータを処理しないということです。しかし、一度それを設定するとconf.set("spark.cores.max","3")ストリーミングが処理を開始します。だから、私はこれの理由を理解できませんでした。また、両方のキューから並列にデータを処理したい場合、どうすればよいのですか。私は私のコードと設定を以下に述べました。複数のrabbitmqキューからのSparkストリーミング処理を並列に処理する

Spark-env.sh:

SPARK_WORKER_MEMORY=2g 
SPARK_WORKER_INSTANCES=1 
SPARK_WORKER_CORES=1 

Scalaのコード:

val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2","queueName" -> config.getString("queueName"),"host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("exchangeName"), "routingKeys" -> config.getString("routingKeys")) 
    val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams) 
    receiverStream.start()  

    val predRabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2", "queueName" -> config.getString("queueName1"), "host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("exchangeName1"), "routingKeys" -> config.getString("routingKeys1")) 
    val predReceiverStream = RabbitMQUtils.createStream(ssc, predRabbitParams) 
    predReceiverStream.start() 

答えて

1

この動作はStreaming Guideに説明されています。各レシーバは、長時間実行されるプロセスであり、単一のスレッドを占有します。

使用可能なスレッドの数が少ない、または受信機の数に等しい場合に何のリソースが存在しないタスク処理のために残さ:

スパークストリーミングアプリケーションに割り当てられたコアの数は、より多くなければなりません受信機の数。そうしないと、システムはデータを受信しますが、処理することはできません。

関連する問題