2016-08-31 8 views
3

Akka 2.4.3を使用してTCPストリームを別のシンクにリダイレクトまたは転送しようとしています。 プログラムはサーバーソケットを開き、着信接続をリッスンしてからtcpストリームを消費する必要があります。私たちの送信者は私たちからの返信を期待/受け入れないので、何も返信しません。ただストリームを消費します。 tcpストリームをフレーミングした後、バイトをより有用なものに変換してシンクに送る必要があります。TCPストリームを消費して別のシンクにリダイレクトします(Akkaストリームの場合)

私は以下のことを試みましたが、特に、送信者にtcpパケットを送り返さず、シンクに正しく接続する方法については苦労しました。

import scala.util.Failure 
import scala.util.Success 

import akka.actor.ActorSystem 
import akka.event.Logging 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Sink 
import akka.stream.scaladsl.Tcp 
import akka.stream.scaladsl.Framing 
import akka.util.ByteString 
import java.nio.ByteOrder 
import akka.stream.scaladsl.Flow 

object TcpConsumeOnlyStreamToSink { 
    implicit val system = ActorSystem("stream-system") 
    private val log = Logging(system, getClass.getName)  

    //The Sink 
    //In reality this is of course a real Sink doing some useful things :-) 
    //The Sink accept types of "SomethingMySinkUnderstand" 
    val mySink = Sink.ignore; 

    def main(args: Array[String]): Unit = { 
    //our sender is not interested in getting replies from us 
    //so we just want to consume the tcp stream and never send back anything to the sender 
    val (address, port) = ("127.0.0.1", 6000) 
    server(system, address, port) 
    } 

    def server(system: ActorSystem, address: String, port: Int): Unit = { 
    implicit val sys = system 
    import system.dispatcher 
    implicit val materializer = ActorMaterializer() 
    val handler = Sink.foreach[Tcp.IncomingConnection] { conn => 
     println("Client connected from: " + conn.remoteAddress) 

     conn handleWith Flow[ByteString] 
     //this is neccessary since we use a self developed tcp wire protocol 
     .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
     //here we want to map the raw bytes into something our Sink understands 
     .map(msg => new SomethingMySinkUnderstand(msg.utf8String)) 
     //here we like to connect our Sink to the Tcp Source 
     .to(mySink) //<------ NOT COMPILING 
    } 


    val tcpSource = Tcp().bind(address, port) 
    val binding = tcpSource.to(handler).run() 

    binding.onComplete { 
     case Success(b) => 
     println("Server started, listening on: " + b.localAddress) 
     case Failure(e) => 
     println(s"Server could not bind to $address:$port: ${e.getMessage}") 
     system.terminate() 
    } 

    } 

    class SomethingMySinkUnderstand(x:String) { 

    } 
} 

更新:必要DEPS

libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"
+0

は、他の人がこれを再作成できるように、build.sbtファイルまたは少なくとも依存関係を表示するために役に立つかもしれません。 – Brian

+0

は上記質問を上記の質問に追加しました – salyh

答えて

4

handleWithを得るためにあなたのbuild.sbtファイルにこれを追加、すなわち未接続の入口と接続されていないコンセント付きボックス、Flowを期待しています。 to操作を使用してFlowSinkに接続したため、効果的にSourceを提供します。

は、私はあなたが次の操作を行うことができると思う:

conn.handleWith(
    Flow[ByteString] 
    .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
    .map(msg => new SomethingMySinkUnderstand(msg.utf8String)) 
    .alsoTo(mySink) 
    .map(_ => ByteString.empty) 
    .filter(_ => false) // Prevents sending anything back 
) 
+0

これは私には役に立たない。それはOPのために働いたのですか? – tapasvi

+0

はい、私のために働いた – salyh