1

私はRabbitMQとともにスパークストリーミングを使用しています。そのため、ストリーミングジョブはRabbitMQからデータを取得し、変換とアクションを適用します。そこで、同じストリーミングで複数のアクション(つまり、2つの異なるフィーチャセットを計算する)を適用する方法を知りたいと思います。出来ますか?はい、コードに記載されているようにストリーミングオブジェクトを複数のクラスに渡すにはどうすればよいですか?同じスパークストリーミングで複数のアクションを実行する方法

ここからストリームを処理する方法
  val config = ConfigFactory.parseFile(new File("SparkStreaming.conf")) 
      val conf = new SparkConf(true).setAppName(config.getString("AppName")) 
      conf.set("spark.cleaner.ttl", "120000")   

      val sparkConf = new SparkContext(conf) 
      val ssc = new StreamingContext(sparkConf, Seconds(config.getLong("SparkBatchInterval"))) 

      val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2","queueName" -> config.getString("RealTimeQueueName"),"host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("QueueExchangeName"), "routingKeys" -> config.getString("QueueRoutingKey")) 
      val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams) 
      receiverStream.start() 

  val objProcessFeatureSet1 = new ProcessFeatureSet1(Some_Streaming_Object) 
      val objProcessFeatureSet2 = new ProcessFeatureSet2(Some_Streaming_Object) 

      ssc.start() 
      ssc.awaitTermination() 

答えて

3

下図のようにあなたが同じDSTREAMに複数のアクションを実行することができます:私が最初に作成していスニペット上記のコードで

import net.minidev.json.JSONValue 
import net.minidev.json.JSONObject 

val config = ConfigFactory.parseFile(new File("SparkStreaming.conf")) 
val conf = new SparkConf(true).setAppName(config.getString("AppName")) 
conf.set("spark.cleaner.ttl", "120000")   

val sparkConf = new SparkContext(conf) 
val ssc = new StreamingContext(sparkConf, Seconds(config.getLong("SparkBatchInterval"))) 

val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2","queueName" -> config.getString("RealTimeQueueName"),"host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("QueueExchangeName"), "routingKeys" -> config.getString("QueueRoutingKey")) 
val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams) 

val jsonStream = receiverStream.map(byteData => { 
    JSONValue.parse(byteData) 
}) 
jsonStream.filter(json => { 
    var customerType = json.get("customerType") 
    if(customerType.equals("consumer")) 
     true 
    else 
     false 
}).foreachRDD(rdd => { 
    rdd.foreach(json => { 
     println("json " + json) 
    }) 
}) 

jsonStream.filter(json => { 
    var customerType = json.get("customerType") 
    if(customerType.equals("non-consumer")) 
       true 
     else 
       false 
}).foreachRDD(rdd => { 
    rdd.foreach(json => { 
      println("json " + json) 
    }) 
}) 
ssc.start() 
ssc.awaitTermination() 

を受信したストリームからjsonStreamを作成し、顧客タイプに基づいて2つの異なるストリームを作成してから(foreachRDD)を適用します結果を印刷するためのアクション。

同様の方法で、同じdstreamを2つの異なるクラスに渡して、その中の変換とアクションを適用して異なるフィーチャセットを計算することができます。

上記の説明を参考にして問題を解決することをお勧めします。

おかげで、
Hokam

+0

それはあなたの問題を解決した場合、あなたはそれを受け入れることができますしてください。 – Hokam

+0

この解決策は 'JSONValue.parse'を両方回見直してから、両方の時にフィルタを実行することに注意してください。このシナリオでは、パーティション化した後、条件を分割してプロセスをフォークする方がよいでしょう。 –

+0

こんにちは、マップ変換を使用して解析操作を実行した後に受け取った "jsonStream"でpersist演算子を使用できます。 – Hokam

関連する問題