別のキュー(SQS)からのデータを格納するサービスを実装したいと考えています。私はちょうど方法プリントにメッセージサービスとしてのSpring共有キュー
のペイロードのみをテストするために、リスナー
@Component
public class MyListener implements MessageListener{
@Override
@JmsListener(destination = "my-queue")
public void onMessage(Message message) {
try{
System.out.println(((TextMessage) message).getText());
}catch (Exception e){
e.printStackTrace();
}
}
}
を定義したそして今、私はservice
としてLinkedBlockingQueue
に基づいて、メモリ内のキューのいくつかの並べ替えを定義したいすべての ファースト
public interface IStoreService {
void save(Order order);
String get();
}
アイデアは、他のサービスは、最初のメッセージを取得し、それを使用するget()
メソッドを使用することができるだろうということです。このソリューションは、サービスが何らかの情報を取得したり、通知を待つことができる単一のエンドポイントを作成することで、遅延を減らすことができると私は考えています。また、それは私の仮説であり、何かを誤解する可能性があります。
最後に質問:1人のプロデューサと複数のコンシューマに対して、このキューを正しく共有するにはどうすればよいですか?
UPD1(アプリケーションの詳細)。
Simple Queue Service(Amazon)から情報(JSONオブジェクトとしましょう)を取得するスタンドアロンアプリケーションです。このロジックはMyListener
コンポーネントに実装されています。私もOrders
を持っている別のキューを持っていて、キューはActiveMQに基づいています(ええ、アプリケーションのアーキテクチャーはうんざりです)。
そして、ここでは、ワークフローです: ActiveMQListener
は注文にActiveMQのキューから一つ一つを取り出し、私はActiveMQListener
が私Storage
から最初のメッセージをつかむとのは、これら2つのオブジェクトをマージ言うようにしたいです。
唯一の問題は、SQSからデータをフェッチしてストレージに格納するMyListenerと、ActiveMQからデータをフェッチするActiveMQListenerとの間で、このストレージを共有する方法です。
StoreService
のデモコードがあります。この問題を解決しようとしているときに、私は問題に直面
public class StoreService implements IStoreService {
private final Queue<Order> queue = new LinkedBlockingQueue<>();
@Override
public void save(Order order) {
this.queue.add(order);
}
@Override
public String get() {
Order order = queue.poll();
return order.getId();
}
}
UPD2 。 System.out.prinlnのために、私は」receiveRequest
@JmsListener(destination = "request.queue")
@SendTo("response.queue")
public String receiveRequest(Order order){
System.out.println("[INFO] REQUEST:" + order.getId());
try{
String test = storeService.get();
System.out.println("[INFO] SQS:" + test);
}catch (Exception e){
e.printStackTrace();
}
return order.getId();
}
申し訳ありませんが、私もこの方法でStoreService
@Service
public class StoreService implements IStoreService {
private final BlockingQueue<Order> queue = new LinkedBlockingQueue<>();
@Override
public void save(Order order) {
this.queue.add(order);
}
@Override
public String get() {
Order order = null;
try {
order = queue.take();
return order.getId();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
ActiveMQListenerを持って
+--------------+-----------------------------------------+
|request.queue | - get information from ActiveMQ |
|response.queue| - pass data to another ActiveMQ queue |
|sqs.queue | - queue based on Amazon SQS |
+--------------+-----------------------------------------+
:私は上記のように3つのキューがありますv無効化されたロガー このメソッドは、というキューから情報を取得することがわかりますもがstoreSerivce
ラインからデータを取得しようとした:String test = storeService.get()
しかし、問題は、私はそれにstoreServiceを渡す方法が分からないということです。私はそれがstoreService
の唯一の新しいインスタンスだから、この
private StoreService storeService;
@Autowired
public ActiveMQListener Listener(StoreService storeService){
this.storeService = storeService;
}
そしてstoreService
のようなものが空になっていました。
アイデア?
UPD3 また、github repositoryにもアップロードしました。ちょっと面倒ですが、構想を理解することは大丈夫だと思います。
あなたの質問に関連する情報をいくつか教えてください。すでにインターフェースを定義しています。プロデューサーと消費者は、単にそれを使うことができます。あなたの質問に「分け前」とはどういう意味ですか?たとえば、リモートアクセスが必要ですか?あなたの要件は何ですか? – mm759
@ mm759、UPD1セクションを追加しました。これをチェックしてください。 – Ascelhem
私の答えの候補(私はあなたが必要なものを理解したいと思います):MyListenerは、データを受け取るときにStoreServiceで呼び出しを呼び出します。 ActiveMQListenerはStoreService.getを呼び出してデータを取得します。並行性の問題を避けるために、StoreServiceのメソッドにsynchronizedを追加する必要があります。私が理解していないのは、StoreService.getがIDだけを返すということです。あなたが言及した遅れを遅らせるために、他の場所からそれを取得する必要がないようにオブジェクト全体を必要としないでください。インメモリストレージを使用すると、サーバークラッシュなどの場合にデータが失われることに注意してください。 – mm759