いくつかのシナリオの私は、すべてのメッセージの処理を完了するために、REBUSを待つために、次のアプローチを使用するために願っています。リバースエンドポイントは別々のexeでホストされ、rebusファイルシステムトランスポートは統合テストに使用されます(通常Azure SBです)。統合テストはexeファイルをスピンアップし、各exeファイルにRebusは0人の従業員で構成されているので、何もしません。次に、テストでは、処理するメッセージがなくなるまで、ワーカーとブロックの数を設定するWaitForMessagesProcessed()メソッドがあります。ここで
は、それが大体のコードでどのように見えるかです:これは完璧な解決策になるため
public class MessageProcessor() {
private string queueName;
private int messagesWaitingForProcessing;
private IBus bus;
public MessageProcessor(string queueName) {
this.queueName = queueName;
this.bus = Configure.With(adapter)
.Transport(t => t.UseFileSystem(@"c:\baseDirectory", this.queueName))
.Options(o =>
{
o.SetNumberOfWorkers(0);
})
.Events(e =>
{
e.BeforeMessageSent += (thebus, headers, message, context) =>
{
// When sending to itself, the message is not queued on the network.
var m = context.Load<Rebus.Pipeline.Send.DestinationAddresses>();
if (m.Any(t => t == this.queueName))
this.messagesWaitingForProcessing++;
};
e.AfterMessageHandled += (thebus, headers, message, context, args) =>
{
this.messagesWaitingForProcessing--;
};
})
.Start();
}
public async Task WaitForMessagesProcessed()
{
this.DetermineMessagesWaitingForProcessing();
while (this.messagesWaitingForProcessing > 0)
{
this.bus.Advanced.Workers.SetNumberOfWorkers(2);
while (this.messagesWaitingForProcessing > 0)
{
await Task.Delay(100);
}
this.bus.Advanced.Workers.SetNumberOfWorkers(0);
this.DetermineMessagesWaitingForProcessing();
}
}
public void DetermineMessagesWaitingForProcessing() {
this.messagesWaitingForProcessing = Directory.GetFiles(GetDirectoryForQueueNamed(this.queueName), "*.rebusmessage.json").Count();
}
private static string GetDirectoryForQueueNamed(string queueName)
{
return Path.Combine(this.baseDiretory, queueName);
}
}
テストでは、拡張メソッドを作成
[TestMethod]
public void Test() {
var endpoint1 = LaunchExe("1");
var endpoint2 = LaunchExe("2");
endPoint1.DoSomeAction();
endPoint1.WaitForMessagesProcessed();
Assert.AreEqual("expectation", endPoint1.Query());
}
のようである可能性があります。どうもありがとう! –