2016-02-10 12 views
5

私はAppWarp(http://appwarp.shephertz.com)と呼ばれるマルチプレイヤーゲームクライアントを使用しています。イベントが発生したときにコールバックされるイベントリスナーを追加できます。ここでは、このインターフェイスを実装する必要があるConnection Listenerについて説明します。 :ここRxJavaを使用してリスナーをリアクティブ(Observables)に正しく変換する方法は?

public interface ConnectionRequestListener { 
    void onConnectDone(ConnectEvent var1); 
    void onDisconnectDone(ConnectEvent var1); 
    void onInitUDPDone(byte var1); 
} 

私の目標は、内部的に代わり、直接クライアント自体を使用しての(私もインターフェースに依存しているよ、後だけではなくに応じて、主に私のアプリで使用するこのクライアントの反応バージョンを作成することですこの例のようにWarpClient自体は重要ですが、最後に私の質問をお読みください)。

次のように私がやったことです:

1)私は、次のようにRxConnectionEvent(主にグループの接続関連のイベント)と命名し、新しいイベントを紹介:

public class RxConnectionEvent { 
    // This is the original connection event from the source client 
    private final ConnectEvent connectEvent; 
    // this is to identify if it was Connection/Disconnection 
    private final int eventType; 

    public RxConnectionEvent(ConnectEvent connectEvent, int eventType) { 
     this.connectEvent = connectEvent; 
     this.eventType = eventType; 
    } 

    public ConnectEvent getConnectEvent() { 
     return connectEvent; 
    } 

    public int getEventType() { 
     return eventType; 
    } 
} 

2次のように)いくつかのイベントタイプを作成:

public class RxEventType { 
    // Connection Events 
    public final static int CONNECTION_CONNECTED = 20; 
    public final static int CONNECTION_DISCONNECTED = 30; 
} 

3)私の新しいRxConnectionEvent

import com.shephertz.app42.gaming.multiplayer.client.WarpClient; 
import com.shephertz.app42.gaming.multiplayer.client.events.ConnectEvent; 
import rx.Observable; 
import rx.Subscriber; 
import rx.functions.Action0; 
import rx.subscriptions.Subscriptions; 

public class ConnectionObservable extends BaseObservable<RxConnectionEvent> { 

    private ConnectionRequestListener connectionListener; 

    // This is going to be called from my ReactiveWarpClient (Factory) Later. 
    public static Observable<RxConnectionEvent> createConnectionListener(WarpClient warpClient) { 
     return Observable.create(new ConnectionObservable(warpClient)); 
    } 

    private ConnectionObservable(WarpClient warpClient) { 
     super(warpClient); 
    } 

    @Override 
    public void call(final Subscriber<? super RxConnectionEvent> subscriber) { 
     subscriber.onStart(); 
     connectionListener = new ConnectionRequestListener() { 
      @Override 
      public void onConnectDone(ConnectEvent connectEvent) { 
       super.onConnectDone(connectEvent); 
       callback(new RxConnectionEvent(connectEvent, RxEventType.CONNECTION_CONNECTED)); 
      } 

      @Override 
      public void onDisconnectDone(ConnectEvent connectEvent) { 
       super.onDisconnectDone(connectEvent); 
       callback(new RxConnectionEvent(connectEvent, RxEventType.CONNECTION_DISCONNECTED)); 
      } 

      // not interested in this method (for now) 
      @Override 
      public void onInitUDPDone(byte var1) { } 

      private void callback(RxConnectionEvent rxConnectionEvent) 
      { 
       if (!subscriber.isUnsubscribed()) { 
        subscriber.onNext(rxConnectionEvent); 
       } else { 
        warpClient.removeConnectionRequestListener(connectionListener); 
       } 
      } 
     }; 

     warpClient.addConnectionRequestListener(connectionListener); 
     subscriber.add(Subscriptions.create(new Action0() { 
      @Override 
      public void call() { 
       onUnsubscribed(warpClient); 
      } 
     })); 
    } 

    @Override 
    protected void onUnsubscribed(WarpClient warpClient) { 
     warpClient.removeConnectionRequestListener(connectionListener); 
    } 
} 

4を放出する、次の観測可能に作成された)、そして最後に私のBaseObservableは、次のようになります。

public abstract class BaseObservable<T> implements Observable.OnSubscribe<T> { 

    protected WarpClient warpClient; 

