0

Atmosphereリソースのユーザーセッションごとに大気放送事業者を設定できません。私がドキュメントから集めることができるのは、すべてのユーザーに同じメッセージをブロードキャストする「チャット」アプリケーションを構築する方法です。セッションあたりの大気リソース

Atmosphereフレームワークでユーザーセッションごとにチャネルを確立することはできますか、何かを実行してこれらの接続をメモリマップで処理する必要がありますか?

これは私が欲しいのリソースです:

/websockets/notifications 

私は、ユーザーのAおよびBは、異なるブラウザからこのチャンネルに接続し、独立して、彼らにメッセージをストリーミングする能力を持っていると思います。セッションIDを使用して、応答を送信する相手を大気に理解させることができるはずです。

雰囲気はこれをサポートしていますか?

関連のpom.xml

<spring-boot-starter-web.version>1.3.3.RELEASE</spring-boot-starter-web.version> 
<atmosphere-runtime.version>2.4.4</atmosphere-runtime.version> 
<atmosphere-javascript.version>2.3.0</atmosphere-javascript.version> 

雰囲気の設定

package com.hello; 

import javax.servlet.ServletContext; 
import javax.servlet.ServletException; 
import javax.servlet.ServletRegistration; 

import org.atmosphere.cache.UUIDBroadcasterCache; 
import org.atmosphere.cpr.ApplicationConfig; 
import org.atmosphere.cpr.AtmosphereFramework; 
import org.atmosphere.cpr.AtmosphereServlet; 
import org.atmosphere.cpr.MetaBroadcaster; 
import org.springframework.boot.context.embedded.ServletContextInitializer; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 

@Configuration 
public class AtmosphereConfiguration implements ServletContextInitializer { 

    @Bean 
    public AtmosphereServlet atmosphereServlet() { 
     return new AtmosphereServlet(); 
    } 

    @Bean 
    public AtmosphereFramework atmosphereFramework() { 
     return atmosphereServlet().framework(); 
    } 

    @Bean 
    public MetaBroadcaster metaBroadcaster() { 
     AtmosphereFramework framework = atmosphereFramework(); 
     return framework.metaBroadcaster(); 
    } 

    @Override 
    public void onStartup(ServletContext servletContext) throws ServletException { 
     configureAthmosphere(atmosphereServlet(), servletContext); 
    } 

    private void configureAthmosphere(AtmosphereServlet servlet, ServletContext servletContext) { 
     ServletRegistration.Dynamic atmosphereServlet = servletContext.addServlet("atmosphereServlet", servlet); 
     atmosphereServlet.setInitParameter(ApplicationConfig.ANNOTATION_PACKAGE, "com.hello"); 
     atmosphereServlet.setInitParameter(ApplicationConfig.BROADCASTER_CACHE, UUIDBroadcasterCache.class.getName()); 
     atmosphereServlet.setInitParameter(ApplicationConfig.BROADCASTER_SHARABLE_THREAD_POOLS, "true"); 
     atmosphereServlet.setInitParameter(ApplicationConfig.BROADCASTER_MESSAGE_PROCESSING_THREADPOOL_MAXSIZE, "10"); 
     atmosphereServlet.setInitParameter(ApplicationConfig.BROADCASTER_ASYNC_WRITE_THREADPOOL_MAXSIZE, "10"); 
     servletContext.addListener(new org.atmosphere.cpr.SessionSupport()); 
     atmosphereServlet.addMapping("/websocket/*"); 
     atmosphereServlet.setLoadOnStartup(0); 
     atmosphereServlet.setAsyncSupported(true); 
    } 

} 

雰囲気リソース

package com.hello; 

import java.nio.charset.StandardCharsets; 

import org.atmosphere.config.service.Get; 
import org.atmosphere.config.service.Disconnect; 
import org.atmosphere.config.service.ManagedService; 
import org.atmosphere.config.service.Ready; 
import org.atmosphere.cpr.AtmosphereResource; 
import org.atmosphere.cpr.AtmosphereResourceEvent; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

@ManagedService(path = NotificationAtmosphereResource.PATH) 
public class NotificationAtmosphereResource { 

    public static final String PATH = "/websocket/notifications"; 

    private Logger logger = LoggerFactory.getLogger(NotificationAtmosphereResource.class); 

    @Get   
    public void init(AtmosphereResource resource){ 
     resource.getResponse().setCharacterEncoding(StandardCharsets.UTF_8.name()); 
    } 

    @Ready 
    public void onReady(final AtmosphereResource resource) { 
     logger.info("Connected {}", resource.uuid()); 
    } 

    @Disconnect 
    public void onDisconnect(AtmosphereResourceEvent event) { 
     logger.info("Client {} disconnected [{}]", event.getResource().uuid(), 
       (event.isCancelled() ? "cancelled" : "closed")); 
    } 

} 

私が発信するサービス

package com.hello; 

import org.atmosphere.cpr.MetaBroadcaster; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.stereotype.Service; 

@Service 
public class NotificationEmitterBean implements NotificationEmitter { 

    private Logger logger = LoggerFactory.getLogger(NotificationEmitterBean.class); 

    @Autowired 
    private MetaBroadcaster metaBroadcaster; 

    @Autowired 
    private NotificationService notificationService; 

    @Autowired 
    private JsonMapper jsonMapper; 

    @Override 
    public void emitNotification(String sessionId, String msg) { 

    // This will broadcast to all users on /websocket/notifications 
    // How can I use sessionId to broadcast to the respective browser? 
    metaBroadcaster.broadcastTo(NotificationAtmosphereResource.PATH, 
        jsonMapper.toJson(msg));   
     } 

    } 

} 

答えて

0

