この質問はに関するものです。rxpyは項目を観察可能にします
ソースオブザーバブルからのメッセージを処理するリアクティブシステムを構築しようとしています。それに加えて、私はそれを飼い犬飼い主に基づくリーダー選挙システムと統合しようとしています。
この組み合わせでは、プロセスファーム内のリーダーだけがメッセージストリームを処理できます。以下は私が構築しようとしているコードの要点です。
# event_source is an observable of messages
# manager.leaders is an observable of leader election events
# manager.followers is an observable of leader relinquish events
event_source\
.skip_until(manager.leaders)\
.take_until(manager.followers)\
.subscribe(observer)
それは罰金と、すべての作品が、私はskip_until
とtake_until
間を埋め戻し処理する部分を注入する必要があります。これはリーダープロセスの失敗とリーダーシップを前提とした別のプロセスとの潜在的なギャップに対処するように設計されています。処理されたすべてのメッセージはレコードを残して、新しいリーダーがストリームを続行する前に見つからなかったメッセージに追いつくことができるようにレコードを残します。
私はstart_with
演算子を試してみましたが、成功しませんでした。私はそれが使用されることを意図していない方法でそれに近づいていませんか?
最終的に、私が探している解決策は、別のストリームからのイベントによってトリガされた特定の数のアイテムをストリームに注入することです。
manager.leaders \
.flat_map(lambda e: event_source
.start_with(...)
.take_until(manager.followers))
たびmanager.leaders
がevent_source
がmanager.followers
発するまで、注入されたアイテムで始まる、に加入されるメッセージを発する:これについて何