2012-02-03 7 views
0

2つの質問があります。以下はシナリオです -アプリケーションがオフラインのときにエンキューされたメッセージをデキューする方法(Oracle Advanced Queue)

プロセスAとプロセスBの2つの異なるプロセスがあります。 プロセスAエンキューはメッセージキューにあります。 プロセスBは、メッセージキューからメッセージをデキューします。

1)プロセスBはしばらくシャットダウンが、プロセスAはキューにメッセージをエンキューし続けます。プロセスBがライブに戻ると、プロセスBがオフラインのときにプロセスAによってポストされたメッセージキュー内のメッセージをデキューする方法は?メッセージをデキューするための1件の以上のプロセスBが必要であるように、私が使用してい

2)キューは、複数のコンシューマ・キューです。この設計の背後にある理由は、プロセスBの1つが死んでも、他のプロセスBが依然としてメッセージを処理し続ける可能性があるからです。同時に、プロセスBの1つのインスタンスがメッセージをピックアップした場合、他のプロセスBにメッセージを処理しないように通知する必要があります。

私は、任意のサンプルを見つけるcoudn't。どんな助けでも大歓迎です。

答えて

0

私はちょうど似たような要件のプロジェクトを完了しました。

問題1) 定期的に実行するWCF Restfulサービスを呼び出すWindowsサービスタイマーを作成しました。その後、WCFサービスは、すべてのエンキューされたメッセージをデキューします(各呼び出しで最大500個のメッセージ)。エンキューされたものはすべて自動的に処理されるはずです。このタイマーが再起動されても停止しても、中止された場所をピックアップします。

問題2)OracleからCouchBaseにデータを複製していたので、プロセスが開始されたときのタイムスタンプとCouchBaseの保存済みデータのタイムスタンプがありました。セーブ。 (これは競争条件を管理するためのものでした)。

私はまた、何かがエンキューされたときにIDとエンキューされた時間を2番目のテーブルにコピーするトリガを持っていました。定期的にこの2番目のテーブルがチェックされ、アイテムがキューテーブル内でデキューされているが、WCFサービスによって特定の時間枠内にこれを反映するように2番目のテーブルが更新されていない場合、プロセスで何かが失敗したとしてデータを再エンキューする。

ここでは、odp.netを使用したwcfの快適なサービスの例が参考になります。

OracleAQQueue _queueObj; 
OracleConnection _connObj; 
_connString = ConfigurationManager.ConnectionStrings["connectionstring"].ToString(); 
_connObj = new OracleConnection(_connString); 
_queueObj = new OracleAQQueue("QUEUENAME", _connObj); 
_connObj.Open(); 

    int i = 0; 
    bool messageAvailable = true; 

    while (messageAvailable && i < 500) 
    { 
    OracleTransaction _txn = _connObj.BeginTransaction(); 
    //Makes dequeue part of transaction 
    _queueObj.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit; 
    _queueObj.DequeueOptions.ConsumerName = "CONSUMERNAME" 
    try 
    { 
     //Wait number of seconds for dequeue, default is forever 
     _queueObj.DequeueOptions.Wait = 2; 
     _queueObj.MessageType = OracleAQMessageType.Raw; 
     _queueObj.DequeueOptions.ProviderSpecificType = true; 
     OracleAQMessage _depMsq = _queueObj.Dequeue(); 
     var _binary = (OracleBinary)_depMsq.Payload; 
     byte[] byteArray = _binary.Value; 
     _txn.Commit(); 
    } 
    catch (Exception ex) 
    { 
     //This catch will always fire when all messages have been dequeued 
     messageAvailable = false; 
     if (ex.Message.IndexOf("end-of-fetch during message dequeue") == -1) 
      { 
      //Actual error present. 
      log.Info("Problem occurred during dequeue process : " + ex.Message); 
      } 
    } 
    } 

    _queueObj.Dispose(); 
    _connObj.Close(); 
    _connObj.Dispose(); 
    _connObj = null;