2017-04-04 11 views
0

私は誰かが購読しているかどうかにかかわらず、アイテムを放出する特定の機能を持っています。また、この機能は一度だけ実行されなければならないと誰かがそれに加入するかどうかはどのようにしてobservableを作成するには?

Observable<CharSequence> observable = Observable.create(subscriber -> { 

      try { 

       sseEventSource.connect(); //this should be called once when created; 

       final SseEventReader sseEventReader = sseEventSource.getEventReader(); 
       SseEventType type = sseEventReader.next(); 
       while (type != SseEventType.EOS) { 
        if (type != null && type.equals(SseEventType.DATA) && sseEventReader.getData() != null) { 
         CharSequence data = sseEventReader.getData(); 
         if (!subscriber.isUnsubscribed()) { 
          subscriber.onNext(data); 
         } 
        } 
        type = sseEventReader.next(); 
       } 
       sseEventSource.close(); 
       Log.d("SseService", "closed"); 
       if (!subscriber.isUnsubscribed()) { 
        subscriber.onCompleted(); 
       } 
      } catch (URISyntaxException | IOException e) { 
       if (!subscriber.isUnsubscribed()) { 
        subscriber.onError(e); 
       } 
      } 

答えて

0

はあなたが達成することができます実行されてはならないことを、たとえば、Subjectsを使用して:

public static void main(String[] args) { 
    Main m = new Main(); 
    m.getChanges().subscribe(x -> { 
     //Data 
    }, e -> { 
     //Errors 
    }); 
    m.connect(); 
} 

private PublishSubject<CharSequence> ps = PublishSubject.create(); 

public void connect() { 
    sseEventSource.connect(); 
} 

public Observable<CharSequence> getChanges() { 
    try { 
     final SseEventReader sseEventReader = sseEventSource.getEventReader(); 
     SseEventType type = sseEventReader.next(); 
     while (type != SseEventType.EOS) { 
      if (type != null && type.equals(SseEventType.DATA) && sseEventReader.getData() != null) { 
       CharSequence data = sseEventReader.getData(); 
       ps.onNext(data); 
      } 
      type = sseEventReader.next(); 
     } 
     sseEventSource.close(); 
     ps.onComplete(); 
    } catch (URISyntaxException | IOException e) { 
     ps.onError(e); 
    } 
    return ps; 
} 

あなたが購読することができますこの方法/必要に応じて件名からの変更を退会し、接続を1回だけ確立します。バッファリングされた変更を受け取る場合は、ReplaySubjectを使用してください。

+0

私はこのconnect()などのものはIOスレッド内で実行する必要があることを忘れています。 –

+0

それでIOスレッドで実行してください – Divers

+0

誰も購読していなくてもアイテムを出すHot observableを作成する方法はありますか?このコードはすべて、1つのスレッドで同時に実行する必要があります。あなたは、私がrxチェーンの中でそれの一部を演奏し、他のどこかの詩を欲しいと思っています。 –

1

share()オペレータを試してください。

Observable<CharSequence> observable = Observable.create(subscriber -> { 

     try { 

      sseEventSource.connect(); //this should be called once when created; 

      final SseEventReader sseEventReader = sseEventSource.getEventReader(); 
      SseEventType type = sseEventReader.next(); 
      while (type != SseEventType.EOS) { 
       if (type != null && type.equals(SseEventType.DATA) && sseEventReader.getData() != null) { 
        CharSequence data = sseEventReader.getData(); 
        if (!subscriber.isUnsubscribed()) { 
         subscriber.onNext(data); 
        } 
       } 
       type = sseEventReader.next(); 
      } 
      sseEventSource.close(); 
      Log.d("SseService", "closed"); 
      if (!subscriber.isUnsubscribed()) { 
       subscriber.onCompleted(); 
      } 
     } catch (URISyntaxException | IOException e) { 
      if (!subscriber.isUnsubscribed()) { 
       subscriber.onError(e); 
      } 
     }).share() ; 

あなたは本当に熱い観測可能使用publish()connect()は誰が加入していない場合であっても、観察を開始します。

関連する問題