2017-01-28 5 views
1

Apache Bahirを使ってAkkaに接続する、Spark Streamingで簡単なプロセスをセットアップしようとしています。私はこのolder oneと一緒にtheir exampleに従おうとしました。Apache Bahir、ActorReceiverにものを送る

akka { 
    actor { 
    provider = "akka.remote.RemoteActorRefProvider" 
    } 
    remote { 
    enabled-transports = ["akka.remote.netty.tcp"] 
    netty.tcp { 
     hostname = "localhost" 
     port = 7777 
    } 
    } 
} 

と私の問題は、次のとおりです:私は、単純なフォワーダ俳優

class ForwarderActor extends ActorReceiver { 
    def receive = { 
    case data: MyData => store(data) 
    } 
} 

を持っていると私は

val stream = AkkaUtils.createStream[RSVP](ssc, Props[ForwarderActor], actorName) 

に構成されたストリームを作成するには、次のようになり、私はにメッセージを送信しますかフォワーダー俳優?この場合、Akka Remoteがどのように使われているのか分かりません。アプリを起動すると、私はログに

[akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:7777] 

を参照し、後で私は思わ

[akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:52369] 

ScalaDocに説明を思い出させるために参照してください。

/** 
    * A default ActorSystem creator. It will use a unique system name 
    * (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote 
    * communication. 
    */ 

すべてを、すべてのIに私がどのようにForwarderの俳優にメッセージを送ることになっているのか分かりません。助けてくれてありがとう!

答えて

0

アクカのアクタは、リモートJVM上で動作している他のアクタのアクタにメッセージを送信できます。したがって、送信者の俳優が、意図された受信者の俳優の住所を知る必要があるとき。

AkkaUtil(Bahir)を使用すると、ReceiverActorが受信するメッセージからスパークストリームを作成できます。しかし、どこからメッセージを受け取るのですか?まあ...遠隔地の俳優。そしてメッセージを送るために、このリモートアクターはあなたのsparkアプリケーションで動いているReceiverActorのアドレスが必要になります。

一般に、あなたのスパークアプリケーションを実行するIPについてはあまりにも確信が持てません。それで、sparkを使っている俳優がプロデューサーの俳優にそのリファレンスを伝えて、それを送るよう要求するようにします。

両方のアプリケーションが同じバージョンのScalaを使用して記述され、同じJREを実行していることを確認してください。今

...は、最初のデータソース、今

import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props} 
import akka.actor.Actor.Receive 
import com.typesafe.config.{Config, ConfigFactory} 

case class SendMeYourStringsRequest(requesterRef: ActorRef) 
case class RequestedString(s: String) 

class MyActor extends Actor with ActorLogging { 

    val theListOfMyStrings = List("one", "two", "three") 

    override def receive: Receive = { 
    case SendMeYourStringsRequest(requesterRef) => { 
     theListOfMyStrings.foreach(s => { 
     requesterRef ! RequestedString(s) 
     }) 
    } 
    } 
} 

object MyApplication extends App { 

    val config = ConfigFactory.parseString(
    """ 
     |akka{ 
     | actor { 
     | provider = remote 
     | } 
     | remote { 
     | enabled-transports = ["akka.remote.netty.tcp"] 
     | untrusted-mode = off 
     | netty.tcp { 
     |  hostname="my-ip-address" 
     |  port=18000 
     | } 
     | } 
     |} 
    """.stripMargin 
) 

    val actorSystem = ActorSystem("my-actor-system", config) 

    var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor") 

} 

だろう俳優を書き込むことができます...

import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props} 
import akka.actor.Actor.Receive 
import com.typesafe.config.{Config, ConfigFactory} 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils} 

case class SendMeYourStringsRequest(requesterRef: ActorRef) 
case class RequestedString(s: String) 

class YourStringRequesterActor extends ActorReceiver { 
    def receive = { 
    case RequestedString(s) => store(s) 
    } 

    override def preStart(): Unit = { 
    val myActorPath = ActorPath.fromString("akka.tcp://[email protected]:18000/user/my-actor") 
    val myActorSelection = context.actorSelection(myActorPath) 

    myActorSelection ! SendMeYourStringsRequest(self) 
    } 
} 

object YourSparkApp { 
    def main(args: Array[String]): Unit = { 
    val sparkConf = new SparkConf().setAppName("ActorWordCount") 

    if (!sparkConf.contains("spark.master")) { 
     sparkConf.setMaster("local[2]") 
    } 

    val ssc = new StreamingContext(sparkConf, Seconds(2)) 

    val stringStream = AkkaUtils.createStream[String](
     ssc, 
     Props(classOf[YourStringRequesterActor]), 
     "your-string-requester-actor" 
    ) 

    stringStream.foreach(println) 

    } 
} 

注、私たちの簡単な火花アプリを書く::だけ取ることができますケアのmy-ip-address。他の問題がある場合は、私にコメントでお知らせください。

+0

@SaSarvesh Kumar Singh喜んでこれを見つけました。私は同じ問題を解決しようとしています。ここで私はあなたのコードを試しました。 'actorScelection'https://stackoverflow.com/questions/46724732/actorsystem-actorselection-is-not-working-for-remote-actors-where-actorof-is-worを使用しているときに、この問題を確認できますか? – Mahesh

関連する問題