2017-04-20 6 views
1

キューからメッセージを無期限に読み込み、RxとTPL DataFlowを使用してメッセージを破棄するシステムをセットアップしました。DataFlowとRXを使用した連続データストリームは処理を停止します

何らかの理由で、数百のメッセージの後、ActionBlockの実行が停止し、理由を特定できません。 this.GetMessages()は引き続き発砲しますが、this.ProcessMessagesは発動しません。

var source = Observable 
    .Timer(TimeSpan.FromMilliseconds(1), TimeSpan.FromMilliseconds(1)) 
    .SelectMany(x => this.GetMessages()); 

var actionBlock = new ActionBlock<List<QueueStream>>(
    this.ProcessMessages, 
    new ExecutionDataflowBlockOptions 
    { 
     MaxDegreeOfParallelism = Environment.ProcessorCount * 2, 
    }); 

using (source.Subscribe(actionBlock.AsObserver())) 
{ 
    while (this.Run) 
    { 
     await Task.Delay(TimeSpan.FromSeconds(1)); 
    } 
} 

actionBlock.Complete(); 
await actionBlock.Completion; 

読者 - これは実際に作家

private async Task<List<QueueStream>> GetMessages() 
{ 
    var messageList = new List<QueueStream>(); 
    var taskList = new List<Task>(); 

    // Add up to N items in our queue 
    for (var i = 0; i < 25; i++) 
    { 
     var task = this 
      .ReadAndParseQueue() 
      .ContinueWith(async queueStreamTask => 
       { 
        var queueStream = await queueStreamTask; 
        if (queueStream != null) 
        { 
         messageList.Add(queueStream); 
        } 
       }); 

     taskList.Add(task); 
    } 

    await Task.WhenAll(taskList); 

    return messageList; 
} 

を実行し続けていることに注意してください - 数百のメッセージの後に、これは

private async Task ProcessMessages(List<QueueStream> streams) 
{ 
    var tasks = new List<Task>(); 
    foreach (var queueStream in streams) 
    { 
     tasks.Add(this.ProcessMessage(queueStream)); 
    } 

    await Task.WhenAll(tasks); 
} 
+0

本当に[mcve]を入力する必要があります。この問題を再現するコードを貼り付けることができます。 – Enigmativity

+0

@Chris 'ActionBlock'の状態は一度_hangs_になると、それはフォールトされている可能性があります。 'ProcessMessages'の中からスローされた例外は、' this.run'が常にセットされているとあなたのケースでは決して起こらない 'await'完了まで観察されません。 – JSteward

答えて

1

を打た停止するよろしいですあなたのsourceはこのケースで稼動し続けますか?実際に新しいmesasgesを受け入れるようにactionBlockを防ぐ

actionBlock.Complete(); 
await actionBlock.Completion; 

ので、ProcessMessages:エラーが発生した場合、またはthis.Runは、それが停止し、設定解除され、その後、あなたがこれらの行を持っている場合は、そこにあなたのコード内の無限ループがあるが、メッセージは単に無視されるため、決して呼び出されません。

関連する問題