ポート6001から受け取るByteStringの各ストリームをエコーするサーバーを生成するコードがあります(下の図を参照)。この例では、サーバーに接続し、文字 'a'から 'z'までの文字のリストを含むByteStringのストリーム。HTTP経由でオブジェクトのストリームを取得する
この時点で私の質問はです。akkaは、httpでByStreamsの代わりにオブジェクトのストリームを送受信する方法を提供していますか?たとえば、Clientクラスのオブジェクト。
もしそうなら、そのようなストリームをどのように送受信できますか?あなたはそれを実行する方法を示すスニペットを私に提供できますか?
アッカのドキュメントはTcpEcho {
/**
* Use without parameters to start both client and server.
*
* Use parameters `server 0.0.0.0 6001` to start server listening on port
* 6001.
*
* Use parameters `client 127.0.0.1 6001` to start client connecting to
* server on 127.0.0.1:6001.
*
*/
public static void main(String[] args) throws IOException {
if (args.length == 0) {
ActorSystem system = ActorSystem.create("ClientAndServer");
InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 6000);
server(system, serverAddress);
client(system, serverAddress);
} else {
InetSocketAddress serverAddress;
if (args.length == 3) {
serverAddress = new InetSocketAddress(args[1], Integer.valueOf(args[2]));
} else {
serverAddress = new InetSocketAddress("127.0.0.1", 6000);
}
if (args[0].equals("server")) {
ActorSystem system = ActorSystem.create("Server");
server(system, serverAddress);
} else if (args[0].equals("client")) {
ActorSystem system = ActorSystem.create("Client");
client(system, serverAddress);
}
}
}
public static void server(ActorSystem system, InetSocketAddress serverAddress) {
final ActorMaterializer materializer = ActorMaterializer.create(system);
final Sink<IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> {
System.out.println("Client connected from: " + conn.remoteAddress());
conn.handleWith(Flow.<ByteString> create(), materializer);
});
final CompletionStage<ServerBinding> bindingFuture = Tcp.get(system)
.bind(serverAddress.getHostString(), serverAddress.getPort()).to(handler).run(materializer);
bindingFuture.whenComplete((binding, throwable) -> {
System.out.println("Server started, listening on: " + binding.localAddress());
});
bindingFuture.exceptionally(e -> {
System.err.println("Server could not bind to " + serverAddress + " : " + e.getMessage());
system.terminate();
return null;
});
}
public static void client(ActorSystem system, InetSocketAddress serverAddress) {
final ActorMaterializer materializer = ActorMaterializer.create(system);
final List<ByteString> testInput = new ArrayList<>();
for (char c = 'a'; c <= 'z'; c++) {
testInput.add(ByteString.fromString(String.valueOf(c)));
}
Source<ByteString, NotUsed> responseStream = Source.from(testInput)
.via(Tcp.get(system).outgoingConnection(serverAddress.getHostString(), serverAddress.getPort()));
CompletionStage<ByteString> result = responseStream.runFold(ByteString.empty(), (acc, in) -> acc.concat(in),
materializer);
result.whenComplete((success, failure) -> {
if (failure != null) {
System.err.println("Failure: " + failure.getMessage());
} else {
System.out.println("Result: " + success.utf8String());
}
System.out.println("Shutting down client");
system.terminate();
});
}
}
Bidiフローの作成方法について[この例](http://doc.akka.io/docs/akka/2.4.3/java/stream/stream-graphs.html#Bidirectional_Flows)を見たことがありますか?多かれ少なかれ、あなたが求めているのは何ですか? – lpiepiora
私は見ていませんでしたが、私はします。いずれにしても、コードスニペットの形でより良いアイデアや提案がありますか?ありがとう – broga