2016-04-13 6 views
8

無制限ストリームを再生するためにRetrofit + RxJavaを使用できますか?例えば、Twitterのストリーム。私が持っているのは:Android Retrofit 2 + RxJava:無限ストリームを聞きます

public interface MeetupAPI { 
    @GET("http://stream.meetup.com/2/rsvps/") 
    Observable<RSVP> getRSVPs(); 
} 

MeetupAPI api = new Retrofit.Builder() 
      .baseUrl(MeetupAPI.RSVP_API) 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .addConverterFactory(GsonConverterFactory.create()) 
      .build() 
      .create(MeetupAPI.class); 

api.getRSVPs() 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(rsvp -> Log.d(TAG, "got rsvp"), 
       error -> Log.d(TAG, "error: " + error), 
       () -> Log.d(TAG, "onComplete")); 

ですが、最初のオブジェクトが解析された後に "onComplete"が呼び出されます。 Retrofitにさらに通知するまで開いたままにするよう指示する方法はありますか?ここで

答えて

9

は私の解決策:私たちはResponseBodyから生の入力を取得することができます@Streaming

public interface ITwitterAPI { 

    @GET("/2/rsvps") 
    @Streaming 
    Observable<ResponseBody> twitterStream(); 
} 

ITwitterAPI api = new Retrofit.Builder() 
      .baseUrl("http://stream.meetup.com") 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .build().create(ITwitterAPI.class); 

あなたは@Streamingアノテーションを使用することができます。

は、ここに私の関数は、イベントと線で分割体ラップする:

public static Observable<String> events(BufferedSource source) { 
    return Observable.create(new Observable.OnSubscribe<String>() { 
     @Override 
     public void call(Subscriber<? super String> subscriber) { 
      try { 
       while (!source.exhausted()) { 
        subscriber.onNext(source.readUtf8Line()); 
       } 
       subscriber.onCompleted(); 
      } catch (IOException e) { 
       e.printStackTrace(); 
       subscriber.onError(e); 
      } 
     } 
    }); 
} 

をと利用結果:我々は、改造閉じを退会すると正常

の停止について

api.twitterStream() 
    .flatMap(responseBody -> events(responseBody.source())) 
    .subscribe(System.out::println); 

UPDを入力ストリーム。しかし、入力ストリームを入力ストリーム自体から検出するのは不可能です。したがって、ストリームからの読み取りを試みるには、Socket closedメッセージで例外が発生します。

 @Override 
     public void call(Subscriber<? super String> subscriber) { 
      boolean isCompleted = false; 
      try { 
       while (!source.exhausted()) { 
        subscriber.onNext(source.readUtf8Line()); 
       } 
      } catch (IOException e) { 
       if (e.getMessage().equals("Socket closed")) { 
        isCompleted = true; 
        subscriber.onCompleted(); 
       } else { 
        throw new UncheckedIOException(e); 
       } 
      } 
      //if response end we get here 
      if (!isCompleted) { 
       subscriber.onCompleted(); 
      } 
     } 

との接続が応答終了するので閉鎖した場合、我々はすべての例外を持っていない: 私たちは、クロージングとして、この例外を解釈することができます。ここでisCompletedを確認してください。私が間違っている場合は教えてください:)

+0

ありがとう!ストリームの聞き取りをうまくやめる方法についてのヒント? – ticofab

関連する問題