2017-01-09 11 views
0

JMSトランスフォーマーでActiveMQのAMQPを利用して同じようにベアボーンアプリケーションを稼働させようとしています。私のクライアントライブラリはSpring Integrationですが、この設定では基本的なサンプルを入手して実行することはできません。 AMQP上のActiveMQのJMSトランスにActiveMQ AMQP with JMSトランスフォーマーのバネ統合

詳細:http://activemq.apache.org/amqp.html

メインテストアプリ

@IntegrationComponentScan 
@SpringBootApplication 
public class SpringCloudStreamJmsActivemqSenderExampleApplication implements CommandLineRunner { 

    @Bean 
    public ConnectionFactory connectionFactory() { 
     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 
     connectionFactory.setBrokerURL("tcp://localhost:61616"); 
     connectionFactory.setUserName("admin"); 
     connectionFactory.setPassword("admin"); 
     return connectionFactory; 
    } 

    @Bean 
    public ConnectionFactory connectionFactoryAMQP() { 
     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 
     connectionFactory.setBrokerURL("tcp://localhost:5672"); 
     connectionFactory.setUserName("admin"); 
     connectionFactory.setPassword("admin"); 
     return connectionFactory; 
    } 

    public static void main(String[] args) { 
     SpringApplication.run(SpringCloudStreamJmsActivemqSenderExampleApplication.class, args); 
    } 

    @Autowired 
    JmsGateway gateway; 

    @Override 
    public void run(String... strings) throws Exception { 
     gateway.sendMessage("Hi"); 
    } 

    @Bean(name = PollerMetadata.DEFAULT_POLLER) 
    public PollerMetadata poller() { 
     return Pollers.fixedDelay(1, TimeUnit.SECONDS).get(); 
    } 

    @Bean(name = "outboundChannel") 
    MessageChannel myOutBoundChannel() { 
     return new QueueChannel(); 
    } 

    @Bean(name = "inboundChannel") 
    MessageChannel myInboundChannel() { 
     return new QueueChannel(); 
    } 

    @Bean(name = "errorChannel") 
    MessageChannel myErrorChannel() { 
     return new DirectChannel(); 
    } 

    @Bean 
    IntegrationFlow jmsInboundFlow() { 
     return IntegrationFlows.from(Jms 
       .inboundGateway(connectionFactoryAMQP()) 
       .destination("myCoolQueue") 
       .errorChannel(myErrorChannel())) 
        .handle(this::print) 
       .get(); 
    } 

    @Bean 
    IntegrationFlow jmsOutboundFlow() { 
     return IntegrationFlows.from(myOutBoundChannel()) 
       .handle(Jms.outboundAdapter(connectionFactory()) 
       .destination("myCoolQueue")) 
       .get(); 
    } 

    @Bean 
    IntegrationFlow customErrorFlow() { 
     return IntegrationFlows.from(myErrorChannel()) 
        .handle(this::printStackTrace) 
       .get(); 
    } 

    private void print(Message message) { 
     System.out.println("Message payload: " + message.getPayload()); 
     //throw new RuntimeException("broke it"); 
    } 

    private void printStackTrace(Message errorMessage) { 
     ((ErrorMessage)errorMessage).getPayload().printStackTrace(); 
    } 
} 

メッセージングゲートウェイ

@MessagingGateway 
interface JmsGateway { 
    @Gateway(requestChannel = "outboundChannel") 
    void sendMessage(String message); 
} 

ActiveMQ.xml

<transportConnectors> 
    <transportConnector name="openwire" uri="tcp://0.0.0.0:0?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
    <transportConnector name="amqp" uri="amqp://0.0.0.0:0?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;transport.transformer=jms"/> 
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:0?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
    <transportConnector name="ws" uri="ws://0.0.0.0:0?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
</transportConnectors> 

ログ出力

2017-01-09 08:42:26.158 INFO 24332 --- [ restartedMain] treamJmsActivemqSenderExampleApplication : Started SpringCloudStreamJmsActivemqSenderExampleApplication in 2.676 seconds (JVM running for 3.041) 
2017-01-09 08:42:31.143 WARN 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'myCoolQueue' - trying to recover. Cause: Disposed due to prior exception 
2017-01-09 08:42:31.150 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672 
2017-01-09 08:42:36.155 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=1, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672 
2017-01-09 08:42:41.163 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=2, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672 
+0

額で働かないと思います。 JMS APIを使用してAMQP 1.0ブローカに接続する方法を理解するには、Apache Qpidプロジェクトを参照してください。 –

+0

ここをクリックして工場を作成します。http://stackoverflow.com/questions/39528325/unable-to-access-activemq-using-jms-based-code-and-amqp-1-0/39529196#39529196 –

+0

I私はそれに精通していないが、私はあなたが変圧器が何を誤解していると思う。 amqpポートのプロトコルは変更されません。 AMQP経由で送信されたメッセージをマップするようです(例:QPID Protonから)をJMSメッセージに変換して、JMSコンシューマ(OpenWire経由)で消費できるようにします。逆も同様です。 –

答えて

2

JNDI

@Bean 
public ConnectionFactory connectionFactoryAMQP() { 
    String factoryName = "myFactoryLookup"; 
    Properties props = new Properties(); 
    props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); 
    props.setProperty("connectionfactory." + factoryName, "amqp://localhost:5672"); 
    props.put("property.connectionfactory." + factoryName + ".username", "admin"); 
    props.put("property.connectionfactory." + factoryName + ".password", "admin"); 
    InitialContext ic = new InitialContext(props); 
    ConnectionFactory connectionFactory = (ConnectionFactory) ic.lookup(factoryName); 
    return connectionFactory; 
} 

OR

FACTORY

@Bean 
public ConnectionFactory connectionFactoryAMQP() { 
     org.apache.qpid.jms.JmsConnectionFactory connectionFactory = new JmsConnectionFactory(); 
     connectionFactory.setRemoteURI("amqp://localhost:5672"); 
     connectionFactory.setUsername("admin"); 
     connectionFactory.setPassword("admin"); 
    return connectionFactory; 
} 

のみ/ AMQPメッセージからJMSメッセージを変換するactivemq.xmlに

<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?transport.transformer=jms"/> 

transport.transformer =のJMSをポートを追加し、この依存関係

<dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.9.0</version> </dependency>を追加します。ブローカーがAMQP輸送& ActiveMQの間でブローカーが受信するとき■AMQP転送によるAMQPメッセージはAMQPメッセージからJMSメッセージに変換され、AMQP転送を介してコンシューマにメッセージがディスパッチされると、JMSからAMQPメッセージに変換されます。

+0

工場出荷時の例とJMSトランスポートの説明をありがとうございます。私は今、自分のコードをスムーズに動かすことができます。 –

1

は、ActiveMQのクライアントのみOpenWireはそう動作しませんAMQPポートに接続しようとしてActiveMQのネイティブプロトコルを話す、接続の試みは失敗します。 AMQPを介してメッセージを送受信するには、AMQPクライアントを使用してブローカ上のAMQPポートに接続する必要があります。 Apache Qpidプロジェクトには、多数のAMQP v1.0クライアントが用意されています。 JMSタイプのクライアントAPIに固執する場合は、Qpid JMSクライアントが必要です。あなたは2つの方法であなたのBean定義を変更する必要があり

+0

Timは、AMQP設定のJMSトランスフォーマでJMSクライアントを接続できないが、JMSメッセージ形式をサポートできることを意味していますか? –