2017-12-05 8 views
-1

In this thread、アンサブスクライブ後にリスナーをクリーンアップして削除できるように、unsubscribeイベントを監視する方法について質問があります。しかし、RxJava2では、上記のスレッドはもはや動作しません。RxJava2のカスタムObservableでObservableオブザーバの通知を受け取る方法

def myObservable = Observable.create({ aEmitter -> 
    val listener = {event -> 
     aEmitter.onNext(event);     
    } 
    existingEventSource.addListener(listener) 

    // Fails since aEmitter doesn't have an add() method nor does Subscriptions exist. 
    aEmitter.add(Subscriptions.create(() -> existingEventSource.removeListener(listener))); 
}) 

これはRxJava2でどのように対処するのですか?

答えて

1

stringObservableをご覧ください。サブスクリプションを処理する方法を理解してください。

public class MyTest { 
    @Mock private MyService mock; 

    @Before 
    public void setUp() throws Exception { 
    MockitoAnnotations.initMocks(this); 
    } 

    @Test 
    public void nam3e() { 
    ArrayList<Listener> listeners = new ArrayList<>(); 

    doAnswer(
      invocation -> { 
       Object[] args = invocation.getArguments(); 
       Listener arg = (Listener) args[0]; 

       listeners.add(arg); 

       return null; 
      }) 
     .when(mock) 
     .addListener(any()); 

    Observable<String> stringObservable = 
     Observable.create(
      e -> { 
       Listener listener = 
        s -> { 
        e.onNext(s); 
        }; 

       mock.addListener(listener); 

       e.setCancellable(
       () -> { 
        mock.removeListener(listener); 
        }); 
      }); 

    TestObserver<String> test = stringObservable.test(); 

    Listener listener = listeners.get(0); 
    listener.onNext("Wurst"); 

    test.assertNotComplete().assertValue("Wurst"); 
    verify(mock, times(1)).addListener(any()); 

    test.dispose(); 

    verify(mock, times(1)).removeListener(any()); 
    } 

    public interface MyService { 
    void addListener(Listener listener); 

    void removeListener(Listener listener); 
    } 

    @FunctionalInterface 
    public interface Listener { 
    void onNext(String s); 
    } 
} 
+0

ああありがとう。 setCancellableメソッドを理解できませんでしたが、これでクリアされます。 –

関連する問題