2017-10-10

I have started working with Akka streaming. I am running the example below from GitHub, but the messages sent to the "Helloer" actor are never received and nothing shows up on the output console. Akka streams with Spark Streaming: messages are not delivered to the actor; I get dead letters instead.

StreamingApp.scala

import _root_.akka.actor.{ Actor, Props }
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.akka.{ ActorReceiver, AkkaUtils }

class Helloer extends ActorReceiver {
  override def preStart() = {
    println("")
    println("=== Helloer is starting up ===")
    println(s"=== path=${context.self.path} ===")
    println("")
  }
  def receive = {
    // store() allows us to store the message so Spark Streaming knows about it
    // This is the integration point (from Akka's side) between Spark Streaming and Akka
    case s => store(s)
  }
}

object StreamingApp {
  def main(args: Array[String]) {
    // Configuration for a Spark application.
    // Used to set various Spark parameters as key-value pairs.
    val driverPort = 7777
    val driverHost = "localhost"
    val conf = new SparkConf()
      .setMaster("local[*]") // run locally with as many threads as CPUs
      .setAppName("Spark Streaming with Scala and Akka") // name in web UI
      .set("spark.logConf", "true")
      .set("spark.driver.port", driverPort.toString)
      .set("spark.driver.host", driverHost)
    val ssc = new StreamingContext(conf, Seconds(10))

    val actorName = "helloer"

    // This is the integration point (from Spark's side) between Spark Streaming and the Akka system
    // It's expected that the actor we're now instantiating will `store` messages (to close the integration loop)
    val actorStream = AkkaUtils.createStream[String](ssc, Props[Helloer](), actorName)

    // describe the computation on the input stream as a series of higher-level transformations
    actorStream.reduce(_ + " " + _).print()

    // Custom receiver
    import pl.japila.spark.streaming.CustomReceiverInputDStream
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    val input: ReceiverInputDStream[String] = ssc.receiverStream[String](CustomReceiverInputDStream(StorageLevel.NONE))
    input.print()

    // Data Ingestion from Kafka
    import org.apache.spark.streaming.kafka._

    // start the streaming context so the data can be processed
    // and the actor gets started
    ssc.start()

    // FIXME wish I knew a better way to handle the asynchrony
    java.util.concurrent.TimeUnit.SECONDS.sleep(3)

    import _root_.akka.actor.ActorSystem
    val actorSystem = ActorSystem("SparkStreamingAkka")

    val url = s"akka.tcp://streaming-actor-system-0@$driverHost:$driverPort/user/Supervisor0/$actorName"
    val helloer = actorSystem.actorSelection(url)
    helloer ! "Hello"
    helloer ! "from"
    helloer ! "Spark Streaming"
    helloer ! "with"
    helloer ! "Scala"
    helloer ! "and"
    helloer ! "Akka"

    import java.util.concurrent.TimeUnit.MINUTES
    ssc.awaitTerminationOrTimeout(timeout = MINUTES.toMillis(1))
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

The program uses the customeReceiverInputDstream implementation. The custom receiver is below.

customeReceiverInputDstream.scala

package pl.japila.spark.streaming 

import org.apache.spark.streaming.receiver.Receiver 
import org.apache.spark.storage.StorageLevel 

case class CustomReceiverInputDStream[T](override val storageLevel: StorageLevel) extends Receiver[T](storageLevel) {
  def onStart() {
    println("\nHello from CustomReceiver.START\n")
  }

  def onStop() {
    println("\nHello from CustomReceiver.STOP\n")
  }
}

Below is the dead-letter output I am getting.

    . 
        . 
        . 

Hello from CustomReceiver.START 

        . 
        . 
        . 

17/10/10 08:00:05 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636805400 
[INFO] [10/10/2017 08:00:05.475] [SparkStreamingAkka-akka.actor.default-dispatcher-6] [akka://SparkStreamingAkka/deadLetters] Message [java.lang.String] from Actor[akka://SparkStreamingAkka/deadLetters] to Actor[akka://SparkStreamingAkka/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [10/10/2017 08:00:05.475] [SparkStreamingAkka-akka.actor.default-dispatcher-6] [akka://SparkStreamingAkka/deadLetters] Message [java.lang.String] from Actor[akka://SparkStreamingAkka/deadLetters] to Actor[akka://SparkStreamingAkka/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [10/10/2017 08:00:05.475] [SparkStreamingAkka-akka.actor.default-dispatcher-6] [akka://SparkStreamingAkka/deadLetters] Message [java.lang.String] from Actor[akka://SparkStreamingAkka/deadLetters] to Actor[akka://SparkStreamingAkka/deadLetters] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [10/10/2017 08:00:05.475] [SparkStreamingAkka-akka.actor.default-dispatcher-6] [akka://SparkStreamingAkka/deadLetters] Message [java.lang.String] from Actor[akka://SparkStreamingAkka/deadLetters] to Actor[akka://SparkStreamingAkka/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [10/10/2017 08:00:05.475] [SparkStreamingAkka-akka.actor.default-dispatcher-6] [akka://SparkStreamingAkka/deadLetters] Message [java.lang.String] from Actor[akka://SparkStreamingAkka/deadLetters] to Actor[akka://SparkStreamingAkka/deadLetters] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [10/10/2017 08:00:05.475] [SparkStreamingAkka-akka.actor.default-dispatcher-6] [akka://SparkStreamingAkka/deadLetters] Message [java.lang.String] from Actor[akka://SparkStreamingAkka/deadLetters] to Actor[akka://SparkStreamingAkka/deadLetters] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [10/10/2017 08:00:05.475] [SparkStreamingAkka-akka.actor.default-dispatcher-6] [akka://SparkStreamingAkka/deadLetters] Message [java.lang.String] from Actor[akka://SparkStreamingAkka/deadLetters] to Actor[akka://SparkStreamingAkka/deadLetters] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
17/10/10 08:00:05 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636805600 
17/10/10 08:00:05 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636805600 
[INFO] [10/10/2017 08:00:05.693] [Executor task launch worker-0] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2552] 
[INFO] [10/10/2017 08:00:05.696] [Executor task launch worker-0] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2552] 
17/10/10 08:00:05 INFO ActorReceiverSupervisor: Supervision tree for receivers initialized at:akka://streaming-actor-system-0/user/Supervisor0 
17/10/10 08:00:05 INFO ReceiverSupervisorImpl: Called receiver 0 onStart 
17/10/10 08:00:05 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped 
17/10/10 08:00:05 INFO ActorReceiverSupervisor: Started receiver worker at:akka://streaming-actor-system-0/user/Supervisor0/helloer 

=== Helloer is starting up === 
=== path=akka://streaming-actor-system-0/user/Supervisor0/helloer === 

17/10/10 08:00:05 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636805800 
17/10/10 08:00:05 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636805800 
17/10/10 08:00:06 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636806000 
           . 
           . 
           . 

Why don't you use `actorOf` instead of `actorSelection`? According to the Akka documentation, "actorSelection only looks up existing actors when messages are delivered, i.e. does not create actors, or verify existence of actors when the selection is created." – EmiCareOfCell44


@EmiCareOfCell44 I tried with `actorOf`. This time there are no dead-letter messages, but the sent messages are still not displayed by the code above. – Mahesh


There is no `println` inside the case clause. What do you expect to be printed? – EmiCareOfCell44
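A minimal, dependency-free sketch of that suggestion — Spark's `store()` is stubbed out with a local buffer so the snippet runs without Spark or Akka, and the `receive` partial function mirrors Helloer's with an added `println` so each received message becomes visible:

```scala
// Sketch only: store() is a stub standing in for ActorReceiver.store.
// The point is the println added inside the case clause.
object ReceiveDemo {
  val stored = scala.collection.mutable.Buffer.empty[String]
  def store(s: String): Unit = stored += s

  // Mirrors Helloer.receive, plus a println to make each message visible
  val receive: PartialFunction[Any, Unit] = {
    case s: String =>
      println(s"Helloer received: $s")
      store(s)
  }

  def main(args: Array[String]): Unit =
    Seq("Hello", "from", "Spark Streaming").foreach(receive)
}
```

In the real `Helloer`, the same `println` before `store(s)` would confirm on the receiver's console whether messages arrive at all, separating the delivery problem from the output problem.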

Answer


OK, I see. The problem here is that the actor created to act as the source, "helloer", is started in a different ActorSystem, and this code looks it up from the one named "SparkStreamingAkka" through akka.remote, which is why a tcp URL is used. In this code that doesn't work and would need further investigation... However, using a separate ActorSystem is not mandatory in this example. The workaround would be:

import _root_.akka.actor.{Actor, Props}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

class Helloer extends ActorReceiver {
  override def preStart() = {
    println("")
    println("=== Helloer is starting up ===")
    println(s"=== path=${context.self.path} ===")
    println("")
  }
  def receive = {
    // store() allows us to store the message so Spark Streaming knows about it
    // This is the integration point (from Akka's side) between Spark Streaming and Akka
    case s => store(s)
  }
}


// Create a common actor system
object CreateActorSystem {
  lazy val as = _root_.akka.actor.ActorSystem("ActorSystemSpark")
}

object StreamingApp {
  def main(args: Array[String]) {
    // Configuration for a Spark application.
    // Used to set various Spark parameters as key-value pairs.
    val driverPort = 7777
    val driverHost = "localhost"
    val conf = new SparkConf()
      .setMaster("local[*]") // run locally with as many threads as CPUs
      .setAppName("Spark Streaming with Scala and Akka") // name in web UI
      .set("spark.logConf", "true")
      .set("spark.driver.port", driverPort.toString)
      .set("spark.driver.host", driverHost)
    val ssc = new StreamingContext(conf, Seconds(10))

    val actorName = "helloer"

    // This is the integration point (from Spark's side) between Spark Streaming and the Akka system
    // It's expected that the actor we're now instantiating will `store` messages (to close the integration loop)

    // Pass the shared actor system as a parameter
    val actorStream = AkkaUtils.createStream[String](ssc, Props[Helloer](), actorName, actorSystemCreator = () => CreateActorSystem.as)

    // describe the computation on the input stream as a series of higher-level transformations
    actorStream.reduce(_ + " " + _).print()

    // Custom receiver
    import pl.japila.spark.streaming.CustomReceiverInputDStream
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    val input: ReceiverInputDStream[String] = ssc.receiverStream[String](CustomReceiverInputDStream(StorageLevel.NONE))
    input.print()

    // Data Ingestion from Kafka
    //import org.apache.spark.streaming.kafka._

    // start the streaming context so the data can be processed
    // and the actor gets started
    ssc.start()

    // FIXME wish I knew a better way to handle the asynchrony
    java.util.concurrent.TimeUnit.SECONDS.sleep(3)

    val actorSystem = CreateActorSystem.as

    // Get the actor from its path. There is no need for akka.remote here.
    val helloer = actorSystem.actorSelection("/user/Supervisor0/helloer")

    helloer ! "Hello"
    helloer ! "from"
    helloer ! "Spark Streaming"
    helloer ! "with"
    helloer ! "Scala"
    helloer ! "and"
    helloer ! "Akka"

    import java.util.concurrent.TimeUnit.MINUTES
    ssc.awaitTerminationOrTimeout(timeout = MINUTES.toMillis(1))
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
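The difference between the failing version and the workaround comes down to the selection path. A small sketch of the two paths involved, in plain Scala with no Akka dependency — the system name `streaming-actor-system-0` is an assumption taken from the receiver log lines above:

```scala
// Sketch only: builds the two actor-selection strings from the question.
// "streaming-actor-system-0" is assumed from the log output, not confirmed.
object ActorPathDemo {
  val driverHost = "localhost"
  val driverPort = 7777
  val actorName = "helloer"

  // Original attempt: a remote akka.tcp URL, which crosses ActorSystem
  // boundaries and therefore requires akka.remote to be configured.
  val remoteUrl =
    s"akka.tcp://streaming-actor-system-0@$driverHost:$driverPort/user/Supervisor0/$actorName"

  // Workaround: a relative path resolved inside the shared ActorSystem,
  // so no remoting is involved at all.
  val localPath = s"/user/Supervisor0/$actorName"

  def main(args: Array[String]): Unit = {
    println(remoteUrl)
    println(localPath)
  }
}
```

Because `CreateActorSystem.as` is the same `ActorSystem` instance that the receiver's supervisor lives in, `actorSelection(localPath)` resolves purely locally and the messages reach the actor.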

This should work.


Thanks! Your correction works. Please also have a look at the akka.tcp case. – Mahesh


`actorOf` works for remote messaging. But I still face problems when using `actorSelection`: https://stackoverflow.com/questions/46724732/actorsystem-actorselection-is-not-working-for-remote-actors-where-actorof-is-wor – Mahesh
