2017-02-01 9 views
0

私は多くの春の統合コンポーネントでアプリケーションを継承しました(そして、私は春の統合に慣れていません)。春の統合実装のレビュー/フィードランダムに処理を停止します

この問題は、グループ化された(correlId)、再順序付けされたほぼ2000個の個別のJMSメッセージでシステムに入ってくる(Large)ProductFeedで最も一般的です。この問題は電子商取引アプリケーションであり、 Apache MQサーバーから取得された後に正しい順序で実行されます。これは、従属オブジェクトが他のオブジェクトの前にDBに保存されるように行われます。製品情報システムからの輸出構造の結果。しかし、価格などの他の飼料でも目撃されています。

私たちには同じバネ構成の管理サーバ/ JVMが3つあります。フィード内の2000個のメッセージの1〜2個のメッセージの後で、Product Feedsがメッセージの処理をランダムに停止することがあるという問題があります。すべてのメッセージはAMQで正常にエンキューされ、キューはすべてのメッセージを排除したように見えますが、実際には1〜2だけログが受信されて処理されます。

これは、複数の管理サーバーが存在する環境でのみ発生します。 Springの統合ドキュメントでは、複数のJVMがクラスタ内でどのメッセージをどのメッセージで取り出すかについてどのように機能するのか分かりません。JMS Consumersが何らかの理由でこの問題を引き起こしているのかどうか疑問に思っています。メッセージを受信し、待ち行列に長時間住んでいるため、取り出されて再配列される)。

  • 過去にこのフィード処理の問題が発生しましたか?
  • このようなメッセージを処理するために使用するより優れたアーキテクチャがあるか、バネ統合構成から何かが欠落していますか?

私はスプリング統合コード、製品を具体的にすべての処理豆に保つために、いくつかのスプリングコードを省略しています。すべての感謝の気持ちで助けてください。

<bean id="jmsDefaultRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> 
    <property name="queue" value="*"/> 
    <property name="maximumRedeliveries" value="${import.jms.redelivery.count}"/> 
    <property name="initialRedeliveryDelay" value="${import.jms.redelivery.delay}"/> 
</bean> 

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 
    <property name="brokerURL" value="${import.jms.url}"/> 
    <property name="redeliveryPolicy" ref="jmsDefaultRedeliveryPolicy"/> 
</bean> 

<integration:control-bus input-channel="operationChannel"/> 

<!-- message resequencing flow --> 
<integration:channel id="messageRoutingInputChannel"/> 

<integration:header-value-router input-channel="messageRoutingInputChannel" header-name="REQUIRES_RESEQUENCING"> 
    <integration:mapping value="true" channel="messageResequenceInputChannel" /> 
    <integration:mapping value="false" channel="messageHandleInputChannel" /> 
</integration:header-value-router> 

<integration:channel id="messageResequenceInputChannel"/> 

<integration:chain input-channel="messageResequenceInputChannel" output-channel="messageHandleInputChannel"> 
    <integration:header-enricher id="messageHeaderEnricher"> 
     <integration:header name="correlationId" expression="headers['jms_correlationId']" /> 
     <integration:header name="sequenceNumber" expression="headers['JMSXGroupSeq']" /> 
     <integration:header name="sequenceSize" expression="headers['GROUP_SIZE']" /> 
    </integration:header-enricher> 

    <integration:resequencer id="messageResequencer" release-partial-sequences="true" /> 
</integration:chain> 
<!-- message resequencing flow --> 

<integration:channel id="messageHandleInputChannel"/> 

<integration:header-value-router input-channel="messageHandleInputChannel" header-name="FEED_TYPE"> 
    <integration:mapping value="Assets" channel="messageProductInputChannel"/> 
    <integration:mapping value="Attributes" channel="messageProductInputChannel"/> 
    <integration:mapping value="DeletedCategories" channel="messageProductInputChannel"/> 
    <integration:mapping value="Categories" channel="messageProductInputChannel"/> 
    <integration:mapping value="VariantCategories" channel="messageProductInputChannel"/> 
    <integration:mapping value="DeletedBaseProducts" channel="messageProductInputChannel"/> 
    <integration:mapping value="BaseProducts" channel="messageProductInputChannel"/> 
    <integration:mapping value="Products" channel="messageProductInputChannel"/> 
    <integration:mapping value="ProductCrossReferences" channel="messageProductInputChannel"/> 
