私は、以下のコード例と人々の考え方について少し不思議です。 アイデアはNetworkStream(〜20 msg/s)から読み込み、メインではなくMainframeProcessorに物事を渡して、バインドを処理して処理します。Rx経由でMailboxProcessorの結果を返すのは良い考えですか?
通常はPostAndReplyを使用していますが、C#ではListViewやその他のコントロールにバインドしたいと考えています。とにかくLastNのアイテムとフィルタリングで魔法をしなければならない。 さらに、Rxにはいくつかのエラー処理があります。
以下の例では、数字が2.1で、「hello X」が返されます。 に8のようにそれはEOFのように停止します。それ以外のスレッドは他のスレッドが終了する前に終了するためToEnumerableにしますが、Subscribeでも動作します。
- 再帰で周りの件名(OBJ)を渡す:私を悩ます何
。私は3〜4人ほどの問題は見られません。良いアイデア?
- 件名の有効期間。
open System
open System.Threading
open System.Reactive.Subjects
open System.Reactive.Linq // NuGet, take System.Reactive.Core also.
open System.Reactive.Concurrency
type SerializedLogger() =
let _letters = new Subject<string>()
// create the mailbox processor
let agent = MailboxProcessor.Start(fun inbox ->
// the message processing function
let rec messageLoop (letters:Subject<string>) = async{
// read a message
let! msg = inbox.Receive()
printfn "mailbox: %d in Thread: %d" msg Thread.CurrentThread.ManagedThreadId
do! Async.Sleep 100
// write it to the log
match msg with
| 8 -> letters.OnCompleted() // like EOF.
| x -> letters.OnNext(sprintf "hello %d" x)
// loop to top
return! messageLoop letters
}
// start the loop
messageLoop _letters
)
// public interface
member this.Log msg = agent.Post msg
member this.Getletters() = _letters.AsObservable()
/// Print line with prefix 1.
let myPrint1 x = printfn "onNext - %s, Thread: %d" x Thread.CurrentThread.ManagedThreadId
// Actions
let onNext = new Action<string>(myPrint1)
let onCompleted = new Action(fun _ -> printfn "Complete")
[<EntryPoint>]
let main argv =
async{
printfn "Main is on: %d" Thread.CurrentThread.ManagedThreadId
// test
let logger = SerializedLogger()
logger.Log 1 // ignored?
let xObs = logger
.Getletters() //.Where(fun x -> x <> "hello 5")
.SubscribeOn(Scheduler.CurrentThread)
.ObserveOn(Scheduler.CurrentThread)
.ToEnumerable() // this
//.Subscribe(onNext, onCompleted) // or with Dispose()
[2..10] |> Seq.iter (logger.Log)
xObs |> Seq.iter myPrint1
while true
do
printfn "waiting"
System.Threading.Thread.Sleep(1000)
return 0
} |> Async.RunSynchronously // return an integer exit code