2017-08-02 5 views
0

これはspring integration.iを使用してmqtt接続を定義した方法です。これが可能かどうかわからないmqttサブスクライバは10メッセージの読み込み。現在、購読者はメッセージを公開した後に動作します。私たちはスプリングインテグレーションを使用してモスキートのメッセージ負荷10グループをバッチアップできますか?

@Autowired 
    ConnectorConfig config; 


    @Bean 
    public MqttPahoClientFactory mqttClientFactory() { 
     DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); 
     factory.setServerURIs(config.getUrl()); 
     factory.setUserName(config.getUser()); 
     factory.setPassword(config.getPass()); 
     return factory; 
    } 

    @Bean 
    public MessageProducer inbound() { 
     MqttPahoMessageDrivenChannelAdapter adapter = 
       new MqttPahoMessageDrivenChannelAdapter(config.getClientid(), mqttClientFactory(), "ALERT", "READING"); 

     adapter.setCompletionTimeout(5000); 
     adapter.setConverter(new DefaultPahoMessageConverter()); 
     adapter.setQos(1); 
     adapter.setOutputChannel(mqttRouterChannel()); 
     return adapter; 
    } 

    /**this is router**/ 
    @MessageEndpoint 
    public class MessageRouter { 

    private final Logger logger = LoggerFactory.getLogger(MessageRouter.class); 


    static final String ALERT = "ALERT"; 
    static final String READING = "READING"; 

    @Router(inputChannel = "mqttRouterChannel") 
    public String route(@Header("mqtt_topic") String topic){ 
     String route = null; 
     switch (topic){ 
      case ALERT: 
       logger.info("alert message received"); 
       route = "alertTransformerChannel"; 
       break; 
      case READING: 
       logger.info("reading message received"); 
       route = "readingTransformerChannel"; 
       break; 
     } 
     return route; 
    } 
} 
+1

あなたがここで何を求めているのかははっきりしません。最初の10件のメッセージを無視しますか?または一度に10のメッセージのグループを一括してアップすることができますか? – hardillb

+0

実際、不明です。質問をそれぞれ閉じます。 –

+0

私は一度に10メッセージのグループを一括してバッチする必要があります – Priyamal

答えて

1

私はMqttPahoMessageDrivenChannelAdapter責任ではない時点での10のメッセージのバッチまでのグループ

する必要があります。私たちは、このセマンティックとMqttCallbackが使用

* @param topic name of the topic on the message was published to 
* @param message the actual message. 
* @throws Exception if a terminal error has occurred, and the client should be 
* shut down. 
*/ 
public void messageArrived(String topic, MqttMessage message) throws Exception; 

そこで、我々はPAHOクライアントの性質によって、このチャネルアダプタにあっバッチそれらをすることはできません。

Spring Integrationの観点からお勧めするものは、Aggregator EIPの実装です。

ルータに送信する前に、AggregatorFactoryBean@Beanの前に@ServiceActivatorを追加してから、mqttRouterChannelを追加してください。

多分ような単純な:

@Bean 
@ServiceActivator(inputChannel = "mqttAggregatorChannel") 
AggregatorFactoryBean mqttAggregator() { 
    AggregatorFactoryBean aggregator = new AggregatorFactoryBean(); 
    aggregator.setProcessorBean(new DefaultAggregatingMessageGroupProcessor()); 
    aggregator.setCorrelationStrategy(m -> 1); 
    aggregator.setReleaseStrategy(new MessageCountReleaseStrategy(10)); 
    aggregator.setExpireGroupsUponCompletion(true); 
    aggregator.setSendPartialResultOnExpiry(true); 
    aggregator.setGroupTimeoutExpression(new ValueExpression<>(1000)); 
    aggregator.setOutputChannelName("mqttRouterChannel"); 
    return aggregator; 
} 

Reference Manualでより多くの情報を参照してください。

+0

アグリゲータは私のケースの解決策のように聞こえます。アグリゲータの使い方を学んでください。再度、感謝します。 – Priyamal

関連する問題