2

Observableを作成して、さまざまなソース(他のObservables)から情報を収集する必要があります。各ソースはイベント値に影響しますが、値は前の値(状態マシンの種類)。初期化値と、フィードバックを伴うObservablesのマージ

class Message{ 
    Integer value; 
    String operation; 

    public Message(Integer value, String operation) { 
     this.value = value; 
     this.operation = operation; 
    } 
} 

そして、このような値のいくつかのソース:

は、我々はint型の値とオペレーションコードとメッセージを持っている

Observable<Message> dynamicMessage = Observable.just(new Message(1, "+")); 

は、今イベントのソースがあります。これらのソースは、dynamicMessageのprevios値に基づいて、新しいdynamicMessage値が表示される値を出力します。実際には、イベントの種類は次のとおりです。

class Changer1 { 
    String operation; 

    public Changer1(String operation) { 
     this.operation = operation; 
    } 
} 


class Changer2 { 
    Integer value; 

    public Changer2(Integer value) { 
     this.value = value; 
    } 
} 

変更操作のためのチェンジャー1です。チェンジャー2は変化の価値を応答する。これらの値の

と情報源:

static Observable<Changer1> changers1 = Observable.just(new Changer1("-")) 
     .delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+")) 
       .delay(2, TimeUnit.SECONDS)); 


static Observable<Changer2> changers2 = Observable.just(new Changer2(2)) 
     .delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2)) 
       .delay(2, TimeUnit.SECONDS)); 

は今、私はdynamicMessage観察可能な変化とチェンジャーから尊敬メッセージに取る必要があります。ここで図は次のとおりです。 state machine

また、私は、私は解決策を参照してくださいどのように、プログラムを書いしようとするが、それはrigth最初の値の後に、OutOfMemoryException例外でハング:
メッセージ{値= 1、操作=「+」}
リスト:

import rx.Observable; 
import rx.functions.Action1; 
import rx.functions.Func1; 

public class RxDilemma { 


static class Message{ 
    Integer value; 
    String operation; 

    public Message(Integer value, String operation) { 
     this.value = value; 
     this.operation = operation; 
    } 

    @Override 
    public String toString() { 
     return "Message{" + 
       "value=" + value + 
       ", operation='" + operation + '\'' + 
       '}'; 
    } 
} 

static class Changer1 { 
    String operation; 

    public Changer1(String operation) { 
     this.operation = operation; 
    } 

    @Override 
    public String toString() { 
     return "Changer1{" + 
       "operation='" + operation + '\'' + 
       '}'; 
    } 
} 


static class Changer2 { 
    Integer value; 

    public Changer2(Integer value) { 
     this.value = value; 
    } 

    @Override 
    public String toString() { 
     return "Changer2{" + 
       "value=" + value + 
       '}'; 
    } 
} 





static Observable<Changer1> changers1 = Observable.just(new Changer1("-")) 
     .delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+")) 
       .delay(2, TimeUnit.SECONDS)); 


static Observable<Changer2> changers2 = Observable.just(new Changer2(2)) 
     .delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2)) 
       .delay(2, TimeUnit.SECONDS)); 


static Observable<Message> dynamicMessage = Observable.just(new Message(1, "+")).mergeWith(changers1.flatMap(new Func1<Changer1, Observable<Message>>() { 
    @Override 
    public Observable<Message> call(final Changer1 changer) { 
     return dynamicMessage.last().map(new Func1<Message, Message>() { 
      @Override 
      public Message call(Message message) { 
       message.operation = changer.operation; 
       return message; 
      } 
     }); 
    } 
})).mergeWith(changers2.flatMap(new Func1<Changer2, Observable<Message>>() { 
    @Override 
    public Observable<Message> call(final Changer2 changer2) { 
     return dynamicMessage.last().map(new Func1<Message, Message>() { 
      @Override 
      public Message call(Message message) { 
       if("+".equalsIgnoreCase(message.operation)){ 
        message.value = message.value+changer2.value; 
       } else if("-".equalsIgnoreCase(message.operation)){ 
        message.value = message.value-changer2.value; 
       } 
       return message; 
      } 
     }); 
    } 
})); 

