2017-02-08 1 views
3

で何もしない:mapMaterializedValue私はSource.actorRefと俳優のbindetにメッセージを送信しようとしていますが、コードのこの部分Source.actorRef

println(s"Before mapping $src") 
src.mapMaterializedValue { ref => 
    println(s"Mapping $ref") 
    ref ! letter.text 
} 
println(s"After mapping $src") 

はこのようなだけで何かを印刷していますマッピングソース(SourceShape(ActorRefSource.out)、ActorRefSource(0、失敗)[5564f412])後

マッピングソース([5564f412] SourceShape(ActorRefSource.out)、ActorRefSource(0、失敗))前

So.なんとかmapMaterializedValue何もしないでください。俳優へのメッセージは送信されません。 ref - 何かの理由でですか?

さらに、私はすべてのコードを掲載します。 WebSocketには、単純なメッセンジャー(1対1のメッセージ)のようなもののプロットです。私は今はAkkaストリームを勉強しているので、このコードは本当に完璧ではありません。私は批評家やアドバイスを聞く準備ができています。

メインサーバーオブジェクト:

package treplol.server 

import treplol.common._ 

import akka.actor.{ActorRef, ActorSystem} 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.ws._ 
import akka.http.scaladsl.server.Directives._ 
import akka.stream.scaladsl._ 
import akka.stream.{ActorMaterializer, FlowShape, OverflowStrategy} 

import scala.io.StdIn 
import java.util.UUID 

object WsServer extends App { 

    implicit val system = ActorSystem("example") 
    implicit val materializer = ActorMaterializer() 

    def createSource(uuid: UUID): Source[String, ActorRef] = { 
    val src = Source.actorRef[String](0, OverflowStrategy.fail) 
    sources(uuid) = src 
    src 
    } 

    val sources: collection.mutable.HashMap[UUID, Source[String, ActorRef]] = 
    collection.mutable.HashMap[UUID, Source[String, ActorRef]]() 
    val userSources: collection.mutable.HashMap[String, UUID] = 
    collection.mutable.HashMap[String, UUID]() 

    def flow: Flow[Message, Message, Any] = { 

    val uuid: UUID = UUID.randomUUID() 
    val incomingSource: Source[String, ActorRef] = createSource(uuid) 

    Flow.fromGraph(GraphDSL.create() { implicit b => 
     import GraphDSL.Implicits._ 

     val merge = b.add(Merge[String](2)) 

     val mapMsgToLttr = b.add(
     Flow[Message].collect { case TextMessage.Strict(txt) => txt } 
      .map[Letter] { txt => 
      WsSerializer.decode(txt) match { 
       case Auth(from) => 
       userSources(from) = uuid 
       Letter("0", from, "Authorized!") 
       case ltr: Letter => ltr 
      } 
      } 
    ) 

     val processLttr = b.add(
     Flow[Letter].map[String] { letter => 
      userSources.get(letter.to) flatMap sources.get match { 
      case Some(src) => 
       println(s"Before mapping $src") 
       src.mapMaterializedValue { ref => 
       println(s"Mapping $ref") 
       ref ! letter.text 
       } 
       println(s"After mapping $src") 
       "" 
      case None => "Not authorized!" 
      } 
     } 
    ) 

     val mapStrToMsg = b.add(
     Flow[String].map[TextMessage] (str => TextMessage.Strict(str)) 
    ) 

     mapMsgToLttr ~> processLttr ~> merge 
        incomingSource ~> merge ~> mapStrToMsg 

     FlowShape(mapMsgToLttr.in, mapStrToMsg.out) 
    }) 

    } 

    val route = path("ws")(handleWebSocketMessages(flow)) 
    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080) 

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") 
    StdIn.readLine() 

    import system.dispatcher 
    bindingFuture 
    .flatMap(_.unbind()) 
    .onComplete(_ => system.terminate()) 
} 

共通パッケージ:

name := "treplol" 

version := "0.0" 

scalaVersion := "2.12.1" 

resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases" 

libraryDependencies ++= Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.4.16", 
    "com.typesafe.akka" %% "akka-stream" % "2.4.16", 
    "com.typesafe.akka" %% "akka-http" % "10.0.3", 
    "org.json4s" %% "json4s-jackson" % "3.5.0" 
) 

build.sbt

package treplol 

package object common { 

    trait WsMessage 
    case class Letter(from: String, to: String, text: String) extends WsMessage 
    case class Auth(from: String) extends WsMessage 

    object WsSerializer { 

    import org.json4s.{Extraction, _} 
    import org.json4s.jackson.JsonMethods.{compact, parse} 
    import org.json4s.jackson.Serialization 

    implicit val formats = { 
     Serialization.formats(NoTypeHints) 
    } 

    case class WsData(typeOf: String, data: String) 
    object WsDataType { 
     val LETTER = "letter" 
     val AUTH = "auth" 
    } 

    class WrongIncomingData extends Throwable 

    def decode(wsJson: String): WsMessage = parse(wsJson).extract[WsData] match { 
     case WsData(WsDataType.LETTER, data) => parse(data).extract[Letter] 
     case WsData(WsDataType.AUTH, data) => parse(data).extract[Auth] 
     case _ => throw new WrongIncomingData 
    } 

    def encode(wsMessage: WsMessage): String = { 
     val typeOf = wsMessage match { 
     case _: Letter => WsDataType.LETTER 
     case _: Auth => WsDataType.AUTH 
     case _ => throw new WrongIncomingData 
     } 
     compact(Extraction.decompose(
     WsData(typeOf, compact(Extraction.decompose(wsMessage))) 
    )) 
    } 
    } 

} 

は、事前にあなたのすべてをありがとう!ドキュメントを1として

+0

'mapMaterializedValue'は新しいSourceを返します。あなたの例は 'val x = 1; println(x); x + 3; println(x) 'です。 'val src2 = src.mapMaterializedValue(...)を試してください。 println(src2) ' – Dylan

答えて

4

mapMaterializedValueコンビネータ

は、彼らがいたとして、すべての 他のプロパティを残して、このソースの唯一のマテリア値を変換します。

マテリアライズド値(この場合はソース)任意のグラフ段階の後にのみ使用可能であるがランあります。コード内でソースを実行することは決してありません。

WebSocketメッセージを処理するために使用するFlow[Message, Message, Any]は実際にはAkka-HTTPインフラストラクチャによって実行されるため、手動で行う必要はありません。ただし、processLttrの本文に作成したSourceは、残りのグラフには添付されていないため、実行されません。

グラフの実行とマテリアライゼーションの詳細については、docsを参照してください。

+0

ありがとうございます。ステファノ! しかし見て... 'flow'メソッドの最初のすべての接続に対して、私は' incomingSource'を作成し、 'sources'ハッシュマップ(' createSource'メソッド内)に入れています。 'flow'の終わりに' incomingSource'がグラフに追加されます。 'processLttr'では、すでにハッシュマップからソースを取得しています。少なくとも私はそれをやっていると思います。 だから。どこが間違っていますか? – Sasha

+1

ハッシュマップから取得するソースは「既にマテリアライズされていません」。各ソースは不変であり、自由に共有することができます。ソースを実行すると、マテリアライズされた値(元の値、またはマップされたマテリアライズされた値)が戻されます。また、何回でもソースを実行することができます。 –

0

Stefanoに感謝します!

しかし、そういう形で私が欲しかったことを達成する方法がないようです。しかし、私は深く掘って、custom stream processing and integration with actorsを使用しました。この手法では、外部からの特定のストリームにメッセージをプッシュできます。 (この機能はまだ実験中です!)

関連する問題