私がこの作業を得ることができた唯一の方法は、自分のセッションベースの放送局を作成することでした。 Jeanfrancois Arcandによって書かれたExcludeSessionBroadcasterをベースラインとして使用しました。

package com.hello; 

import java.util.HashSet; 
import java.util.Set; 
import java.util.concurrent.Future; 

import org.atmosphere.cpr.AtmosphereConfig; 
import org.atmosphere.cpr.AtmosphereResource; 
import org.atmosphere.cpr.Broadcaster; 
import org.atmosphere.cpr.BroadcasterFuture; 
import org.atmosphere.cpr.DefaultBroadcaster; 
import org.atmosphere.cpr.Deliver; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

/** 
* An implementation of {@link DefaultBroadcaster} that include one or more {@link AtmosphereResource} 
* 
* Based on ExcludeSessionBroadcaster written by Jeanfrancois Arcand 
* 
* @author Steven Zgaljic 
*/ 
public class IncludeSessionBroadcaster extends DefaultBroadcaster { 

    private static final Logger logger = LoggerFactory.getLogger(IncludeSessionBroadcaster.class); 

    public IncludeSessionBroadcaster(){} 

    public Broadcaster initialize(String id, AtmosphereConfig config) { 
     return super.initialize(id, config); 
    } 

    /** 
    * the AtmosphereResource r will be include for this broadcast 
    * 
    * @param msg 
    * @param r 
    * @return 
    */ 
    @Override 
    public Future<Object> broadcast(Object msg, AtmosphereResource r) { 

     if (destroyed.get()) { 
      throw new IllegalStateException("This Broadcaster has been destroyed and cannot be used"); 
     } 

     Set<AtmosphereResource> sub = new HashSet<AtmosphereResource>(); 
     sub.removeAll(resources); 
     sub.add(r); 
     start(); 
     Object newMsg = filter(msg); 
     if (newMsg == null) { 
      return null; 
     } 

     BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg, sub.size()); 
     dispatchMessages(new Deliver(newMsg, sub, f, msg)); 
     return f; 
    } 


    /** 
    * the AtmosphereResources subset will be include for this broadcast 
    * 
    * @param msg 
    * @param subset 
    * @return 
    */ 
    @Override 
    public Future<Object> broadcast(Object msg, Set<AtmosphereResource> subset) { 

     if (destroyed.get()) { 
      return futureDone(msg); 
     } 

     subset.retainAll(resources); 
     start(); 
     Object newMsg = filter(msg); 
     if (newMsg == null) { 
      return futureDone(msg); 
     } 

     BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg, subset.size()); 
     dispatchMessages(new Deliver(newMsg, subset, f, msg)); 
     return f; 
    } 

    /** 
    * session will be include for this broadcast 
    * 
    * @param msg 
    * @param s 
    * @return 
    */ 
    public Future<Object> broadcast(Object msg, String sessionId) { 

     if (destroyed.get()) { 
      return futureDone(msg); 
     } 

     Set<AtmosphereResource> subset = new HashSet<AtmosphereResource>(); 

     for (AtmosphereResource r : resources) { 
      if (!r.getAtmosphereResourceEvent().isCancelled() && 
        sessionId.equals(r.getRequest().getSession().getId())) { 
       subset.add(r); 
       break; 
      } 
     } 

     start(); 
     Object newMsg = filter(msg); 
     if (newMsg == null) { 
      return futureDone(msg); 
     } 

     BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg, subset.size()); 
     dispatchMessages(new Deliver(newMsg, subset, f, msg)); 
     return f; 
    } 
} 

IncludeSessionBroadcaster.javaは、それから私は、大気リソースに、この放送局を割り当て。

NotificationAtmosphereResource.java

package com.hello; 

import java.nio.charset.StandardCharsets; 

import org.atmosphere.config.service.Get; 
import org.atmosphere.config.service.Disconnect; 
import org.atmosphere.config.service.ManagedService; 
import org.atmosphere.config.service.Ready; 
import org.atmosphere.cpr.AtmosphereResource; 
import org.atmosphere.cpr.AtmosphereResourceEvent; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

@ManagedService(path = NotificationAtmosphereResource.PATH, 
    broadcaster=IncludeSessionBroadcaster.class) 
public class NotificationAtmosphereResource { 

    public static final String PATH = "/websocket/notifications"; 

    private Logger logger = LoggerFactory.getLogger(NotificationAtmosphereResource.class); 

    @Get   
    public void init(AtmosphereResource resource){ 
     resource.getResponse().setCharacterEncoding(StandardCharsets.UTF_8.name()); 
    } 

    @Ready 
    public void onReady(final AtmosphereResource resource) { 
     logger.info("Connected {}", resource.uuid()); 
    } 

    @Disconnect 
    public void onDisconnect(AtmosphereResourceEvent event) { 
     logger.info("Client {} disconnected [{}]", event.getResource().uuid(), 
       (event.isCancelled() ? "cancelled" : "closed")); 
    } 

} 

それから私は私が望む唯一のブラウザ(セッションID)にメッセージを送信することができます。

package com.hello; 

import org.atmosphere.cpr.MetaBroadcaster; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.stereotype.Service; 

@Service 
public class NotificationEmitterBean implements NotificationEmitter { 

    private Logger logger = LoggerFactory.getLogger(NotificationEmitterBean.class); 

    @Autowired 
    private BroadcasterFactory factory; 

    @Override 
    public void emitNotification(String sessionId, String msg) { 

      ((IncludeSessionBroadcaster)factory.lookup(NotificationAtmosphereResource.PATH)).broadcast(msg, sessionId);  
     } 

    } 

} 
NotificationEmitterBean.java

関連する問題