2017-10-06 43 views
0

おそらくRabbitMQによってスローされる次の例外を防ぐために何が必要ですか?ListenerExecutionFailedException:リスナーが例外をスローしました

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:877) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:787) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:707) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:98) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1236) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:688) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1190) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1174) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1200(SimpleMessageListenerContainer.java:98) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1363) 
    at java.lang.Thread.run(Thread.java:748) 
    Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'amqpLaunchSpringBatchJobFlow.channel#0'; nested exception is jp.ixam_drive.batch.service.JobExecutionRuntimeException: Failed to start job with name ads-insights-import and parameters {accessToken=<ACCESS_TOKEN>, id=act_1234567890, classifier=stats, report_run_id=1482330625184792, job_request_id=32} 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171) 
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$400(AmqpInboundChannelAdapter.java:45) 
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$1.onMessage(AmqpInboundChannelAdapter.java:95) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:784) 
    ... 10 common frames omitted 
    Caused by: jp.ixam_drive.batch.service.JobExecutionRuntimeException: Failed to start job with name ads-insights-import and parameters {accessToken=<ACCESS_TOKEN>, id=act_1234567890, classifier=stats, report_run_id=1482330625184792, job_request_id=32} 
    at jp.ixam_drive.facebook.SpringBatchLauncher.launchJob(SpringBatchLauncher.java:42) 
    at jp.ixam_drive.facebook.AmqpBatchLaunchIntegrationFlows.lambda$amqpLaunchSpringBatchJobFlow$1(AmqpBatchLaunchIntegrationFlows.java:71) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) 
    ... 18 common frames omitted 
    Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={accessToken=<ACCESS_TOKEN>, id=act_1234567890, classifier=stats, report_run_id=1482330625184792, job_request_id=32}. If you want to run this job again, change the parameters. 
    at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:126) 
    at sun.reflect.GeneratedMethodAccessor193.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) 
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99) 
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:282) 
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
    at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean$1.invoke(AbstractJobRepositoryFactoryBean.java:172) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) 
    at com.sun.proxy.$Proxy125.createJobExecution(Unknown Source) 
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:125) 
    at jp.ixam_drive.batch.service.JobOperationsService.launch(JobOperationsService.java:64) 
    at jp.ixam_drive.facebook.SpringBatchLauncher.launchJob(SpringBatchLauncher.java:37) 
    ... 24 common frames omitted 

Spring Batchジョブを実行するために、次のコードを両方とも並列に実行するSpringブートアプリケーションのインスタンスが2つあるとしますか?

1. channel[async_ads_insights] ->IntegrationFlow[amqpOutboundAsyncAdsInsights]->[AMQP]->IntegrationFlow[amqpAdsInsightsAsyncJobRequestFlow]->channel[ad_report_run_polling_channel]->IntegrationFlow[adReportRunPollingLoopFlow]-IF END LOOP->channel[batch_launch_channel] ELSE -> channel[ad_report_run_polling_channel] 

    2. channel[batch_launch_channel] -> IntegrationFlow[amqpOutbound]-> IntegrationFlow[amqpLaunchSpringBatchJobFlow] 

    3. Spring Batch Job is launched. 

例外は両方のインスタンスが開始された直後に投げたが、しばらくした後にされていません。ここで

@Configuration 
@Conditional(AmqpBatchLaunchCondition.class) 
@Slf4j 
public class AmqpAsyncAdsInsightsConfiguration { 

    @Autowired 
    ObjectMapper objectMapper; 

    @Value("${batch.launch.amqp.routing-keys.async-insights}") 
    String routingKey; 

    @Bean 
    public IntegrationFlow amqpOutboundAsyncAdsInsights(AmqpTemplate amqpTemplate) { 
     return IntegrationFlows.from("async_ads_insights") 
       .<JobParameters, byte[]>transform(SerializationUtils::serialize) 
       .handle(Amqp.outboundAdapter(amqpTemplate).routingKey(routingKey)).get(); 
    } 

