2017-02-17 7 views
0

PythonでReactiveXを使用すると、Observablesのストリームをどのように要約できますか?reactx PythonでObservablesを要約

{"user": "..."、 "date":...}の辞書があります。私は、各ユーザの最新の日付で辞書を累積し、ストリームの終わりがヒットしたときに累積された観測値を出力する関数を作成したい(maxのようだが、ユーザフィールドを見る必要があり、値)。

例 - 入力ストリーム:

{ "user": "a", "date": "2017-02-14" } 
{ "user": "b", "date": "2016-01-01" } 
{ "user": "c", "date": "2015-01-01" } 
{ "user": "a", "date": "2017-01-01" } 
{ "user": "b", "date": "2017-01-01" } 

の予想される出力(順番は関係ありません)

{ "user": "a", "date": "2017-02-14" } 
{ "user": "c", "date": "2015-01-01" } 
{ "user": "b", "date": "2017-01-01" } 

私は " "観測を組み合わせる" "フィルタリング観測"、 "形質転換観測" を、読み、 Decision Tree of Observable Operators "をhttps://ninmesara.github.io/RxPY/api/operators/index.htmlに変更し、reduce/aggregate(最後に単一の値を出す)とflat_map(ストリームの終わりを検出する方法を知らない)を調べました。 many_selectとwindow(特に)は有望ですが、私はそれらを理解するのに苦労しています。

どのように私は(どちらかの既存事業者のいずれかを使用して、または[私はまだやるのか分からない]カスタム演算子を作ることによって?)

答えて

0

が、私は以下のかもしれないと思うRXでこれを行うことができますあなたが望むことをやりなさい。

import rx 

rx.Observable.from_([ 
    { "user": "a", "date": "2017-02-14" }, 
    { "user": "b", "date": "2016-01-01" }, 
    { "user": "c", "date": "2015-01-01" }, 
    { "user": "a", "date": "2017-01-01" }, 
    { "user": "b", "date": "2017-01-01" }]) \ 
     .group_by(lambda x: x['user']) \ 
     .flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \ 
     .subscribe(print) 
+0

すごいが得られます。これは料理本の品質です。 –

0

ハンスの答えは近いですが、ちょっとした調整が必要です。

私のオブザーバーは{ 'user': ..., 'date': }辞書得ることを期待:

import rx 

def pp1(x): 
    print type(x), x 

rx.Observable.from_([ 
    { "user": "a", "date": "2017-02-14" }, 
    { "user": "b", "date": "2016-01-01" }, 
    { "user": "c", "date": "2015-01-01" }, 
    { "user": "a", "date": "2017-01-01" }, 
    { "user": "b", "date": "2017-01-01" }]) \ 
     .map(lambda x: x[0]) \ 
     .subscribe(pp1) 

利回り要約を含む長さ1のリストを取得終わりにオブザーバーで.group_byと.flat_map結果を行う

<type 'dict'> {'date': '2017-02-14', 'user': 'a'} 
<type 'dict'> {'date': '2016-01-01', 'user': 'b'} 
<type 'dict'> {'date': '2015-01-01', 'user': 'c'} 
<type 'dict'> {'date': '2017-01-01', 'user': 'a'} 
<type 'dict'> {'date': '2017-01-01', 'user': 'b'} 

を、単に要約の代わりに。

import rx 

def pp1(x): 
    print type(x), x 

rx.Observable.from_([ 
    { "user": "a", "date": "2017-02-14" }, 
    { "user": "b", "date": "2016-01-01" }, 
    { "user": "c", "date": "2015-01-01" }, 
    { "user": "a", "date": "2017-01-01" }, 
    { "user": "b", "date": "2017-01-01" }]) \ 
     .group_by(lambda x: x['user']) \ 
     .flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \ 
     .subscribe(pp1) 

利回りマップを追加するために必要な

<type 'list'> [{'date': '2017-02-14', 'user': 'a'}] 
<type 'list'> [{'date': '2017-01-01', 'user': 'b'}] 
<type 'list'> [{'date': '2015-01-01', 'user': 'c'}] 

import rx 

def pp1(x): 
    print type(x), x 

rx.Observable.from_([ 
    { "user": "a", "date": "2017-02-14" }, 
    { "user": "b", "date": "2016-01-01" }, 
    { "user": "c", "date": "2015-01-01" }, 
    { "user": "a", "date": "2017-01-01" }, 
    { "user": "b", "date": "2017-01-01" }]) \ 
     .group_by(lambda x: x['user']) \ 
     .flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \ 
     .map(lambda x: x[0]) \ 
     .subscribe(pp1) 

期待

<type 'dict'> {'date': '2017-02-14', 'user': 'a'} 
<type 'dict'> {'date': '2017-01-01', 'user': 'b'} 
<type 'dict'> {'date': '2015-01-01', 'user': 'c'}