2016-05-05 12 views
1

私は、本質的にTopShelfを使用してホストされているコンソールアプリケーションで、Rebus 0.99.50を使用して通信するいくつかのサービスを用意しています。これらのサービスの1つ(StepManager)は、(Stepタイプの)オブジェクトの集合をループし、各インスタンスにはメッセージの送信に使用するBusインスタンスと応答を処理するためのハンドラが含まれています。このために、この例で使用される次のステップが(S)である:問題以降のRebusインスタンスからの公開

  • ReceiveFile
  • LogFileMetrics
  • ArchiveIncomingFile私の実際のシナリオで

、Iは7ステップの合計が(s)...これらのステップをループするとき、ReceiveFileとLogFileMetricsは期待通りに動作しますが、ArchiveIncomingFileが実行されるときに.Send(req)が呼び出されますが、メッセージは宛先に到達せず、それは決して戻ってこない。リスト内のオブジェクトのStepオブジェクトまたはOrderのタイプにかかわらず、これはリスト内のStep(Run()メソッドの.Send(req)を実行する)の2番目のインスタンスで一貫して発生します。しかし、私がコメントしたとき(完了!){Task.Delay(25)を待ちます。 }ステートメントでは、メッセージは送信されたように見えますが、ステートメントがなくてもステップはすべて特定の実行順序なしで実行されるため、問題です。

どうしてですか?私はここで間違っていますか/間違っていますか?そして私がしようとしていることを達成するためのよりよい選択肢がありますか?ここで

は、問題のクラスの関連する部分です:

public class StepManager 
{ 
    ... 

    public string ProcessName { get; set; } 
    public List<Step> Steps { get; set; } 
    public BuiltinHandlerActivator ServiceBus { get; set; } 

    ... 

    public async Task Init() 
    { 
     ... 

     Steps = new List<Step>(); 
     var process = Db.Processes.Include("Steps") 
           .Where(p => p.Name == ProcessName) 
           .FirstOrDefault(); 
     ... 

     foreach (var s in process.Steps) 
     { 
      var step = container.Resolve<Step>(s.Name); 

      ... 

      Steps.Add(step); 
     }  
    } 

    public async Task Run() 
    { 
     foreach (var step in Steps) 
     { 
      await step.Run(); 
     } 
    } 
} 

public class Step 
{ 
    public BuiltinHandlerActivator ServiceBus { get; set; } 

    public Step() 
    { 
     Db = new ClearStoneConfigContext(); 
     Timer = new Stopwatch(); 
     StepId = Guid.NewGuid().ToString(); 

     Completed = false; 
    } 

    public virtual async Task Run() { } 
} 

public class ReceiveFile : Step 
{ 
    public ReceiveFile() 
    { 
     ServiceBus = new BuiltinHandlerActivator(); 

     Configure.With(ServiceBus) 
       .Logging(l => l.ColoredConsole(LogLevel.Info))      
       .Routing(r => r.TypeBased().Map<ProcessLog>("stepmanager"))      
       .Transport(t => t.UseMsmq("receivefile")) 
       .Start(); 
    } 

    public override async Task Run() 
    { 
     ... 

     LogEntry.Message = "File " + FileEvent.Name + " received.";  
     await ServiceBus.Bus.Advanced.Routing.Send("stepmanager", LogEntry); 
     Completed = true;   
    } 
} 

public class LogFileMetrics : Step 
{ 
    public LogFileMetrics() 
    { 
     SubscriptionTable = "SandboxServiceBusSubscriptions"; 
     ServiceBus = new BuiltinHandlerActivator(); 

     Configure.With(ServiceBus) 
       .Logging(l => l.ColoredConsole(LogLevel.Info))      
       .Routing(r => r.TypeBased().Map<LogFileMetricsRequest>("metrics"))     
       .Transport(t => t.UseMsmq("logfilemetrics")) 
       .Start(); 

     ServiceBus.Handle<FileMetricsLogged>(async msg=> await FileMetricsLogged(msg));; 
    } 

