2017-08-13 8 views
1

私はRxCPPを使用しており、その動作を理解するのが難しいです。RxCPPはRx.Netと異なる動作をします

ここにはRx.NetとRxCPPの2つのプログラムがあります。 同じ印刷物を出力すると思われますが、同じ印刷物は出力しません。
プログラムはマウスストリームからポイントを取得し、ポイント間のデルタのストリームを計算します。
マウスはポイントのストリームのストリームであり、すべてのストローク - から上から上へ押すことは1ストリームです。マウスはそのようなストリームを次々と提供します。これらの試験で予想される出力は


デルタない0ではありません:0,0
デルタ何1ではありません:5,0
デルタNO 2である:0,5
デルタなし3:2,3
これはRx.Netが出力するものです。
Rx.Cppは最初の行だけを出力します。デルタ0は:0,0

何か考えてください。

Rx.Cpp例:

#include <rx.hpp> 
    namespace rx = rxcpp; 
    namespace rxsub = rxcpp::subjects; 
    using rxob = rx::observable<>; 

    struct Point 
    { 
     Point(int x, int y) : x(x), y(y) {} 

     int x = 0, y = 0; 
     Point operator-() const { return {-x, -y}; } 
     Point operator+(const Point& other) const { return Point{x + other.x, y + other.y}; } 
     Point operator-(const Point& other) const { return operator+(-other); } 
    }; 

    std::ostream& operator<<(std::ostream& o, const Point& p) 
    { 
     return o << "(" << p.x << ", " << p.y << ")"; 
    } 

    void TestRxCPP() 
    { 
     using RxPoint = rx::observable<Point>; 
     using Strokes = rx::observable<RxPoint>; 
     using StrokesSubject = rxsub::subject<RxPoint>; 

     StrokesSubject mouseSource; 
     auto strokes = mouseSource.get_observable(); 

     auto deltaVectors = [](Strokes strokes) { 
     auto deltas = strokes.flat_map([=](RxPoint stroke) { 
      auto points = stroke; 
      // create stream of delta vectors from start point 
      auto firstPoint = points.take(1); 
      auto delta = 
       points.combine_latest([](Point v0, Point v1) { return v0 - v1; }, firstPoint); 
      return delta; 
     }); 

     return deltas; 
     }; 

     auto delta = deltaVectors(strokes); 
     int n = 0; 
     delta.subscribe(
     [&](const Point& d) { std::cout << "Delta no. " << n++ << " is: " << d << std::endl; }); 

     auto testMouse = rxob::from(Point{3 + 0, 4 + 0}, Point{3 + 5, 4 + 0}, Point{3 + 0, 4 + 5}, Point{3 + 2, 4 + 3}); 
     mouseSource.get_subscriber().on_next(testMouse); 
    } 

Rx.Net例:

void RxNET() 
    { 
     var strokesS = new Subject<IObservable<Point>>(); 

     Func<IObservable<IObservable<Point>>, IObservable<Point>> 
     deltaVectors = strokes => 
     { 
      var deltas = strokes.SelectMany(stroke => 
      { 
       var points = stroke; 
       // create stream of delta vectors from start point 
       var firstPoint = points.Take(1); 
       var deltaP = 
        points.CombineLatest(firstPoint, (v0, v1) => new Point(v0.X - v1.X, v0.Y - v1.Y)); 
       return deltaP; 
      }); 

      return deltas; 
     }; 

     var delta = deltaVectors(strokesS); 
     var n = 0; 
     delta.Subscribe(d => { Console.WriteLine($"Delta no {n++} is: {d}\n"); }); 

     var testMouse = new List<Point> 
     { 
      new Point(3 + 0, 4 + 0), 
      new Point(3 + 5, 4 + 0), 
      new Point(3 + 0, 4 + 5), 
      new Point(3 + 2, 4 + 3) 
     }.ToObservable(); 
     strokesS.OnNext(testMouse); 
    } 
+0

を逆転場合

COLDとHOTソースが動作しますか? –

+0

非常に難しい(私のために)rxcppライブラリをデバッグする、その非常にハイエンドのC++ :-( – ShaulF

答えて

0

Thanks to @Kirk Shoop at the rxcpp github :-)
このHOTvCOLD挙動です。

ストロークはCOLDで共有され、1つのスレッドのみが使用されます。 points.combine_latest(..., firstPoint)は、firstPointが購読される前にすべてのポイントが送信されることを意味します。したがって、最後のデルタだけが放出される。デバッガを使用または機能性の2枚が発散場所を見つけるためにトレースステートメントを含めてみましたあなたはcombine_latest

auto delta = 
    firstPoint.combine_latest([](Point v0, Point v1) { return v1 - v0; }, points); 
関連する問題