2016-08-18 8 views
3

私は、以下のコード例と人々の考え方について少し不思議です。 アイデアはNetworkStream(〜20 msg/s)から読み込み、メインではなくMainframeProcessorに物事を渡して、バインドを処理して処理します。Rx経由でMailboxProcessorの結果を返すのは良い考えですか?

通常はPostAndReplyを使用していますが、C#ではListViewやその他のコントロールにバインドしたいと考えています。とにかくLastNのアイテムとフィルタリングで魔法をしなければならない。 さらに、Rxにはいくつかのエラー処理があります。

以下の例では、数字が2.1で、「hello X」が返されます。 に8のようにそれはEOFのように停止します。それ以外のスレッドは他のスレッドが終了する前に終了するためToEnumerableにしますが、Subscribeでも動作します。

  1. 再帰で周りの件名(OBJ)を渡す:私を悩ます何

    。私は3〜4人ほどの問題は見られません。良いアイデア?

  2. 件名の有効期間。

open System 
open System.Threading 
open System.Reactive.Subjects 
open System.Reactive.Linq // NuGet, take System.Reactive.Core also. 
open System.Reactive.Concurrency 

type SerializedLogger() = 

    let _letters = new Subject<string>() 
    // create the mailbox processor 
    let agent = MailboxProcessor.Start(fun inbox -> 

     // the message processing function 
     let rec messageLoop (letters:Subject<string>) = async{ 

      // read a message 
      let! msg = inbox.Receive() 

      printfn "mailbox: %d in Thread: %d" msg Thread.CurrentThread.ManagedThreadId 
      do! Async.Sleep 100 
      // write it to the log  
      match msg with 
      | 8 -> letters.OnCompleted() // like EOF. 
      | x -> letters.OnNext(sprintf "hello %d" x) 

      // loop to top 
      return! messageLoop letters 
      } 

     // start the loop 
     messageLoop _letters 
     ) 

    // public interface 
    member this.Log msg = agent.Post msg 
    member this.Getletters() = _letters.AsObservable() 

/// Print line with prefix 1. 
let myPrint1 x = printfn "onNext - %s, Thread: %d" x Thread.CurrentThread.ManagedThreadId 

// Actions 
let onNext = new Action<string>(myPrint1) 
let onCompleted = new Action(fun _ -> printfn "Complete") 

[<EntryPoint>] 
let main argv = 
    async{ 
    printfn "Main is on: %d" Thread.CurrentThread.ManagedThreadId 

    // test 
    let logger = SerializedLogger() 
    logger.Log 1 // ignored? 

    let xObs = logger 
       .Getletters() //.Where(fun x -> x <> "hello 5") 
       .SubscribeOn(Scheduler.CurrentThread) 
       .ObserveOn(Scheduler.CurrentThread) 
       .ToEnumerable() // this 
       //.Subscribe(onNext, onCompleted) // or with Dispose() 

    [2..10] |> Seq.iter (logger.Log) 

    xObs |> Seq.iter myPrint1 

    while true 
     do 
     printfn "waiting" 
     System.Threading.Thread.Sleep(1000) 

    return 0 
    } |> Async.RunSynchronously // return an integer exit code 

答えて

3

私は同様のことを行って、ではなくSubjectより平易F#Eventタイプを使用しています。基本的にはIObservableを作成して購読をトリガーすることができます。これはもっと複雑なものを使用するのと同じです。Subjectイベントベースのバージョンは次のようになります。

type SerializedLogger() = 
    let letterProduced = new Event<string>() 
    let lettersEnded = new Event<unit>() 
    let agent = MailboxProcessor.Start(fun inbox -> 
    let rec messageLoop (letters:Subject<string>) = async { 
     // Some code omitted 
     match msg with 
     | 8 -> lettersEnded.Trigger() 
     | x -> letterProduced.Trigger(sprintf "hello %d" x) 
     // ... 

member this.Log msg = agent.Post msg 
member this.LetterProduced = letterProduced.Publish 
member this.LettersEnded = lettersEnded.Publish 

重要な相違点は以下のとおりです。

  • EventOnCompletedをトリガすることはできませんので、私が代わりに二つの別々のイベントを露呈しました。これは非常に残念です! Subjectが他のすべての側面のイベントと非常によく似ていることを考えると、これはプレーンなイベントの代わりにサブジェクトを使用する正当な理由です。

  • Eventを使用すると、それは標準のF#タイプであるため、エージェントに外部依存関係は必要ありません。

  • Logへの最初の呼び出しが無視されたことに気づいたことに感謝しました。これは、この呼び出しが発生した後にのみイベントハンドラを購読するためです。私はあなたがここでReplaySubject variation on the Subject ideaを使うことができると思う - それはあなたがそれを購読するときにすべての出来事をリプレイするので、以前に起こったものは失われないでしょう(しかし、キャッシュするコストがあります)。要約すると

、私はSubjectを使用すると、おそらく良いアイデアだと思う - それは、基本的に(私はエージェントから通知をさらすのはかなり標準的な方法であると考えている)Eventを使用するのと同じパターンであるが、それはあなたがOnCompletedをトリガーすることができます。キャッシングコストのために私はたぶんReplaySubjectを使用しないでしょう - イベントをトリガーする前に必ず購読する必要があります。

関連する問題