    @Bean 
    public IntegrationFlow amqpAdsInsightsAsyncJobRequestFlow(FacebookMarketingServiceProvider serviceProvider, 
      JobParametersToApiParametersTransformer transformer, ConnectionFactory connectionFactory) { 
     return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, routingKey)) 
       .<byte[], JobParameters>transform(SerializationUtils::deserialize) 
       .<JobParameters, ApiParameters>transform(transformer) 
       .<ApiParameters>handle((payload, header) -> { 
        String accessToken = (String) header.get("accessToken"); 
        String id = (String) header.get("object_id"); 
        FacebookMarketingApi api = serviceProvider.getApi(accessToken); 
        String reportRunId = api.asyncRequestOperations().getReportRunId(id, payload.toMap()); 
        ObjectNode objectNode = objectMapper.createObjectNode(); 
        objectNode.put("accessToken", accessToken); 
        objectNode.put("id", id); 
        objectNode.put("report_run_id", reportRunId); 
        objectNode.put("classifier", (String) header.get("classifier")); 
        objectNode.put("job_request_id", (Long) header.get("job_request_id")); 
        return serialize(objectNode); 
       }).channel("ad_report_run_polling_channel").get(); 
    } 

    @SneakyThrows 
    private String serialize(JsonNode jsonNode) { 
     return objectMapper.writeValueAsString(jsonNode); 
    } 
} 

@Configuration 
@Conditional(AmqpBatchLaunchCondition.class) 
@Slf4j 
public class AmqpBatchLaunchIntegrationFlows { 

    @Autowired 
    SpringBatchLauncher batchLauncher; 

    @Value("${batch.launch.amqp.routing-keys.job-launch}") 
    String routingKey; 

    @Bean(name = "batch_launch_channel") 
    public MessageChannel batchLaunchChannel() { 
     return MessageChannels.executor(Executors.newSingleThreadExecutor()).get(); 
    } 

    @Bean 
    public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate, 
      @Qualifier("batch_launch_channel") MessageChannel batchLaunchChannel) { 
     return IntegrationFlows.from(batchLaunchChannel) 
       .<JobParameters, byte[]>transform(SerializationUtils::serialize) 
       .handle(Amqp.outboundAdapter(amqpTemplate).routingKey(routingKey)).get(); 
    } 

    @Bean 
    public IntegrationFlow amqpLaunchSpringBatchJobFlow(ConnectionFactory connectionFactory) { 
     return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, routingKey)) 
       .handle(message -> { 
        String jobName = (String) message.getHeaders().get("job_name"); 
        byte[] bytes = (byte[]) message.getPayload(); 
        JobParameters jobParameters = SerializationUtils.deserialize(bytes); 
        batchLauncher.launchJob(jobName, jobParameters); 
       }).get(); 
    } 
} 

@Configuration 
@Slf4j 
public class AsyncAdsInsightsConfiguration { 

    @Value("${batch.core.pool.size}") 
    public Integer batchCorePoolSize; 

    @Value("${ixam_drive.facebook.api.ads-insights.async-poll-interval}") 
    public String asyncPollInterval; 

    @Autowired 
    ObjectMapper objectMapper; 

    @Autowired 
    private DataSource dataSource; 

    @Bean(name = "async_ads_insights") 
    public MessageChannel adsInsightsAsyncJobRequestChannel() { 
     return MessageChannels.direct().get(); 
    } 

    @Bean(name = "ad_report_run_polling_channel") 
    public MessageChannel adReportRunPollingChannel() { 
     return MessageChannels.executor(Executors.newFixedThreadPool(batchCorePoolSize)).get(); 
    } 