    public override async Task Run() 
    { 
     ... 

     await ServiceBus.Bus.Send(new LogFileMetricsRequest { ProcessId = ProcessId, FileEvent = FileEvent }).ConfigureAwait(false); 

     while (!Completed) { await Task.Delay(25); }   
    } 

    private async Task FileMetricsLogged(FileMetricsLogged msg) 
    { 
     ... 

     await ServiceBus.Bus.Advanced.Routing.Send("stepmanager", LogEntry); 
     Completed = true; 
    } 
} 

public class ArchiveIncomingFile : Step 
{ 
    public ArchiveIncomingFile() 
    { 
     SubscriptionTable = "SandboxServiceBusSubscriptions"; 
     ServiceBus = new BuiltinHandlerActivator(); 

     Configure.With(ServiceBus) 
       .Logging(l => l.ColoredConsole(LogLevel.Info))      
       .Routing(r => r.TypeBased().Map<ArchiveIncomingFileRequest>("incomingarchivefilerouter"))      
       .Transport(t => t.UseMsmq("archiveincomingfile"))      
       .Start(); 

     ServiceBus.Handle<IncomingFileArchived>(async msg => await IncomingFileArchived(msg)); 
    } 

    public override async Task Run() 
    { 
     ... 

     ServiceBus.Bus.Send(req); 

     while (!Completed) { await Task.Delay(25); } 
    } 

    private async Task IncomingFileArchived(IncomingFileArchived msg) 
    { 
     ... 

     await ServiceBus.Bus.Advanced.Routing.Send("stepmanager", LogEntry); 
     Completed = true; 
    } 
} 

答えて

0

あなたが経験している変な行動を引き起こしているものを私に明らかではないが、私は、あなたのコードでいくつかの問題を見ることができます。

最初に、ステップを作成するたびに新しいバス・インスタンスを作成しているようです。 Rebusのバスインスタンスは、アプリケーションの起動時に一度作成され、シングルトンとして保持され、アプリケーションのシャットダウン時に適切に処理されなければならないことを認識していますか?

あなたはもちろん、何度でも何かを残すようなものではありませんが、あなたがバスをどこにでも置いていないという事実はあなたのアプリケーションがおそらくこれを忘れる。

Rebusのwiki、特にRebus' bus instanceのセクションで詳しく読むことができます。

public ArchiveIncomingFile() 
{ 
    SubscriptionTable = "SandboxServiceBusSubscriptions"; 
    ServiceBus = new BuiltinHandlerActivator(); 

    Configure.With(ServiceBus) 
      .Logging(l => l.ColoredConsole(LogLevel.Info))      
      .Routing(r => r.TypeBased().Map<ArchiveIncomingFileRequest>("incomingarchivefilerouter"))      
      .Transport(t => t.UseMsmq("archiveincomingfile"))      
      .Start(); 

    //<<< bus is receiving messages at this point, but there's no handler!! 

    ServiceBus.Handle<IncomingFileArchived>(async msg => await IncomingFileArchived(msg)); 
} 

あなたが見ることができるように、//<<<でマークされた(非常に非常に非常に短く、確かに)時間(あり:

もう一つの問題は、ctorのこのようになりますArchiveIncomingFileクラスの微妙な潜在的な競合状態ですこれにより、バスがまだ開始されていないため、ハンドラがまだ設定されていないメッセージが入力キューから取り出されます。

バスを起動する前に、必ずハンドラを構成する必要があります。

最後に、あなたは

を求めていると私がやろうとしています何を達成するために、より良い選択肢はありますか?

が、私は単にあなたがやろうとしているかを把握することはできませんので、私はその質問に答えることができません;)

(しかし、あなたがしようとしているどのような問題がわずかに高いレベルで私に説明場合解決する、私はあなたのためにいくつかのヒントを持っているかもしれません:))

関連する問題