    protected BaseObservable (WarpClient warpClient) 
    { 
     this.warpClient = warpClient; 
    } 

    @Override 
    public abstract void call(Subscriber<? super T> subscriber); 

    protected abstract void onUnsubscribed(WarpClient warpClient); 
} 

私の質問は主にある:正しい上記の私の実装ですそうでなければ、このクライアントは40-50以上のイベントを持っていますが、イベントごとに個別のオブザーバブルを作成する必要がありますか?

は私も(簡単な「非最終」統合テストでそれを使用)以下のように上記のコードを使用します。

public void testConnectDisconnect() { 
    connectionSubscription = reactiveWarpClient.createOnConnectObservable(client) 
      .subscribe(new Action1<RxConnectionEvent>() { 
       @Override 
       public void call(RxConnectionEvent rxEvent) { 
        assertEquals(WarpResponseResultCode.SUCCESS, rxEvent.getConnectEvent().getResult()); 
        if (rxEvent.getEventType() == RxEventType.CONNECTION_CONNECTED) { 
         connectionStatus = connectionStatus | 0b0001; 
         client.disconnect(); 
        } else { 
         connectionStatus = connectionStatus | 0b0010; 
         connectionSubscription.unsubscribe(); 
         haltExecution = true; 
        } 
       } 
      }, new Action1<Throwable>() { 
       @Override 
       public void call(Throwable throwable) { 
        fail("Unexpected error: " + throwable.getMessage()); 
        haltExecution = true; 
       } 
      }); 

    client.connectWithUserName("test user"); 
    waitForSomeTime(); 
    assertEquals(0b0011, connectionStatus); 
    assertEquals(true, connectionSubscription.isUnsubscribed()); 
} 

答えて

2

は、私はあなたはそれが非常にエラーが発生しやすいですので、直接BaseObservableを延長しないようお勧めします。代わりに、Rx自身があなたにオブザーバブルを作成するためのツールを使用してみてください。

最も簡単な解決策は、ObservableとSubscriberの両方であるPublishSubjectを使用しています。リスナーは単に被験者のonNextを呼び出す必要があり、被験者はイベントを発する。ここでは単純化された作業例です:

public class PublishSubjectWarpperDemo { 

    public interface ConnectionRequestListener { 
     void onConnectDone(); 

     void onDisconnectDone(); 

     void onInitUDPDone(); 
    } 

    public static class RxConnectionEvent { 
     private int type; 

     public RxConnectionEvent(int type) { 
      this.type = type; 
     } 

     public int getType() { 
      return type; 
     } 

     public String toString() { 
      return "Event of Type " + type; 
     } 
    } 

    public static class SimpleCallbackWrapper { 
     private final PublishSubject<RxConnectionEvent> subject = PublishSubject.create(); 

     public ConnectionRequestListener getListener() { 
      return new ConnectionRequestListener() { 

       @Override 
       public void onConnectDone() { 
        subject.onNext(new RxConnectionEvent(1)); 
       } 

       @Override 
       public void onDisconnectDone() { 
        subject.onNext(new RxConnectionEvent(2)); 
       } 

       @Override 
       public void onInitUDPDone() { 
        subject.onNext(new RxConnectionEvent(3)); 
       } 
      }; 
     } 

     public Observable<RxConnectionEvent> getObservable() { 
      return subject; 
     } 

    } 

    public static void main(String[] args) throws IOException { 
     SimpleCallbackWrapper myWrapper = new SimpleCallbackWrapper(); 
     ConnectionRequestListener listner = myWrapper.getListener();// Get the listener and attach it to the game here. 
     myWrapper.getObservable().observeOn(Schedulers.newThread()).subscribe(event -> System.out.println(event)); 

     listner.onConnectDone(); // Call the listener a few times, the observable should print the event 
     listner.onDisconnectDone(); 
     listner.onInitUDPDone(); 

     System.in.read(); // Wait for enter 
    } 
} 

より複雑な解決策はObservable.create()を使用して、観察を作成するために、onSubscribe実装の1つを使用することです。たとえば、AsyncOnSubscibeとなります。このソリューションは、適切なバックペースを処理できるというメリットがあるため、イベントのサブスクライバがイベントに圧倒されることはありません。しかし、あなたのケースでは、そうは考えにくいシナリオのように聞こえるので、追加された複雑さはおそらくそれに値するものではありません。

関連する問題