2017-03-23 15 views
1

個別のプロパティ変更を個別のメッセージとしてエンティティに公開するサービス(S)からメッセージを受け取りました。不自然な例では、このような実体のようになります。RabbitMQで、好ましくはSpring AMQPを使用して、受信したメッセージをグループ化しますか?

Person { 
    id: 123 
    name: "Something", 
    address: {...} 
} 

名前と住所が同じトランザクションで更新された場合(S)2つのメッセージ、PersonNameCorrectedPersonMovedを公開します。問題は、このPersonエンティティの投影を保存している受信側であり、各プロパティの変更によってデータベースに書き込まれます。この例では、データベースへの2回の書き込みがありますが、短時間メッセージをバッチしてIDでグループ化することができれば、データベースへの書き込みは1回だけで済むでしょう。

RabbitMQではどのように処理するのですか? Spring AMQPはより簡単な抽象化を提供しますか?

私はprefetchで簡単に見てきましたが、これが行く方法がわからないことに注意してください。私はそれを正しく理解している場合も、プリフェッチは、接続ごとに基づいています。私はキュー当たりベースでこれを達成しようとしています。なぜなら、バッチ処理(および追加された待ち時間)が行く方法なので、私はこの待ち時間を私のサービスで消費されたすべての待ち行列に追加したくないからです「グループ別ID」機能が必要なもの)。

答えて

2

プリフェッチはこのような場合には役に立ちません。

Spring Integrationを使用することを検討してください。このアダプタには、Spring AMQPの上にあるアダプタがあります。パイプラインの次の段階にメッセージを送信する前にメッセージをグループ化するために使用できるアグリゲータも用意されています。

EDIT

ここdemostrateするクイックブートアプリだ...

@SpringBootApplication 
public class So42969130Application implements CommandLineRunner { 

    public static void main(String[] args) { 
     SpringApplication.run(So42969130Application.class, args) 
      .close(); 
    } 

    @Autowired 
    private RabbitTemplate template; 

    @Autowired 
    private Handler handler; 

    @Override 
    public void run(String... args) throws Exception { 
     this.template.convertAndSend("so9130", new PersonNameChanged(123)); 
     this.template.convertAndSend("so9130", new PersonMoved(123)); 
     this.handler.latch.await(10, TimeUnit.SECONDS); 
    } 

    @Bean 
    public IntegrationFlow flow(ConnectionFactory connectionFactory) { 
     return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "so9130") 
         .messageConverter(converter())) 
       .aggregate(a -> a 
         .correlationExpression("payload.id") 
         .releaseExpression("false") // open-ended release, timeout only 
         .sendPartialResultOnExpiry(true) 
         .groupTimeout(2000)) 
       .handle(handler()) 
       .get(); 
    } 

    @Bean 
    public Jackson2JsonMessageConverter converter() { 
     return new Jackson2JsonMessageConverter(); 
    } 

    @Bean 
    public Handler handler() { 
     return new Handler(); 
    } 

    @Bean 
    public Queue queue() { 
     return new Queue("so9130", false, false, true); 
    } 

    public static class Handler { 

     private final CountDownLatch latch = new CountDownLatch(1); 

     @ServiceActivator 
     public void handle(Collection<?> aggregatedData) { 
      System.out.println(aggregatedData); 
      this.latch.countDown(); 
     } 

    } 

    public static class PersonNameChanged { 

     private int id; 

     PersonNameChanged() { 
     } 

     PersonNameChanged(int id) { 
      this.id = id; 
     } 

     public int getId() { 
      return this.id; 
     } 

     public void setId(int id) { 
      this.id = id; 
     } 

     @Override 
     public String toString() { 
      return "PersonNameChanged [id=" + this.id + "]"; 
     } 

    } 

    public static class PersonMoved { 

     private int id; 

     PersonMoved() { 
     } 

     PersonMoved(int id) { 
      this.id = id; 
     } 

     public int getId() { 
      return this.id; 
     } 

     public void setId(int id) { 
      this.id = id; 
     } 

     @Override 
     public String toString() { 
      return "PersonMoved [id=" + this.id + "]"; 
     } 

    } 

} 

ポンポン:

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 

    <groupId>com.example</groupId> 
    <artifactId>so42969130</artifactId> 
    <version>2.0.0-BUILD-SNAPSHOT</version> 
    <packaging>jar</packaging> 

    <name>so42969130</name> 
    <description>Demo project for Spring Boot</description> 

    <parent> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-starter-parent</artifactId> 
     <version>1.5.2.RELEASE</version> 
     <relativePath/> <!-- lookup parent from repository --> 
    </parent> 

    <properties> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 
     <java.version>1.8</java.version> 
    </properties> 

    <dependencies> 
     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-integration</artifactId> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.integration</groupId> 
      <artifactId>spring-integration-amqp</artifactId> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.integration</groupId> 
      <artifactId>spring-integration-java-dsl</artifactId> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-test</artifactId> 
      <scope>test</scope> 
     </dependency> 
    </dependencies> 

    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.springframework.boot</groupId> 
       <artifactId>spring-boot-maven-plugin</artifactId> 
      </plugin> 
     </plugins> 
    </build> 


</project> 

結果:pの

2017-03-23 09:56:57.501 INFO 75217 --- [ask-scheduler-2] .s.i.a.AbstractCorrelatingMessageHandler : 
    Expiring MessageGroup with correlationKey[123] 
[PersonNameChanged [id=123], PersonMoved [id=123]] 
+0

感謝スプリングの統合にあまりにも頼もしいことがなければ、私のユースケースのキックスターターとして使用できる例を知っていますか? – Johan

+1

私の回答は、あなたが始めるのに十分なはずの例で更新しました。 –

+0

私はそれについて簡潔にすべきだと知っていますが、これは素晴らしいと言わなければなりません、私はより良い答えを求めることができませんでした。 – Johan

関連する問題