私は、Akka Streams(下記参照)とメッセージを交換するための単純なTCPベースのプロトコルを実装する際にスタブを取った。しかし、着信のメッセージはすぐに処理されないようです。Akka Streamsを使用して簡単なTCPプロトコルを実装する方法は?
At t=1, on [client] A is entered
At t=2, on [client] B is entered
At t=3, on [server] Z is entered
At t=4, on [server] A is printed
At t=5, on [server] Y is entered
At t=6, on [server] B is printed
私が期待したもの/見たい:
ことは、二つのメッセージは、クライアントから次々に送られているシナリオでは、最初のメッセージは 後に何かがサーバーから送信されるだけ印刷されていますAt t=1, on [client] A is entered
At t=2, on [server] A is printed
At t=3, on [client] B is entered
At t=4, on [server] B is printed
At t=5, on [server] Z is entered
At t=6, on [client] Z is printed
At t=7, on [server] Y is entered
At t=8, on [client] Y is printed
私には何が欠けていますか?おそらく私は、どういうわけか、両方の端でシンクを熱望する必要がありますか?または、各シンクが対応するソースによって何らかの形でブロックされている(ソースがコマンドラインからの入力を待っている間)?
import java.nio.charset.StandardCharsets.UTF_8
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source, Tcp}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import scala.io.StdIn
object AkkaStreamTcpChatter extends App {
implicit val system = ActorSystem("akka-stream-tcp-chatter", ConfigFactory.defaultReference())
implicit val materializer = ActorMaterializer()
type Message = String
val (host, port) = ("localhost", 46235)
val deserialize:ByteString => Message = _.utf8String
val serialize:Message => ByteString = message => ByteString(message getBytes UTF_8)
val incoming:Flow[ByteString, Message, _] = Flow fromFunction deserialize
val outgoing:Flow[Message, ByteString, _] = Flow fromFunction serialize
val protocol = BidiFlow.fromFlows(incoming, outgoing)
def prompt(s:String):Source[Message, _] = Source fromIterator {
() => Iterator.continually(StdIn readLine s"[$s]> ")
}
val print:Sink[Message, _] = Sink foreach println
args.headOption foreach {
case "server" => server()
case "client" => client()
}
def server():Unit =
Tcp()
.bind(host, port)
.runForeach { _
.flow
.join(protocol)
.runWith(prompt("S"), print)
}
def client():Unit =
Tcp()
.outgoingConnection(host, port)
.join(protocol)
.runWith(prompt("C"), print)
}
良いキャッチ!どうやら、それを指摘してくれてありがとう、私はドキュメントの関連セクションを逃した。 – Andrey