Observableを作成して、さまざまなソース(他のObservables)から情報を収集する必要があります。各ソースはイベント値に影響しますが、値は前の値(状態マシンの種類)。初期化値と、フィードバックを伴うObservablesのマージ
class Message{
Integer value;
String operation;
public Message(Integer value, String operation) {
this.value = value;
this.operation = operation;
}
}
そして、このような値のいくつかのソース:
は、我々はint型の値とオペレーションコードとメッセージを持っている
Observable<Message> dynamicMessage = Observable.just(new Message(1, "+"));
は、今イベントのソースがあります。これらのソースは、dynamicMessageのprevios値に基づいて、新しいdynamicMessage値が表示される値を出力します。実際には、イベントの種類は次のとおりです。
class Changer1 {
String operation;
public Changer1(String operation) {
this.operation = operation;
}
}
class Changer2 {
Integer value;
public Changer2(Integer value) {
this.value = value;
}
}
変更操作のためのチェンジャー1です。チェンジャー2は変化の価値を応答する。これらの値の
と情報源:
static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
.delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
.delay(2, TimeUnit.SECONDS));
static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS));
は今、私はdynamicMessage観察可能な変化とチェンジャーから尊敬メッセージに取る必要があります。ここで図は次のとおりです。
また、私は、私は解決策を参照してくださいどのように、プログラムを書いしようとするが、それはrigth最初の値の後に、OutOfMemoryException例外でハング:
メッセージ{値= 1、操作=「+」}
リスト:
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
public class RxDilemma {
static class Message{
Integer value;
String operation;
public Message(Integer value, String operation) {
this.value = value;
this.operation = operation;
}
@Override
public String toString() {
return "Message{" +
"value=" + value +
", operation='" + operation + '\'' +
'}';
}
}
static class Changer1 {
String operation;
public Changer1(String operation) {
this.operation = operation;
}
@Override
public String toString() {
return "Changer1{" +
"operation='" + operation + '\'' +
'}';
}
}
static class Changer2 {
Integer value;
public Changer2(Integer value) {
this.value = value;
}
@Override
public String toString() {
return "Changer2{" +
"value=" + value +
'}';
}
}
static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
.delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
.delay(2, TimeUnit.SECONDS));
static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS));
static Observable<Message> dynamicMessage = Observable.just(new Message(1, "+")).mergeWith(changers1.flatMap(new Func1<Changer1, Observable<Message>>() {
@Override
public Observable<Message> call(final Changer1 changer) {
return dynamicMessage.last().map(new Func1<Message, Message>() {
@Override
public Message call(Message message) {
message.operation = changer.operation;
return message;
}
});
}
})).mergeWith(changers2.flatMap(new Func1<Changer2, Observable<Message>>() {
@Override
public Observable<Message> call(final Changer2 changer2) {
return dynamicMessage.last().map(new Func1<Message, Message>() {
@Override
public Message call(Message message) {
if("+".equalsIgnoreCase(message.operation)){
message.value = message.value+changer2.value;
} else if("-".equalsIgnoreCase(message.operation)){
message.value = message.value-changer2.value;
}
return message;
}
});
}
}));
public static void main(String[] args) throws InterruptedException {
dynamicMessage.subscribe(new Action1<Message>() {
@Override
public void call(Message message) {
System.out.println(message);
}
});
Thread.sleep(5000000);
}
}
だから私の予想outpurが何であるか:
メッセージ{値= 1、操作= '+'}
メッセージを{値= 1、操作= ' - '}
メッセージ{値= -1、操作= ' - '}
メッセージを{値= 1、操作= '+'}
メッセージ{値= 2、操作= '+'}また
IがCombineLatestですべてをマージしようとしているのですが、その後、CombineLatestはどの要素が変更されたのかわかりません。この情報がなければ、メッセージの変更を行う必要はありません。 助けてください?言い換えれば
この関数は正しい最終値を返しますが、中間値はありません。 'subscribe'は全てのチェンジャーが作業を終了したときだけ起動します。 – msangel
'valueChangers = Observable.interval(1、TimeUnit.SECONDS).map(aLong - >新しいValueChanger(1))'このハングを永遠に引き起こします。 – msangel
申し訳ありませんが、私はしばらくのうちに還元を使用していません。しかし、同じパラメータで 'scan'を' reduce'するだけで、期待どおりに動作します。すぐに私の答えを編集します – koperko