1
キューからメッセージを無期限に読み込み、RxとTPL DataFlow
を使用してメッセージを破棄するシステムをセットアップしました。DataFlowとRXを使用した連続データストリームは処理を停止します
何らかの理由で、数百のメッセージの後、ActionBlockの実行が停止し、理由を特定できません。 this.GetMessages()
は引き続き発砲しますが、this.ProcessMessages
は発動しません。
var source = Observable
.Timer(TimeSpan.FromMilliseconds(1), TimeSpan.FromMilliseconds(1))
.SelectMany(x => this.GetMessages());
var actionBlock = new ActionBlock<List<QueueStream>>(
this.ProcessMessages,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount * 2,
});
using (source.Subscribe(actionBlock.AsObserver()))
{
while (this.Run)
{
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
actionBlock.Complete();
await actionBlock.Completion;
読者 - これは実際に作家
private async Task<List<QueueStream>> GetMessages()
{
var messageList = new List<QueueStream>();
var taskList = new List<Task>();
// Add up to N items in our queue
for (var i = 0; i < 25; i++)
{
var task = this
.ReadAndParseQueue()
.ContinueWith(async queueStreamTask =>
{
var queueStream = await queueStreamTask;
if (queueStream != null)
{
messageList.Add(queueStream);
}
});
taskList.Add(task);
}
await Task.WhenAll(taskList);
return messageList;
}
を実行し続けていることに注意してください - 数百のメッセージの後に、これは
private async Task ProcessMessages(List<QueueStream> streams)
{
var tasks = new List<Task>();
foreach (var queueStream in streams)
{
tasks.Add(this.ProcessMessage(queueStream));
}
await Task.WhenAll(tasks);
}
本当に[mcve]を入力する必要があります。この問題を再現するコードを貼り付けることができます。 – Enigmativity
@Chris 'ActionBlock'の状態は一度_hangs_になると、それはフォールトされている可能性があります。 'ProcessMessages'の中からスローされた例外は、' this.run'が常にセットされているとあなたのケースでは決して起こらない 'await'完了まで観察されません。 – JSteward