私は、本質的にTopShelfを使用してホストされているコンソールアプリケーションで、Rebus 0.99.50を使用して通信するいくつかのサービスを用意しています。これらのサービスの1つ(StepManager)は、(Stepタイプの)オブジェクトの集合をループし、各インスタンスにはメッセージの送信に使用するBusインスタンスと応答を処理するためのハンドラが含まれています。このために、この例で使用される次のステップが(S)である:問題以降のRebusインスタンスからの公開
- ReceiveFile
- LogFileMetrics
- ArchiveIncomingFile私の実際のシナリオで
、Iは7ステップの合計が(s)...これらのステップをループするとき、ReceiveFileとLogFileMetricsは期待通りに動作しますが、ArchiveIncomingFileが実行されるときに.Send(req)が呼び出されますが、メッセージは宛先に到達せず、それは決して戻ってこない。リスト内のオブジェクトのStepオブジェクトまたはOrderのタイプにかかわらず、これはリスト内のStep(Run()メソッドの.Send(req)を実行する)の2番目のインスタンスで一貫して発生します。しかし、私がコメントしたとき(完了!){Task.Delay(25)を待ちます。 }ステートメントでは、メッセージは送信されたように見えますが、ステートメントがなくてもステップはすべて特定の実行順序なしで実行されるため、問題です。
どうしてですか?私はここで間違っていますか/間違っていますか?そして私がしようとしていることを達成するためのよりよい選択肢がありますか?ここで
は、問題のクラスの関連する部分です:
public class StepManager
{
...
public string ProcessName { get; set; }
public List<Step> Steps { get; set; }
public BuiltinHandlerActivator ServiceBus { get; set; }
...
public async Task Init()
{
...
Steps = new List<Step>();
var process = Db.Processes.Include("Steps")
.Where(p => p.Name == ProcessName)
.FirstOrDefault();
...
foreach (var s in process.Steps)
{
var step = container.Resolve<Step>(s.Name);
...
Steps.Add(step);
}
}
public async Task Run()
{
foreach (var step in Steps)
{
await step.Run();
}
}
}
public class Step
{
public BuiltinHandlerActivator ServiceBus { get; set; }
public Step()
{
Db = new ClearStoneConfigContext();
Timer = new Stopwatch();
StepId = Guid.NewGuid().ToString();
Completed = false;
}
public virtual async Task Run() { }
}
public class ReceiveFile : Step
{
public ReceiveFile()
{
ServiceBus = new BuiltinHandlerActivator();
Configure.With(ServiceBus)
.Logging(l => l.ColoredConsole(LogLevel.Info))
.Routing(r => r.TypeBased().Map<ProcessLog>("stepmanager"))
.Transport(t => t.UseMsmq("receivefile"))
.Start();
}
public override async Task Run()
{
...
LogEntry.Message = "File " + FileEvent.Name + " received.";
await ServiceBus.Bus.Advanced.Routing.Send("stepmanager", LogEntry);
Completed = true;
}
}
public class LogFileMetrics : Step
{
public LogFileMetrics()
{
SubscriptionTable = "SandboxServiceBusSubscriptions";
ServiceBus = new BuiltinHandlerActivator();
Configure.With(ServiceBus)
.Logging(l => l.ColoredConsole(LogLevel.Info))
.Routing(r => r.TypeBased().Map<LogFileMetricsRequest>("metrics"))
.Transport(t => t.UseMsmq("logfilemetrics"))
.Start();
ServiceBus.Handle<FileMetricsLogged>(async msg=> await FileMetricsLogged(msg));;
}
public override async Task Run()
{
...
await ServiceBus.Bus.Send(new LogFileMetricsRequest { ProcessId = ProcessId, FileEvent = FileEvent }).ConfigureAwait(false);
while (!Completed) { await Task.Delay(25); }
}
private async Task FileMetricsLogged(FileMetricsLogged msg)
{
...
await ServiceBus.Bus.Advanced.Routing.Send("stepmanager", LogEntry);
Completed = true;
}
}
public class ArchiveIncomingFile : Step
{
public ArchiveIncomingFile()
{
SubscriptionTable = "SandboxServiceBusSubscriptions";
ServiceBus = new BuiltinHandlerActivator();
Configure.With(ServiceBus)
.Logging(l => l.ColoredConsole(LogLevel.Info))
.Routing(r => r.TypeBased().Map<ArchiveIncomingFileRequest>("incomingarchivefilerouter"))
.Transport(t => t.UseMsmq("archiveincomingfile"))
.Start();
ServiceBus.Handle<IncomingFileArchived>(async msg => await IncomingFileArchived(msg));
}
public override async Task Run()
{
...
ServiceBus.Bus.Send(req);
while (!Completed) { await Task.Delay(25); }
}
private async Task IncomingFileArchived(IncomingFileArchived msg)
{
...
await ServiceBus.Bus.Advanced.Routing.Send("stepmanager", LogEntry);
Completed = true;
}
}