Akka 2.4.3を使用してTCPストリームを別のシンクにリダイレクトまたは転送しようとしています。 プログラムはサーバーソケットを開き、着信接続をリッスンしてからtcpストリームを消費する必要があります。私たちの送信者は私たちからの返信を期待/受け入れないので、何も返信しません。ただストリームを消費します。 tcpストリームをフレーミングした後、バイトをより有用なものに変換してシンクに送る必要があります。TCPストリームを消費して別のシンクにリダイレクトします(Akkaストリームの場合)
私は以下のことを試みましたが、特に、送信者にtcpパケットを送り返さず、シンクに正しく接続する方法については苦労しました。
import scala.util.Failure
import scala.util.Success
import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Tcp
import akka.stream.scaladsl.Framing
import akka.util.ByteString
import java.nio.ByteOrder
import akka.stream.scaladsl.Flow
object TcpConsumeOnlyStreamToSink {
implicit val system = ActorSystem("stream-system")
private val log = Logging(system, getClass.getName)
//The Sink
//In reality this is of course a real Sink doing some useful things :-)
//The Sink accept types of "SomethingMySinkUnderstand"
val mySink = Sink.ignore;
def main(args: Array[String]): Unit = {
//our sender is not interested in getting replies from us
//so we just want to consume the tcp stream and never send back anything to the sender
val (address, port) = ("127.0.0.1", 6000)
server(system, address, port)
}
def server(system: ActorSystem, address: String, port: Int): Unit = {
implicit val sys = system
import system.dispatcher
implicit val materializer = ActorMaterializer()
val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
println("Client connected from: " + conn.remoteAddress)
conn handleWith Flow[ByteString]
//this is neccessary since we use a self developed tcp wire protocol
.via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN))
//here we want to map the raw bytes into something our Sink understands
.map(msg => new SomethingMySinkUnderstand(msg.utf8String))
//here we like to connect our Sink to the Tcp Source
.to(mySink) //<------ NOT COMPILING
}
val tcpSource = Tcp().bind(address, port)
val binding = tcpSource.to(handler).run()
binding.onComplete {
case Success(b) =>
println("Server started, listening on: " + b.localAddress)
case Failure(e) =>
println(s"Server could not bind to $address:$port: ${e.getMessage}")
system.terminate()
}
}
class SomethingMySinkUnderstand(x:String) {
}
}
更新:必要DEPS
libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"
は、他の人がこれを再作成できるように、build.sbtファイルまたは少なくとも依存関係を表示するために役に立つかもしれません。 – Brian
は上記質問を上記の質問に追加しました – salyh