public static void main(String[] args) throws InterruptedException { 

    dynamicMessage.subscribe(new Action1<Message>() { 
     @Override 
     public void call(Message message) { 
      System.out.println(message); 
     } 
    }); 


    Thread.sleep(5000000); 
} 
} 

だから私の予想outpurが何であるか:

メッセージ{値= 1、操作= '+'}
メッセージを{値= 1、操作= ' - '}
メッセージ{値= -1、操作= ' - '}
メッセージを{値= 1、操作= '+'}
メッセージ{値= 2、操作= '+'}また

IがCombineLatestですべてをマージしようとしているのですが、その後、CombineLatestはどの要素が変更されたのかわかりません。この情報がなければ、メッセージの変更を行う必要はありません。 助けてください?言い換えれば

答えて

1

、あなたは状態の変化を反映するためにスキャンあなたの行動を減らすにしたいです。 reducescanという名前のオペレータがあります。これはあなたが求めているものです。それは来るすべてのイベントを取り、あなたの状態になるその変換の前回の結果を加えてそれを変換させます。

interface Changer { 
    State apply(State state); 
} 

class ValueChanger implements Changer { 
    ... 
    State apply(State state){ 
     // implement your logic of changing state by this action and return a new one 
    } 
    ... 
} 


class OperationChanger implements Changer { 
    ... 
    State apply(State state){ 
     // implement your logic of changing state by this action and return a new one 
    } 
    ... 
} 

Observable<ValueChanger> valueChangers 
Observable<OperationChanger> operationChangers 

Observable.merge(valueChangers, operationChangers) 
    .scan(initialState, (state, changer) -> changer.apply(state)) 
    .subscribe(state -> { 
     // called every time a change happens 
    }); 

私はあなたの命名をちょっと変えてより意味を分かりました。また、ここではJava8ラムダ式を使用します。あなたのプロジェクトでそれらを使用できない場合は、匿名のクラスのためにそれらを交換してください。

EDIT:それはかなりうまくいけば、あなたのニーズを十分にいくつかの変更を行っただけで、最終的な結果

+0

この関数は正しい最終値を返しますが、中間値はありません。 'subscribe'は全てのチェンジャーが作業を終了したときだけ起動します。 – msangel

+0

'valueChangers = Observable.interval(1、TimeUnit.SECONDS).map(aLong - >新しいValueChanger(1))'このハングを永遠に引き起こします。 – msangel

+0

申し訳ありませんが、私はしばらくのうちに還元を使用していません。しかし、同じパラメータで 'scan'を' reduce'するだけで、期待どおりに動作します。すぐに私の答えを編集します – koperko

1

発光よりも、道に沿ってすべての結果を提供して代わりにreducescan演算子を使用します。ご覧になり結果を取得してください。

static class Message { 
    Integer value; 
    String operation; 

    public Message(Integer value, String operation) { 
     this.value = value; 
     this.operation = operation; 
    } 

    @Override 
    public String toString() { 
     return "Message{" + "value=" + value + ", operation='" + operation 
       + '\'' + '}'; 
    } 

    public Message applyChange(Object changer){ 
     if(changer instanceof Changer1){ 
      this.operation = ((Changer1)changer).operation; 
     }else if(changer instanceof Changer2){ 
      int v2 = ((Changer2)changer).value; 
      if("+".equals(operation)){ 
       value += v2; 
      }else if("-".equals(operation)){ 
       value -= v2; 
      } 
     } 
     return this; 
    } 
} 

static class Changer1{ 
    String operation; 

    public Changer1(String operation) { 
     this.operation = operation; 
    } 

    @Override 
    public String toString() { 
     return "Changer1{" + "operation='" + operation + '\'' + '}'; 
    } 
} 

static class Changer2{ 
    Integer value; 

    public Changer2(Integer value) { 
     this.value = value; 
    } 

