2017-12-26 8 views
2

オープンソースのライブラリDataflowExを次のDataflow宣言で使用しようとしています。 AAAA 18 | 54.3773:32:32:私はDataflowExが完了しない

var requestClientFlow = new RequestClientFlow(this); 
requestClientFlow.Post(""); 
requestClientFlow.Complete(); 
await requestClientFlow.InputBlock.Completion; 

が完了すると、私の出力が表示さ

18のようにそれを消費

class RequestClientFlow :Dataflow<string>{ 
    private readonly ILogger _logger; 
    private readonly Dataflow<string, WebProxy> _webproxyDataflow; 
    private readonly Dataflow<WebProxy, HttpClient> _httpClientDataflow; 

    public RequestClientFlow(ILogger logger) : this(DataflowOptions.Default){ 
     _logger = logger; 
    } 

    public Dataflow<WebProxy, HttpClient> HttpClientDataflow => _httpClientDataflow; 

    public RequestClientFlow(DataflowOptions dataflowOptions) : base(dataflowOptions){ 
     _webproxyDataflow = new TransformBlock<string,WebProxy>(s => { 
      _logger.WriteLine("aaaa"); 
      return new WebProxy(); 
     }).ToDataflow(); 
     _httpClientDataflow = new TransformBlock<WebProxy,HttpClient>(proxy => { 
      _logger.WriteLine("bbbb"); 
      return new HttpClient(); 
     }).ToDataflow(); 
     _webproxyDataflow.LinkTo(_httpClientDataflow); 
     RegisterChild(_webproxyDataflow); 
     RegisterChild(_httpClientDataflow); 
    } 

    public override ITargetBlock<string> InputBlock => _webproxyDataflow.InputBlock; 
} 

54.3773 | BBBB

1合格、失敗した0、スキップした、1.45秒(xUnit.net 2.3.1 ビルド3858)。しかし、理解して私の

は、私もそれが完了しない

requestClientFlow.Complete(); 
    await requestClientFlow.CompletionTask; 

あるいは

await requestClientFlow.SignalAndWaitForCompletionAsync(); 

を使用することができるはずなフレームワークのドキュメントからです。誰かが私が間違っていることを理解するのを助けてくれますか?

答えて

1

最後のブロックがTransformBlockであるため、フローを完了できません。あなたの最初の例ではawait入力完了実際に完了するブロック。 出力ブロックは、出力バッファ内の項目がどこにもないため、完了できません。 DataflowExライブラリは、フローの最後のブロックで正しくawaitingです。最後にActionBlockまたはNullTargetを追加して完成を実現できます。

DataflowExの面では、最終的な流れは、ライブラリのショーのためのgithubページ

public interface IDataflow<in TIn> : IDataflow 
{ 
    ITargetBlock<TIn> InputBlock { get; } 
} 

そして、一例としてのimplmenetingする必要があります。答えを

public class AggregatorFlow : Dataflow<string> 
{ 
    //...// 

    public AggregatorFlow() : base(DataflowOptions.Default) 
    { 
     _splitter = new TransformBlock<string, KeyValuePair<string, int>>(s => this.Split(s)); 
     _dict = new Dictionary<string, int>(); 

     //***Note The ActionBlock here*** 
     _aggregater = new ActionBlock<KeyValuePair<string, int>>(p => this.Aggregate(p)); 

     //Block linking 
     _splitter.LinkTo(_aggregater, new DataflowLinkOptions() { PropagateCompletion = true }); 

     /* IMPORTANT */ 
     RegisterChild(_splitter); 
     RegisterChild(_aggregater); 
    } 

    //...// 
} 
+0

のHMM thnksまだ入手困難それ。 ActionBlockを使用すると、変換されたオブジェクトを別の再利用可能なコンポーネントまたはデータフローの権利に公開することはできません。 DataflowBlock.NullTargetオプションをデモできますか? –

+2

'ActionBlock'と' NullTarget'のどちらも、生成された 'HttpClient'を別のフローに流すことはできません。それがあなたの要件なら、あなたの流れは 'Dataflow 'でなければなりません。あなたのテストでは、 'DataflowEx'から' LinkLeftToNull() 'を使ってそのフローをリンクすることができます。 – JSteward

+0

この例では、基になるブロックで 'ToDataflow()'を呼び出さず、内部ブロック間で 'DataflowLinkOptions 'を使って補完を明示的に伝播することにも注意してください([#5高度なリンク] (){PropagateCompletion = true} ' – JSteward

関連する問題