1

統合フローでは、デフォルト戦略を使用した分割により、リストから項目が発行されます。その項目の処理が失敗することがあります。そのエラーを処理し、以前のメッセージ(カスタムエラーヘッダーに加えて)からのマッピング情報を持つ新しいメッセージを通常のメッセージングチャネルに送信します。Spring統合におけるメッセージ集約ロジックのカスタマイズ方法Java DSL

アグリゲータでは、失敗したプロセスの数と失敗しなかったメッセージの結果を含む別のタイプのメッセージを生成するために、集約ロジックをカスタマイズする必要があります。

ここで私は、私は、ヘッダーと、エラーメッセージを送信する方法について説明します。

@Bean 
public IntegrationFlow socialMediaErrorFlow() { 
    return IntegrationFlows.from("socialMediaErrorChannel") 
      .wireTap(sf -> sf.handle("errorService", "handleException")) 
      .<MessagingException>handle((p, h) 
       -> MessageBuilder.withPayload(Collections.<CommentEntity>emptyList()) 
        .copyHeaders(p.getFailedMessage().getHeaders()) 
        .setHeader("ERROR", true) 
        .build() 
      ) 
      .channel("directChannel_1") 
      .get(); 
} 

私はアグリゲータは、このタイプのオブジェクトを生成したい:

public class Result { 

    private Integer totalTask; 
    private Integer taskFailed; 
    private List<CommentEntity> comments; 

} 

私はこれに近づく必要がありますどのように?

ありがとうございます。私はこの実装を作っアルテムの助けに

ありがとう:

.aggregate(a -> a.outputProcessor(new MessageGroupProcessor() { 
     @Override 
     public Object processMessageGroup(MessageGroup mg) { 
      Integer failedTaskCount = 0; 
      Integer totalTaskCount = mg.getMessages().size(); 
      List<CommentEntity> comments = new ArrayList<>(); 
      for(Message<?> message: mg.getMessages()){ 
       if(message.getHeaders().containsKey("ERROR")) 
        failedTaskCount++; 
       else 
          comments.addAll((List<CommentEntity>)message.getPayload()); 
     } 

    return new IterationResult(totalTaskCount, failedTaskCount, comments); 

    } 
})) 

答えて

1

AggregatorSpecoutputProcessor性質を持っています

ここ
/** 
* A processor to determine the output message from the released group. Defaults to a message 
* with a payload that is a collection of payloads from the input messages. 
* @param outputProcessor the processor. 
* @return the aggregator spec. 
*/ 
public AggregatorSpec outputProcessor(MessageGroupProcessor outputProcessor) { 

あなたがすべてのメッセージを解析するために、独自のカスタムロジックを提供することができますあなたのためにあなたのResultを構築してください。

テストケースからサンプル:

.aggregate(a -> a.outputProcessor(g -> g.getMessages() 
         .stream() 
         .map(m -> (String) m.getPayload()) 
         .collect(Collectors.joining(" ")))) 

Cafe Demoサンプル:

.aggregate(aggregator -> aggregator 
     .outputProcessor(g -> 
        new Delivery(g.getMessages() 
           .stream() 
           .map(message -> (Drink) message.getPayload()) 
           .collect(Collectors.toList()))) 
     .correlationStrategy(m -> ((Drink) m.getPayload()).getOrderNumber())) 
関連する問題