2016-08-28 8 views
0

別のキュー(SQS)からのデータを格納するサービスを実装したいと考えています。私はちょうど方法プリントにメッセージサービスとしてのSpring共有キュー

のペイロードのみをテストするために、リスナー

@Component 
public class MyListener implements MessageListener{ 


    @Override 
    @JmsListener(destination = "my-queue") 
    public void onMessage(Message message) { 
     try{ 
      System.out.println(((TextMessage) message).getText()); 
     }catch (Exception e){ 
      e.printStackTrace(); 
     } 
    } 
} 

を定義したそして今、私はserviceとしてLinkedBlockingQueueに基づいて、メモリ内のキューのいくつかの並べ替えを定義したいすべての ファースト

public interface IStoreService { 
    void save(Order order); 
    String get(); 
} 

アイデアは、他のサービスは、最初のメッセージを取得し、それを使用するget()メソッドを使用することができるだろうということです。このソリューションは、サービスが何らかの情報を取得したり、通知を待つことができる単一のエンドポイントを作成することで、遅延を減らすことができると私は考えています。また、それは私の仮説であり、何かを誤解する可能性があります。

最後に質問:1人のプロデューサと複数のコンシューマに対して、このキューを正しく共有するにはどうすればよいですか?

UPD1(アプリケーションの詳細)。

Simple Queue Service(Amazon)から情報(JSONオブジェクトとしましょう)を取得するスタンドアロンアプリケーションです。このロジックはMyListenerコンポーネントに実装されています。私もOrdersを持っている別のキューを持っていて、キューはActiveMQに基づいています(ええ、アプリケーションのアーキテクチャーはうんざりです)。

そして、ここでは、ワークフローです: ActiveMQListenerは注文にActiveMQのキューから一つ一つを取り出し、私はActiveMQListenerが私Storageから最初のメッセージをつかむとのは、これら2つのオブジェクトをマージ言うようにしたいです。

唯一の問題は、SQSからデータをフェッチしてストレージに格納するMyListenerと、ActiveMQからデータをフェッチするActiveMQListenerとの間で、このストレージを共有する方法です。

StoreServiceのデモコードがあります。この問題を解決しようとしているときに、私は問題に直面

public class StoreService implements IStoreService { 

    private final Queue<Order> queue = new LinkedBlockingQueue<>(); 
    @Override 
    public void save(Order order) { 
     this.queue.add(order); 
    } 

    @Override 
    public String get() { 
     Order order = queue.poll(); 
     return order.getId(); 
    } 
} 

UPD2 。 System.out.prinlnのために、私は」receiveRequest

@JmsListener(destination = "request.queue") 
    @SendTo("response.queue") 
    public String receiveRequest(Order order){ 

     System.out.println("[INFO] REQUEST:" + order.getId()); 
     try{ 
      String test = storeService.get(); 
      System.out.println("[INFO] SQS:" + test); 

     }catch (Exception e){ 
      e.printStackTrace(); 
     } 

     return order.getId(); 
    } 

申し訳ありませんが、私もこの方法でStoreService

@Service 
public class StoreService implements IStoreService { 

    private final BlockingQueue<Order> queue = new LinkedBlockingQueue<>(); 
    @Override 
    public void save(Order order) { 
     this.queue.add(order); 
    } 

    @Override 
    public String get() { 
     Order order = null; 
     try { 
      order = queue.take(); 
      return order.getId(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     return null; 
    } 
} 

ActiveMQListenerを持って

+--------------+-----------------------------------------+ 
|request.queue | - get information from ActiveMQ   | 
|response.queue| - pass data to another ActiveMQ queue | 
|sqs.queue  | - queue based on Amazon SQS    | 
+--------------+-----------------------------------------+ 

:私は上記のように3つのキューがありますv無効化されたロガー このメソッドは、というキューから情報を取得することがわかりますもがstoreSerivce

ラインからデータを取得しようとした:String test = storeService.get()

しかし、問題は、私はそれにstoreServiceを渡す方法が分からないということです。私はそれがstoreServiceの唯一の新しいインスタンスだから、この

private StoreService storeService; 
    @Autowired 
    public ActiveMQListener Listener(StoreService storeService){ 
     this.storeService = storeService; 
    } 

そしてstoreServiceのようなものが空になっていました。

アイデア?

UPD3 また、github repositoryにもアップロードしました。ちょっと面倒ですが、構想を理解することは大丈夫だと思います。

+0

あなたの質問に関連する情報をいくつか教えてください。すでにインターフェースを定義しています。プロデューサーと消費者は、単にそれを使うことができます。あなたの質問に「分け前」とはどういう意味ですか?たとえば、リモートアクセスが必要ですか?あなたの要件は何ですか? – mm759

+0

@ mm759、UPD1セクションを追加しました。これをチェックしてください。 – Ascelhem

+0

私の答えの候補(私はあなたが必要なものを理解したいと思います):MyListenerは、データを受け取るときにStoreServiceで呼び出しを呼び出します。 ActiveMQListenerはStoreService.getを呼び出してデータを取得します。並行性の問題を避けるために、StoreServiceのメソッドにsynchronizedを追加する必要があります。私が理解していないのは、StoreService.getがIDだけを返すということです。あなたが言及した遅れを遅らせるために、他の場所からそれを取得する必要がないようにオブジェクト全体を必要としないでください。インメモリストレージを使用すると、サーバークラッシュなどの場合にデータが失われることに注意してください。 – mm759

答えて

0

StoreServiceはこのように、依存性注入を使用したスプリングにより注入することができます。

@Autowired 
private StoreService storeService; 

これは、サービスは春によって管理されている唯一のインスタンスを取得するために使用されているすべての場所で行う必要があります。

関連する問題