私は多くの春の統合コンポーネントでアプリケーションを継承しました(そして、私は春の統合に慣れていません)。春の統合実装のレビュー/フィードランダムに処理を停止します
この問題は、グループ化された(correlId)、再順序付けされたほぼ2000個の個別のJMSメッセージでシステムに入ってくる(Large)ProductFeedで最も一般的です。この問題は電子商取引アプリケーションであり、 Apache MQサーバーから取得された後に正しい順序で実行されます。これは、従属オブジェクトが他のオブジェクトの前にDBに保存されるように行われます。製品情報システムからの輸出構造の結果。しかし、価格などの他の飼料でも目撃されています。
私たちには同じバネ構成の管理サーバ/ JVMが3つあります。フィード内の2000個のメッセージの1〜2個のメッセージの後で、Product Feedsがメッセージの処理をランダムに停止することがあるという問題があります。すべてのメッセージはAMQで正常にエンキューされ、キューはすべてのメッセージを排除したように見えますが、実際には1〜2だけログが受信されて処理されます。
これは、複数の管理サーバーが存在する環境でのみ発生します。 Springの統合ドキュメントでは、複数のJVMがクラスタ内でどのメッセージをどのメッセージで取り出すかについてどのように機能するのか分かりません。JMS Consumersが何らかの理由でこの問題を引き起こしているのかどうか疑問に思っています。メッセージを受信し、待ち行列に長時間住んでいるため、取り出されて再配列される)。
- 過去にこのフィード処理の問題が発生しましたか?
- このようなメッセージを処理するために使用するより優れたアーキテクチャがあるか、バネ統合構成から何かが欠落していますか?
私はスプリング統合コード、製品を具体的にすべての処理豆に保つために、いくつかのスプリングコードを省略しています。すべての感謝の気持ちで助けてください。
<bean id="jmsDefaultRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<property name="queue" value="*"/>
<property name="maximumRedeliveries" value="${import.jms.redelivery.count}"/>
<property name="initialRedeliveryDelay" value="${import.jms.redelivery.delay}"/>
</bean>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${import.jms.url}"/>
<property name="redeliveryPolicy" ref="jmsDefaultRedeliveryPolicy"/>
</bean>
<integration:control-bus input-channel="operationChannel"/>
<!-- message resequencing flow -->
<integration:channel id="messageRoutingInputChannel"/>
<integration:header-value-router input-channel="messageRoutingInputChannel" header-name="REQUIRES_RESEQUENCING">
<integration:mapping value="true" channel="messageResequenceInputChannel" />
<integration:mapping value="false" channel="messageHandleInputChannel" />
</integration:header-value-router>
<integration:channel id="messageResequenceInputChannel"/>
<integration:chain input-channel="messageResequenceInputChannel" output-channel="messageHandleInputChannel">
<integration:header-enricher id="messageHeaderEnricher">
<integration:header name="correlationId" expression="headers['jms_correlationId']" />
<integration:header name="sequenceNumber" expression="headers['JMSXGroupSeq']" />
<integration:header name="sequenceSize" expression="headers['GROUP_SIZE']" />
</integration:header-enricher>
<integration:resequencer id="messageResequencer" release-partial-sequences="true" />
</integration:chain>
<!-- message resequencing flow -->
<integration:channel id="messageHandleInputChannel"/>
<integration:header-value-router input-channel="messageHandleInputChannel" header-name="FEED_TYPE">
<integration:mapping value="Assets" channel="messageProductInputChannel"/>
<integration:mapping value="Attributes" channel="messageProductInputChannel"/>
<integration:mapping value="DeletedCategories" channel="messageProductInputChannel"/>
<integration:mapping value="Categories" channel="messageProductInputChannel"/>
<integration:mapping value="VariantCategories" channel="messageProductInputChannel"/>
<integration:mapping value="DeletedBaseProducts" channel="messageProductInputChannel"/>
<integration:mapping value="BaseProducts" channel="messageProductInputChannel"/>
<integration:mapping value="Products" channel="messageProductInputChannel"/>
<integration:mapping value="ProductCrossReferences" channel="messageProductInputChannel"/>
</integration:header-value-router>
<bean id="baseMessageHandler" abstract="true" class="com.mycompany.dataimport.service.feed.impl.AbstractMessageHandler">
<property name="activeTenantId" value="${tenantId}" />
</bean>
<util:list id="activeChannelAdapters">
<value>productMessageAdapter</value>
</util:list>
<bean id="baseFeedImportStrategy" abstract="true" class="com.mycompany.dataimport.service.feed.impl.AbstractFeedImportStrategy">
<property name="feedSessionSetups">
<list>
<ref bean="adminUserSessionSetup"/>
<ref bean="catalogSessionSetup"/>
</list>
</property>
<property name="errorLoggers">
<list>
<bean class="com.mycompany.dataimport.service.feed.impl.ConsoleImportErrorLogger" />
<ref bean="emailImportErrorLogger" />
</list>
</property>
</bean>
<bean id="productQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="${import.queue.feed.product}" />
</bean>
<jms:message-driven-channel-adapter id="productMessageAdapter" channel="messageRoutingInputChannel" destination="productQueue" connection-factory="jmsConnectionFactory"
acknowledge="transacted" auto-startup="false" concurrent-consumers="${concurrent.consumers.product.queue}" max-concurrent-consumers="${max.concurrent.consumers.product.queue}" />
<integration:channel id="messageProductInputChannel" />
<integration:service-activator id="productMessageActivator" input-channel="messageProductInputChannel" ref="productMessageHandler" />
<bean id="productMessageHandler" parent="baseMessageHandler" class="com.mycompany.dataimport.service.feed.product.ProductMessageHandler" scope="prototype">
<property name="importStrategyMap">
<map>
<entry key="Assets" value-ref="assetFeedImportStrategy" />
<entry key="Attributes" value-ref="attributeFeedImportStrategyChain" />
<entry key="VariantCategories" value-ref="variantCategoryFeedImportStrategy" />
<entry key="Categories" value-ref="categoryFeedImportStrategyChain" />
<entry key="DeletedCategories" value-ref="deleteCategoryFeedImportStrategy" />
<entry key="DeletedBaseProducts" value-ref="disableBaseProductImportStrategy" />
<entry key="BaseProducts" value-ref="baseProductFeedImportStrategy" />
<entry key="Products" value-ref="productFeedImportStrategyChain" />
<entry key="ProductCrossReferences" value-ref="productReferenceFeedImportStrategy" />
</map>
</property>
</bean>
クイックレスポンスをお寄せいただきありがとうございます。私はプリフェッチを調べます。 私たちは$ {concurrent.consumers.product.queue}のように複数のコンシューマーを容易にするプロパティを持っていますが、これは1だけを使用します。これはコード変更を必要とせずにパフォーマンス環境でのランタイムテストを容易にするためです。 – CWayman
1つまたは2つのメッセージ処理後に残りの1998年が実際にキューから排水されていますが、エーテルにドリフトしているように見えますが、DLQではなく失われています。 – CWayman
"エーテルに漂う" - 何かがミューズを消費している。 DEBUGロギングが役立つはずです。 –