複数のRabbitMQキューに対してSparkストリーミングを設定しようとしていました。以下に述べるように私は2人の作業員を設定し、各作業員には1つのコアと2GBのメモリが与えられます。だから問題は、このパラメータをconf.set("spark.cores.max","2")
にしておけば、ストリーミングはジョブを追加するだけのデータを処理しないということです。しかし、一度それを設定するとconf.set("spark.cores.max","3")
ストリーミングが処理を開始します。だから、私はこれの理由を理解できませんでした。また、両方のキューから並列にデータを処理したい場合、どうすればよいのですか。私は私のコードと設定を以下に述べました。複数のrabbitmqキューからのSparkストリーミング処理を並列に処理する
Spark-env.sh:
SPARK_WORKER_MEMORY=2g
SPARK_WORKER_INSTANCES=1
SPARK_WORKER_CORES=1
Scalaのコード:
val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2","queueName" -> config.getString("queueName"),"host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("exchangeName"), "routingKeys" -> config.getString("routingKeys"))
val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams)
receiverStream.start()
val predRabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2", "queueName" -> config.getString("queueName1"), "host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("exchangeName1"), "routingKeys" -> config.getString("routingKeys1"))
val predReceiverStream = RabbitMQUtils.createStream(ssc, predRabbitParams)
predReceiverStream.start()