    @Bean 
    public IntegrationFlow adReportRunPollingLoopFlow(FacebookMarketingServiceProvider serviceProvider) { 
     return IntegrationFlows.from(adReportRunPollingChannel()) 
       .<String>handle((payload, header) -> { 
        ObjectNode jsonNode = deserialize(payload); 
        String accessToken = jsonNode.get("accessToken").asText(); 
        String reportRunId = jsonNode.get("report_run_id").asText(); 
        try { 
         AdReportRun adReportRun = serviceProvider.getApi(accessToken) 
           .fetchObject(reportRunId, AdReportRun.class); 
         log.debug("ad_report_run: {}", adReportRun); 
         return jsonNode.set("ad_report_run", objectMapper.valueToTree(adReportRun)); 
        } catch (Exception e) { 
         log.error("failed while polling for ad_report_run.id: {}", reportRunId); 
         throw new RuntimeException(e); 
        } 
       }).<JsonNode, Boolean>route(payload -> { 
        JsonNode adReportRun = payload.get("ad_report_run"); 
        return adReportRun.get("async_percent_completion").asInt() == 100 && 
          "Job Completed".equals(adReportRun.get("async_status").asText()); 
       }, rs -> rs.subFlowMapping(true, 
         f -> f.transform(JsonNode.class, 
           source -> { 
            JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); 
            jobParametersBuilder 
              .addString("accessToken", source.get("accessToken").asText()); 
            jobParametersBuilder.addString("id", source.get("id").asText()); 
            jobParametersBuilder 
              .addString("classifier", source.get("classifier").asText()); 
            jobParametersBuilder 
              .addLong("report_run_id", source.get("report_run_id").asLong()); 
            jobParametersBuilder 
              .addLong("job_request_id", source.get("job_request_id").asLong()); 
            return jobParametersBuilder.toJobParameters(); 
           }).channel("batch_launch_channel")) 
         .subFlowMapping(false, 
           f -> f.transform(JsonNode.class, this::serialize) 
             .<String>delay("delay", asyncPollInterval, c -> c.transactional() 
               .messageStore(jdbcMessageStore())) 
             .channel(adReportRunPollingChannel()))).get(); 
    } 

    @SneakyThrows 
    private String serialize(JsonNode jsonNode) { 
     return objectMapper.writeValueAsString(jsonNode); 
    } 

    @SneakyThrows 
    private ObjectNode deserialize(String payload) { 
     return objectMapper.readerFor(ObjectNode.class).readValue(payload); 
    } 

    @Bean 
    public JdbcMessageStore jdbcMessageStore() { 
     JdbcMessageStore jdbcMessageStore = new JdbcMessageStore(dataSource); 
     return jdbcMessageStore; 
    } 

    @Bean 
    public JobParametersToApiParametersTransformer jobParametersToApiParametersTransformer() { 
     return new JobParametersToApiParametersTransformer() { 
      @Override 
      protected ApiParameters transform(JobParameters jobParameters) { 
       ApiParameters.ApiParametersBuilder builder = ApiParameters.builder(); 
       MultiValueMap<String, String> multiValueMap = new LinkedMultiValueMap<>(); 
       String level = jobParameters.getString("level"); 
       if (!StringUtils.isEmpty(level)) { 
        multiValueMap.set("level", level); 
       } 
       String fields = jobParameters.getString("fields"); 
       if (!StringUtils.isEmpty(fields)) { 
        multiValueMap.set("fields", fields); 
       } 
       String filter = jobParameters.getString("filter"); 
       if (filter != null) { 
        try { 
         JsonNode jsonNode = objectMapper.readTree(filter); 
         if (jsonNode != null && jsonNode.isArray()) { 
          List<ApiFilteringParameters> filteringParametersList = new ArrayList<>(); 
          List<ApiSingleValueFilteringParameters> singleValueFilteringParameters = new ArrayList<>(); 
          ArrayNode arrayNode = (ArrayNode) jsonNode; 
          arrayNode.forEach(node -> { 
           String field = node.get("field").asText(); 
           String operator = node.get("operator").asText(); 
           if (!StringUtils.isEmpty(field) && !StringUtils.isEmpty(operator)) { 
            String values = node.get("values").asText(); 
            String[] valuesArray = !StringUtils.isEmpty(values) ? values.split(",") : null; 
            if (valuesArray != null) { 
             if (valuesArray.length > 1) { 
              filteringParametersList.add(ApiFilteringParameters 
                .of(field, Operator.valueOf(operator), valuesArray)); 
             } else { 
              singleValueFilteringParameters.add(ApiSingleValueFilteringParameters 
                .of(field, Operator.valueOf(operator), valuesArray[0])); 
             } 
            } 
           } 
          }); 
          if (!filteringParametersList.isEmpty()) { 
           builder.filterings(filteringParametersList); 
          } 
          if (!singleValueFilteringParameters.isEmpty()) { 
           builder.filterings(singleValueFilteringParameters); 
          } 
         } 

        } catch (IOException e) { 
         throw new UncheckedIOException(e); 
        } 
       } 
       String start = jobParameters.getString("time_ranges.start"); 
       String end = jobParameters.getString("time_ranges.end"); 
       String since = jobParameters.getString("time_range.since"); 
       String until = jobParameters.getString("time_range.until"); 

       if (!StringUtils.isEmpty(start) && !StringUtils.isEmpty(end)) { 
        builder.timeRanges(ApiParameters.timeRanges(start, end)); 
       } else if (!StringUtils.isEmpty(since) && !StringUtils.isEmpty(until)) { 
        builder.timeRange(new TimeRange(since, until)); 
       } 
       String actionBreakdowns = jobParameters.getString("action_breakdowns"); 
       if (!StringUtils.isEmpty(actionBreakdowns)) { 
        multiValueMap.set("action_breakdowns", actionBreakdowns); 
       } 
       String attributionWindows = jobParameters.getString("action_attribution_windows"); 
       if (attributionWindows != null) { 
        try { 
         multiValueMap 
           .set("action_attribution_windows", 
             objectMapper.writeValueAsString(attributionWindows.split(","))); 
        } catch (JsonProcessingException e) { 
         e.printStackTrace(); 
        } 
       } 
       builder.multiValueMap(multiValueMap); 
       String pageSize = jobParameters.getString("pageSize"); 
       if (!StringUtils.isEmpty(pageSize)) { 
        builder.limit(pageSize); 
       } 
       return builder.build(); 
      } 
     }; 
    } 
} 

