2016-04-03 9 views
2

ポート6001から受け取るByteStringの各ストリームをエコーするサーバーを生成するコードがあります(下の図を参照)。この例では、サーバーに接続し、文字 'a'から 'z'までの文字のリストを含むByteStringのストリーム。HTTP経由でオブジェクトのストリームを取得する

この時点で私の質問はです。akkaは、httpでByStreamsの代わりにオブジェクトのストリームを送受信する方法を提供していますか?たとえば、Clientクラスのオブジェクト。

もしそうなら、そのようなストリームをどのように送受信できますか?あなたはそれを実行する方法を示すスニペットを私に提供できますか?

アッカのドキュメントはTcpEcho {

/** 
* Use without parameters to start both client and server. 
* 
* Use parameters `server 0.0.0.0 6001` to start server listening on port 
* 6001. 
* 
* Use parameters `client 127.0.0.1 6001` to start client connecting to 
* server on 127.0.0.1:6001. 
* 
*/ 
public static void main(String[] args) throws IOException { 
    if (args.length == 0) { 
     ActorSystem system = ActorSystem.create("ClientAndServer"); 
     InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 6000); 
     server(system, serverAddress); 
     client(system, serverAddress); 
    } else { 
     InetSocketAddress serverAddress; 
     if (args.length == 3) { 
      serverAddress = new InetSocketAddress(args[1], Integer.valueOf(args[2])); 
     } else { 
      serverAddress = new InetSocketAddress("127.0.0.1", 6000); 
     } 
     if (args[0].equals("server")) { 
      ActorSystem system = ActorSystem.create("Server"); 
      server(system, serverAddress); 
     } else if (args[0].equals("client")) { 
      ActorSystem system = ActorSystem.create("Client"); 
      client(system, serverAddress); 
     } 
    } 
} 

public static void server(ActorSystem system, InetSocketAddress serverAddress) { 
    final ActorMaterializer materializer = ActorMaterializer.create(system); 

    final Sink<IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> { 
     System.out.println("Client connected from: " + conn.remoteAddress()); 
     conn.handleWith(Flow.<ByteString> create(), materializer); 
    }); 

    final CompletionStage<ServerBinding> bindingFuture = Tcp.get(system) 
      .bind(serverAddress.getHostString(), serverAddress.getPort()).to(handler).run(materializer); 

    bindingFuture.whenComplete((binding, throwable) -> { 
     System.out.println("Server started, listening on: " + binding.localAddress()); 
    }); 

    bindingFuture.exceptionally(e -> { 
     System.err.println("Server could not bind to " + serverAddress + " : " + e.getMessage()); 
     system.terminate(); 
     return null; 
    }); 

} 

public static void client(ActorSystem system, InetSocketAddress serverAddress) { 
    final ActorMaterializer materializer = ActorMaterializer.create(system); 

    final List<ByteString> testInput = new ArrayList<>(); 
    for (char c = 'a'; c <= 'z'; c++) { 
     testInput.add(ByteString.fromString(String.valueOf(c))); 
    } 

    Source<ByteString, NotUsed> responseStream = Source.from(testInput) 
      .via(Tcp.get(system).outgoingConnection(serverAddress.getHostString(), serverAddress.getPort())); 

    CompletionStage<ByteString> result = responseStream.runFold(ByteString.empty(), (acc, in) -> acc.concat(in), 
      materializer); 

    result.whenComplete((success, failure) -> { 

     if (failure != null) { 
      System.err.println("Failure: " + failure.getMessage()); 
     } else { 
      System.out.println("Result: " + success.utf8String()); 
     } 
     System.out.println("Shutting down client"); 
     system.terminate(); 

    }); 
} 

}

+0

Bidiフローの作成方法について[この例](http://doc.akka.io/docs/akka/2.4.3/java/stream/stream-graphs.html#Bidirectional_Flows)を見たことがありますか?多かれ少なかれ、あなたが求めているのは何ですか? – lpiepiora

+0

私は見ていませんでしたが、私はします。いずれにしても、コードスニペットの形でより良いアイデアや提案がありますか?ありがとう – broga

答えて

2

akka.stream.{javadsl,scaladsl}.Framingは、ユーティリティが含まれています...非おもちゃの例については、あなたの助け

パブリッククラスの

おかげでユーザーフレンドリーではありません一貫性のあるメッセージを作成するのに役立ちます。たとえば、メッセージをFraming.simpleFramingProtocolEncoder(maxLength)に送信して長さ情報を自動的に追加することができます。もう一方の端では、Framing.simpleFramingProtocolDecoder(maxLength)は、その囲まれた長さ情報に従ってメッセージのデコードを行う。

プレーンオブジェクトを操作する場合は、エンコーダを介して送信する前にByteStringにシリアル化し、デコーダからその表現を受け取った後にByteStringから逆シリアル化する必要があります。

関連する問題