</integration:header-value-router> 

<bean id="baseMessageHandler" abstract="true" class="com.mycompany.dataimport.service.feed.impl.AbstractMessageHandler"> 
    <property name="activeTenantId" value="${tenantId}" /> 
</bean> 

<util:list id="activeChannelAdapters"> 
    <value>productMessageAdapter</value> 
</util:list> 

<bean id="baseFeedImportStrategy" abstract="true" class="com.mycompany.dataimport.service.feed.impl.AbstractFeedImportStrategy"> 
    <property name="feedSessionSetups"> 
     <list> 
      <ref bean="adminUserSessionSetup"/> 
      <ref bean="catalogSessionSetup"/> 
     </list> 
    </property> 
    <property name="errorLoggers"> 
     <list> 
      <bean class="com.mycompany.dataimport.service.feed.impl.ConsoleImportErrorLogger" /> 
      <ref bean="emailImportErrorLogger" /> 
     </list> 
    </property> 
</bean> 

<bean id="productQueue" class="org.apache.activemq.command.ActiveMQQueue"> 
    <constructor-arg index="0" value="${import.queue.feed.product}" /> 
</bean> 

<jms:message-driven-channel-adapter id="productMessageAdapter" channel="messageRoutingInputChannel" destination="productQueue" connection-factory="jmsConnectionFactory" 
    acknowledge="transacted" auto-startup="false" concurrent-consumers="${concurrent.consumers.product.queue}" max-concurrent-consumers="${max.concurrent.consumers.product.queue}" /> 

<integration:channel id="messageProductInputChannel" /> 

<integration:service-activator id="productMessageActivator" input-channel="messageProductInputChannel" ref="productMessageHandler" /> 

<bean id="productMessageHandler" parent="baseMessageHandler" class="com.mycompany.dataimport.service.feed.product.ProductMessageHandler" scope="prototype"> 
    <property name="importStrategyMap"> 
     <map> 
      <entry key="Assets" value-ref="assetFeedImportStrategy" /> 
      <entry key="Attributes" value-ref="attributeFeedImportStrategyChain" /> 
      <entry key="VariantCategories" value-ref="variantCategoryFeedImportStrategy" /> 
      <entry key="Categories" value-ref="categoryFeedImportStrategyChain" /> 
      <entry key="DeletedCategories" value-ref="deleteCategoryFeedImportStrategy" /> 
      <entry key="DeletedBaseProducts" value-ref="disableBaseProductImportStrategy" /> 
      <entry key="BaseProducts" value-ref="baseProductFeedImportStrategy" /> 
      <entry key="Products" value-ref="productFeedImportStrategyChain" /> 
      <entry key="ProductCrossReferences" value-ref="productReferenceFeedImportStrategy" /> 
     </map> 
    </property> 
</bean> 

答えて

0

インスタンスはすべて同じキューを待機しているため、メッセージを競合します。 ActiveMQにはプリフェッチ構成があり、パフォーマンスを向上させるために使用されます(メッセージを一度に1つずつ取り出すのではなく)。デフォルトでは1000に設定されているため、2000件のメッセージしかないため、消費者の中にはメッセージが枯渇している可能性があります。特に、各コンテナに複数のコンシューマがある場合は、コンフィギュレーションが意味します。

reducing the prefetchを試して、コンシューマ間での作業をより効果的に分散できるかどうかを確認できます。

私の経験上、これがうまくいかない場合、そのような「ハング」は通常、リスナースレッドがユーザーコードに「スタック」しているためです。スレッド・ダンプを取って、リスナー・コンテナ・スレッドが何をしているのかを確認する必要があります。

+0

クイックレスポンスをお寄せいただきありがとうございます。私はプリフェッチを調べます。 私たちは$ {concurrent.consumers.product.queue}のように複数のコンシューマーを容易にするプロパティを持っていますが、これは1だけを使用します。これはコード変更を必要とせずにパフォーマンス環境でのランタイムテストを容易にするためです。 – CWayman

+0

1つまたは2つのメッセージ処理後に残りの1998年が実際にキューから排水されていますが、エーテルにドリフトしているように見えますが、DLQではなく失われています。 – CWayman

+0

"エーテルに漂う" - 何かがミューズを消費している。 DEBUGロギングが役立つはずです。 –

関連する問題