    @Override 
    public String toString() { 
     return "Changer2{" + "value=" + value + '}'; 
    } 
} 

static Observable<? extends Object> changers1 = Observable 
     .just(new Changer1("-")) 
     .delay(1, TimeUnit.SECONDS) 
     .concatWith(
       Observable.just(new Changer1("+")).delay(2, 
         TimeUnit.SECONDS)); 

static Observable<? extends Object> changers2 = Observable 
     .just(new Changer2(2)) 
     .delay(2, TimeUnit.SECONDS) 
     .concatWith(
       Observable.just(new Changer2(2)).delay(2, TimeUnit.SECONDS)); 


static Observable<Message> dynamicMessage = Observable 
     .just(new Message(1, "+")) 
     .flatMap(m -> ((Observable<Object>)changers1).mergeWith((Observable<Object>)changers2).map(c -> m.applyChange(c))); 

public static void main(String[] args) throws InterruptedException { 
    dynamicMessage.toBlocking().subscribe(v -> System.out.println(v)); 
} 

変更:

MessageChangerが放出されるたびに呼び出される新しいメソッドを持っているが、それはMessageオブジェクトの値を変更します。これは実際にはinit値である1つのMessageオブジェクトしかないために使用されます。

public Message applyChange(Object changer){ 
    if(changer instanceof Changer1){ 
     this.operation = ((Changer1)changer).operation; 
    }else if(changer instanceof Changer2){ 
     int v2 = ((Changer2)changer).value; 
     if("+".equals(operation)){ 
      value += v2; 
     }else if("-".equals(operation)){ 
      value -= v2; 
     } 
    } 
    return this; 
} 

dynamicMessage必要に応じて変更される -

static Observable<Message> dynamicMessage = Observable 
    .just(new Message(1, "+")) 
    .flatMap(m -> ((Observable<Object>)changers1) 
     .mergeWith((Observable<Object>)changers2) 
     .map(c -> m.applyChange(c))); 
+0

これを試しました。できます。私は思考の中でどのように、そしてなぜ、5分を費やさなければならなかったのですか?***。短い結論:それは動作しますが、最初のflatMap関数は1つの値しか受け取らないため、ネストされたオブザーバブル内のすべての操作は同じオブジェクトに対して物理的に実行されます。 'applyChange'関数を同期させると、私の状態が破壊されるのを防ぐことができます。 2番目の答えが働いているようですので、私はconnect noneとマークします。 (しかし、これを読んだ人は、彼らが受け入れた別の投票を答えます)。ありがとうございました。 – msangel

0

[OK]を、私はここで2つの答えを持っています。そのうちの1人は働いていますが、手作業による同時実行を制御するという設計上の問題があります。デザインによって動作しない別のもの(おそらくreduceの演算子をいくつか操作した後に、すべての演算子ではなく、各組のデータで動作させることができます)。しかし、私はよりエレガントでより正確な解決策を見つけたので、私はここで自分の答えを掲示しています。

しかし、答えの前に、質問を見てください。なぜそれがハングアップしますか?答えは:演算子last()がブロックされているためです。だから私はそれを非ブロックにする必要があります。別の回答に基づいてここに:https://stackoverflow.com/a/37015509/449553、私はBehaviorSubjectクラスを使用することができます。ここで

が私の作業コードがあります:

final BehaviorSubject<Message> subject = BehaviorSubject.create(new Message(1, "+")); 
    Observable<Message> observable = Observable.<Changer>merge(changers1, changers2).map(new Func1<Changer, Message>() { 
     @Override 
     public Message call(Changer changer) { 
      Message value = subject.getValue(); 
      Message v = value.applyChange(changer); 
      subject.onNext(v); 
      return v; 
     } 
    }); 

(あなたが他の回答herehereで見つけるコードの足りない部分) はそれでも、私は正しいと答えをマークしていないだろう、私は誰かがしたいかもしれない、と考えていますより適切な方法でこのパズルを自分自身で解決してください。

EDIT:正しい方法が見つかりました。

関連する問題