私はRabbitMQとともにスパークストリーミングを使用しています。そのため、ストリーミングジョブはRabbitMQからデータを取得し、変換とアクションを適用します。そこで、同じストリーミングで複数のアクション(つまり、2つの異なるフィーチャセットを計算する)を適用する方法を知りたいと思います。出来ますか?はい、コードに記載されているようにストリーミングオブジェクトを複数のクラスに渡すにはどうすればよいですか?同じスパークストリーミングで複数のアクションを実行する方法
ここからストリームを処理する方法 val config = ConfigFactory.parseFile(new File("SparkStreaming.conf"))
val conf = new SparkConf(true).setAppName(config.getString("AppName"))
conf.set("spark.cleaner.ttl", "120000")
val sparkConf = new SparkContext(conf)
val ssc = new StreamingContext(sparkConf, Seconds(config.getLong("SparkBatchInterval")))
val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2","queueName" -> config.getString("RealTimeQueueName"),"host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("QueueExchangeName"), "routingKeys" -> config.getString("QueueRoutingKey"))
val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams)
receiverStream.start()
:
val objProcessFeatureSet1 = new ProcessFeatureSet1(Some_Streaming_Object)
val objProcessFeatureSet2 = new ProcessFeatureSet2(Some_Streaming_Object)
ssc.start()
ssc.awaitTermination()
それはあなたの問題を解決した場合、あなたはそれを受け入れることができますしてください。 – Hokam
この解決策は 'JSONValue.parse'を両方回見直してから、両方の時にフィルタを実行することに注意してください。このシナリオでは、パーティション化した後、条件を分割してプロセスをフォークする方がよいでしょう。 –
こんにちは、マップ変換を使用して解析操作を実行した後に受け取った "jsonStream"でpersist演算子を使用できます。 – Hokam