2016-05-20 8 views
1

定期的にポーリング(JSON)メッセージをポーリングするように設定し、メッセージを処理してデータベースに保存することにより、AWS SQSキューと連携するSpring IntegrationFlowを実装しようとしました。複数のJSONオブジェクトパターンのSpring IntegrationFlow

キューから単一のJSONメッセージパターンを正常にポーリングして、私のカスタムオブジェクトに変換できました。今私は2種類のJSONパターンを同じSQSキューに送ります。例えば、異なるトランスとserviceActivator(ハンドルに、その後の経路メッセージを

"Type" : "Notification", "MessageId" : "xxxx-xxxx-xxxx", "TopicArn" : "arn:aws:sns:us-west-2:xxxxx:topicName00", "Subject" : "OK: \"test00\" in US-West-2", "Message" : "{\"AlarmName\":\"test00\..."

"Type" : "Notification", "MessageId" : "xxxxx-xxxx-xxxxx", "TopicArn" : "arn:aws:sns:us-west-2:xxxxxx:topicName01", "Message" : "{\"version\":\"0\",\"id\":\"xxxxx\",\"detail-type\":\"EC2 Instance State-change Notification\",\"source\":\"aws.ec2\..."

これらのメッセージは、同じキューに送信され、私は同じポーラーを使用してキューをポーリングしたいです)のメッセージ本文に基づいています。

@Bean 
public IntegrationFlow sqsIntegrationFlow() 
{ 
    return IntegrationFlows.from(this.sqsMessageSource(), c -> c.poller(myPoller())) 
      .channel(new DirectChannel()) 
      .<Payload,Boolean>route(input -> input.value().contains("EC2 Instance State-change Notification"), 
        mapping -> mapping 
          .subFlowMapping("true", sf -> sf.channel(new DirectChannel()) 
            .transform(
              SqsMessageToInstanceConverter::convertSqsMessagesToInstanceInfo) 
            .channel(new DirectChannel()).handle((message) -> { 
             ec2InstanceService.updateInstanceInfo((List<SqsMessageResult>) message.getPayload()); 
            })) 
          .subFlowMapping("false", sf -> sf.channel(new DirectChannel()) 
            .transform(SqsMessageToInstanceConverter::convertSqsMessageToAlarmInfo) 
            .channel(new DirectChannel()).handle((alarm -> { 
             cwAlarmService.updateAlarmInfo(
               (List<SqsAlarmMessageResult>) alarm.getPayload()); 
            })))) 
      .get(); 
} 

私は私の質問があり、上記のように、ルータを使用してみましたとメッセージボディ(「EC2インスタンスの状態変更通知」)で文字列を使用してメッセージを特定したが

java.lang.ClassNotFoundException: org.springframework.integration.support.management.MappingMessageRouterManagement

とエラーが表示されます。

1.これはルータを使用する正しい方法ですか?

2.統合フローを使用して2つの異なるJSONメッセージを処理する目標をどのように達成できますか?

+1

は、私たちの公式の春の統合AWSの拡張に切り替えることを検討してください。https://github.com/spring-projects/スプリング統合 - aws。 'SqsMessageSource'ではなく、Message Driven Listenerがあります。私はSI-AWSソリューションを購入した場所に興味があります。ありがとうございました! –

答えて

1

はい、正しいです(1時間ほど前に同様のフローを書きました)。ある種のクラスパス問題があるように見えます。そのインターフェースはルータと同じ瓶にあります。アプリをどこで実行していますか?

-verbose JVMの引数で実行してみてください、私はやったし、これを得た...

... 
[Loaded org.springframework.integration.router.AbstractMessageRouter from file:/Users/.../.m2/repository/org/springframework/integration/spring-integration-core/4.2.5.RELEASE/spring-integration-core-4.2.5.RELEASE.jar] 
[Loaded org.springframework.integration.support.management.MappingMessageRouterManagement from file:/Users/.../.m2/repository/org/springframework/integration/spring-integration-core/4.2.5.RELEASE/spring-integration-core-4.2.5.RELEASE.jar] 
... 
+0

ありがとう、ゲーリー。私は依存関係を含めることができなかったようです。私のpomファイルに最新のスプリング統合コアアーチファクトを加えた後、java.lang.ClassNotFoundExceptionは消えます。 – user20160519

関連する問題