2016-03-26 11 views
4

私は、Akka Streams(下記参照)とメッセージを交換するための単純なTCPベースのプロトコルを実装する際にスタブを取った。しかし、着信のメッセージはすぐに処理されないようです。Akka Streamsを使用して簡単なTCPプロトコルを実装する方法は?

At t=1, on [client] A is entered 
At t=2, on [client] B is entered 
At t=3, on [server] Z is entered 
At t=4, on [server] A is printed 
At t=5, on [server] Y is entered 
At t=6, on [server] B is printed 

私が期待したもの/見たい:

ことは、二つのメッセージは、クライアントから次々に送られているシナリオでは、最初のメッセージは 後に何かがサーバーから送信されるだけ印刷されています
At t=1, on [client] A is entered 
At t=2, on [server] A is printed 
At t=3, on [client] B is entered 
At t=4, on [server] B is printed 
At t=5, on [server] Z is entered 
At t=6, on [client] Z is printed 
At t=7, on [server] Y is entered 
At t=8, on [client] Y is printed 

私には何が欠けていますか?おそらく私は、どういうわけか、両方の端でシンクを熱望する必要がありますか?または、各シンクが対応するソースによって何らかの形でブロックされている(ソースがコマンドラインからの入力を待っている間)?

import java.nio.charset.StandardCharsets.UTF_8 

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source, Tcp} 
import akka.util.ByteString 
import com.typesafe.config.ConfigFactory 

import scala.io.StdIn 

object AkkaStreamTcpChatter extends App { 
    implicit val system = ActorSystem("akka-stream-tcp-chatter", ConfigFactory.defaultReference()) 
    implicit val materializer = ActorMaterializer() 

    type Message = String 
    val (host, port) = ("localhost", 46235) 

    val deserialize:ByteString => Message = _.utf8String 
    val serialize:Message => ByteString = message => ByteString(message getBytes UTF_8) 

    val incoming:Flow[ByteString, Message, _] = Flow fromFunction deserialize 
    val outgoing:Flow[Message, ByteString, _] = Flow fromFunction serialize 

    val protocol = BidiFlow.fromFlows(incoming, outgoing) 

    def prompt(s:String):Source[Message, _] = Source fromIterator { 
    () => Iterator.continually(StdIn readLine s"[$s]> ") 
    } 

    val print:Sink[Message, _] = Sink foreach println 

    args.headOption foreach { 
    case "server" => server() 
    case "client" => client() 
    } 

    def server():Unit = 
    Tcp() 
     .bind(host, port) 
     .runForeach { _ 
     .flow 
     .join(protocol) 
     .runWith(prompt("S"), print) 
     } 

    def client():Unit = 
    Tcp() 
     .outgoingConnection(host, port) 
     .join(protocol) 
     .runWith(prompt("C"), print) 
} 

答えて

5

私はこの問題は、アッカストリームがoperator fusionをしていることだと思います。これは、完全なフロー処理が単一のアクター上で実行されることを意味します。メッセージを読むためにブロックすると、何も印刷できません。

解決策は、ソースの後に非同期境界を追加することです。以下の例を参照してください。あなたは非同期境界を追加すると

def server(): Unit = 
    Tcp() 
    .bind(host, port) 
    .runForeach { 
     _ 
     .flow 
     .join(protocol) 
     .runWith(prompt("S").async, print) // note .async here 
    } 

def client(): Unit = 
    Tcp() 
    .outgoingConnection(host, port) 
    .join(protocol).async 
    .runWith(prompt("C").async, print) // note .async here 

、その後、融合は、このように何を示すからprintがブロックされていない、境界を越えて起こり、そして他の俳優にprompt実行されません。

+1

良いキャッチ!どうやら、それを指摘してくれてありがとう、私はドキュメントの関連セクションを逃した。 – Andrey

関連する問題