2017-10-06 10 views
1

Akka websocketsを使用してデータをクライアントにプッシュします。アクターを使用してAkkaウェブソケットにデータを送信する

これは私がこれまでにやっていることです:クライアント側では

import java.io.BufferedReader; 
import java.io.InputStreamReader; 
import java.util.concurrent.CompletionStage; 
import java.util.concurrent.TimeUnit; 

import akka.NotUsed; 
import akka.actor.ActorSystem; 
import akka.http.javadsl.ConnectHttp; 
import akka.http.javadsl.Http; 
import akka.http.javadsl.ServerBinding; 
import akka.http.javadsl.model.HttpRequest; 
import akka.http.javadsl.model.HttpResponse; 
import akka.http.javadsl.model.ws.Message; 
import akka.http.javadsl.model.ws.WebSocket; 
import akka.japi.Function; 
import akka.stream.ActorMaterializer; 
import akka.stream.Materializer; 
import akka.stream.javadsl.Flow; 
import akka.stream.javadsl.Sink; 
import akka.stream.javadsl.Source; 

public class Server { 

    public static HttpResponse handleRequest(HttpRequest request) { 
    System.out.println("Handling request to " + request.getUri()); 
    if (request.getUri().path().equals("/greeter")) { 
     final Flow<Message, Message, NotUsed> greeterFlow = greeterHello(); 
     return WebSocket.handleWebSocketRequestWith(request, greeterFlow); 
    } else { 
     return HttpResponse.create().withStatus(404); 
    } 
    } 

    public static void main(String[] args) throws Exception { 
    ActorSystem system = ActorSystem.create(); 

    try { 
     final Materializer materializer = ActorMaterializer.create(system); 

     final Function<HttpRequest, HttpResponse> handler = request -> handleRequest(request); 
     CompletionStage<ServerBinding> serverBindingFuture = Http.get(system).bindAndHandleSync(handler, 
      ConnectHttp.toHost("localhost", 8080), materializer); 

     // will throw if binding fails 
     serverBindingFuture.toCompletableFuture().get(1, TimeUnit.SECONDS); 
     System.out.println("Press ENTER to stop."); 
     new BufferedReader(new InputStreamReader(System.in)).readLine(); 
    } finally { 
     system.terminate(); 
    } 
    } 

    public static Flow<Message, Message, NotUsed> greeterHello() { 
    return Flow.fromSinkAndSource(Sink.ignore(), 
     Source.single(new akka.http.scaladsl.model.ws.TextMessage.Strict("Hello!"))); 
    } 
} 

を、私は正常に受け付けております「こんにちは!」メッセージ。

import akka.actor.ActorRef; 
import akka.actor.UntypedActor; 

public class PushActor extends UntypedActor { 
    @Override 
    public void onReceive(Object message) { 
    if (message instanceof String) { 
     String statusChangeMessage = (String) message; 
     // How to push this message to a socket ?? 
    } else { 
     System.out.println(String.format("'%s':\nReceived unknown message '%s'!", selfActorPath, message)); 
    } 
    } 

} 

私はこのオンラインに関するすべての例を見つけることができません: はしかし、今、私はこのような何か、(好ましくは俳優から)動的にデータを送信したいです。

次は、使用中のソフトウェアスタックです:

答えて

0

1 - 必然的に非常にエレガントではない - これを行う方法はSource.actorRefを使用してマテリアライズドを送信することです俳優のどこか(多分ルータの俳優ですか?)あなたの要件に応じて。

public static Flow<Message, Message, NotUsed> greeterHello() { 
    return Flow.fromSinkAndSourceMat(Sink.ignore(), 
     Source.actorRef(100, OverflowStrategy.fail()), 
     Keep.right()).mapMaterializedValue(/* send your actorRef to a router? */); 
} 

接続されたクライアントのactorRefsを受け取った人は誰でもメッセージをルーティングする必要があります。

関連する問題