は、メッセージがどのように流れるかです。 Spring Batch Jobの起動は成功しますが、「ジョブインスタンスは既に存在し、完了しています...」と失敗し始めます。

ジョブは、Facebookの広告結果を取得するためのものです。

私はあなたの洞察力を上記のエラーの原因となっていることに感謝します。

AMQPを使用せず問題なく動作するこの設定もありますが、これは1つのインスタンスにのみ適用されます。

@Configuration 
@Conditional(SimpleBatchLaunchCondition.class) 
@Slf4j 
public class SimpleBatchLaunchIntegrationFlows { 

    @Autowired 
    SpringBatchLauncher batchLauncher; 

    @Autowired 
    DataSource dataSource; 

    @Bean(name = "batch_launch_channel") 
    public MessageChannel batchLaunchChannel() { 
     return MessageChannels.queue(jdbcChannelMessageStore(), "batch_launch_channel").get(); 
    } 

    @Bean 
    public ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider() { 
     return new MySqlChannelMessageStoreQueryProvider(); 
    } 

    @Bean 
    public JdbcChannelMessageStore jdbcChannelMessageStore() { 
     JdbcChannelMessageStore channelMessageStore = new JdbcChannelMessageStore(dataSource); 
     channelMessageStore.setChannelMessageStoreQueryProvider(channelMessageStoreQueryProvider()); 
     channelMessageStore.setUsingIdCache(true); 
     channelMessageStore.setPriorityEnabled(true); 
     return channelMessageStore; 
    } 

    @Bean 
    public IntegrationFlow launchSpringBatchJobFlow(@Qualifier("batch_launch_channel") 
      MessageChannel batchLaunchChannel) { 
     return IntegrationFlows.from(batchLaunchChannel) 
       .handle(message -> { 
        String jobName = (String) message.getHeaders().get("job_name"); 
        JobParameters jobParameters = (JobParameters) message.getPayload(); 
        batchLauncher.launchJob(jobName, jobParameters); 
       }, e->e.poller(Pollers.fixedRate(500).receiveTimeout(500))).get(); 
    } 
} 
+0

私はAMQP(SimpleBatchLaunchIntegrationFlows)を使用しないが、単一のインスタンス(仕事が共有されていない)だけを使用する@Configurationを追加しました – hanishi

答えて

1

スプリングバッチのドキュメントを参照してください。ジョブの新しいインスタンスを起動するとき、ジョブパラメータは一意でなければなりません。

一般的な解決策は、UUIDなどでダミーパラメータを追加することですが、バッチは、毎回数値パラメータを増やすなどの戦略を提供します。

EDIT

ありのメンバーが回収不能とみなされた例外の特定のクラス(致命的)があり、それは、再配信を試行しても意味がありません。

例:MessageConversionException - 最初に変換できない場合は、再配信で変換できない可能性があります。 ConditionalRejectingErrorHandlerは、このような例外を検出し、永続的に拒否される(再配信されない)メカニズムです。

その他の例外では、デフォルトでメッセージが再配信されます.という別のプロパティがあり、すべての失敗を恒久的に拒否するにはfalseに設定できます(推奨しません)。

あなたは、そのDefaultExceptionStrategyをサブクラス化することにより、エラーハンドラをカスタマイズすることができます -

cause.getCause().getCause() instanceof ...JobInstanceAlreadyCompleteExceptionを探し、trueを返すためにcauseツリーをスキャンするisUserCauseFatal(Throwable cause)をオーバーライドし、私はそれはによってスローエラーによって引き起こされたと思います"既に実行中のSpringBatchジョブ"例外。

それでも、同じパラメータで2番目のメッセージが受信されたことを示しています。元のジョブがまだ実行されているため、別のエラーです。そのメッセージは拒否され(および再キューに入れられます)、その後の配信では既に完了した例外が発生します。

問題の根本的な原因は重複した要求ですが、チャネルアダプタのリスナーコンテナでカスタマイズされたエラーハンドラを使用して動作を回避できます。

なぜあなたがそれらを入手しているのか理解できるように、重複したメッセージを記録することをお勧めします。

+0

あなたの応答に感謝します。私はそれが実行されているそれぞれのジョブパラメータの一意に識別可能な組み合わせを持っているため、それはジョブパラメータではないことを知っています。この例外がスローされるまで、ジョブはしばらくエラーなしに実行され、完了します。コンソールで見ると、両方のインスタンスがAMQPから同じジョブ識別可能なパラメータを取得している可能性があります。例外がスローされます。 「リスナーが例外を投げた」と思ったので、これを防ぐ方法を知りたい。 – hanishi

+0

それは分かりません。その例外が発生した場合、同じパラメーターでジョブを開始しようとしています。それはrabbitmqとは関係ありません。デバッグログは、それを追跡するのに役立ちます。 –

+0

ログにConditionalRejectingErrorHandlerがあります。 2017-10-07 10:26:24.572 WARN 55577 --- [erContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler:Rabbitメッセージリスナーの実行に失敗しました。 ドキュメントごとに、「条件付きでAmqpRejectAndDontRequeueExceptionで例外がラップされます」と、同じパラメータで同じジョブの「再実行 」が発生していると推測しています。 AmqpRejectAndDontRequeueExceptionとは何ですか? – hanishi

関連する問題