2016-10-19 11 views
1

Rssフィードを取得するアプリケーションにはRxJAvaでRetrofitを使用していますが、rssにすべての情報が含まれていないため、jsoupを使用してアイテムリンクを解析して画像と記事の説明を取得します。今私はそれをこのように使用しています:RxJavaフラットマップチェーニングリクエスト

public Observable<Rss> getDumpData() { 
    return newsAppService.getDumpData() 
      .flatMap(rss -> Observable.from(rss.channel.items) 
      .observeOn(Schedulers.io()) 
      .flatMap(Checked.f1(item -> Observable.just(Jsoup.connect(item.link).get()) 
      .observeOn(Schedulers.io()) 
      .map(document -> document.select("div[itemprop=image] > img").first()) 
        .doOnNext(element -> item.image = element.attr("src")) 
      ))) 
      .defaultIfEmpty(rss) 
      .ignoreElements() 
      .observeOn(Schedulers.io()) 
      .subscribeOn(AndroidSchedulers.mainThread()); 
} 

を、私はこの行のエラーを取得しています:defaultIfEmpty(rss) それはflatmapのRSSを認識しません。フラットマップ括弧でdefaultIfEmpty(rss)を移動すると、戻り値の型をElementに変更する必要があるというエラーが表示されます。彼らの解決策は何ですか?

+0

'defaultIfEmpty'パラメータに' flatMap'パラメータを渡すことによって達成しようとしていますか?あなたは例外をキャッチしたいですか? R.Zagó[email protected] –

+0

観察可能 – Mamadou

+0

はエラーが画像を取得するために 'newsAppService.getDumpData()'からか、変換から起こることを期待してくださいそのわずかエラーが発生した場合におけるその放射源? R.Zagó[email protected] –

答えて

2

最初の参照を失うことができないこと.zip機能を使用していました。

.observeOn(Schedulers.io()) 

バックイベントループに別のスレッドからのデータをバック同期したい場合はAndroidSchedulerでobserveOn使用することを検討してください。通常は、オブザーバブルに登録する前にobserveOnを使用して、ui-loopに同期してui-informationを変更します。

.observeOn(AndroidSchedulers.mainThread()) 

第2に、パイプラインのオブジェクトを変更することはお勧めしません。あなたは新しいオブジェクトを非常に時間を返す必要があります。

.doOnNext(element -> item.image = element.attr("src")) 

最初の2つの点を考慮してソリューションをリファクタリングしようとしました。私はRxJava2-RC5を使用しています

flatMap演算子には多くのオーバーロードがあります。そのうちの1つは、入力値と作成された値を一緒に圧縮する機能を提供します。

Observable<Rss> rssItemObservable = newsService.getDumpData() 
       .flatMap(rss -> getRssItemInformation(rss).subscribeOn(Schedulers.io()), 
         (r, rItemList) -> { 
          Rss rInterim = new Rss(); 
          rInterim.items = rItemList; 
          return rInterim; 
         }); 

Rssの各項目の情報を取得するためのヘルプメソッド。 maxConcurrencyでオーバーロードを使用することを検討してください。デフォルトでは、すべてのストリームを一度に購読します。したがって、flatMapは多くのhttp要求を作成します。

private Observable<List<RssItem>> getRssItemInformation(Rss rss) { 
     return Observable.fromIterable(rss.items) 
       .flatMap(rssItem -> getImageUrl(rssItem).subscribeOn(Schedulers.io()), (rItem, img) -> { 
        RssItem item = new RssItem(); 
        printCurrentThread("merge1"); 
        item.image = img; 
        item.link = rItem.link; 
        return item; 
       }).toList().toObservable(); 
} 

イメージURLを取得するためのヘルプメソッド。観察可能なリターンは、並行性については議論されていません。エラーが発生した場合、空の文字列がデフォルト値として返されます。

private Observable<String> getImageUrl(String link) { 
      return Observable.fromCallable(() -> Jsoup.connect(link).get()) 
       .map(document -> document.select("div[itemprop=image] > img").first()) 
       .map(element -> element.attr("src")) 
       .onErrorResumeNext(throwable -> { 
        return Observable.just(""); 
       }); 
} 

githubで完全な例を見ることができます。要旨:https://gist.github.com/anonymous/a8e36205fc2430517c66c802f6eef38e

+0

ハンスを保存しました感謝の言葉を述べたいと思っていましたが、この答えは間接的に私がチェーンリクエストで私の問題を解決するのを助けました。 flatMap内の別のAPIコールをしながら、私は私が意味する、要求をチェーンにsubscribeOn(Schedulers.io()を呼び出していませんでした。 –

1

RxJavaパラメータのパラメータ(flatMapラムダパラメータ)と別の演算子パラメータ(defaultIfEmpty)を混在させることはできません。すべての

まず、メインの反応ストリームクリーナーを維持するためにヘルパー関数を作成します。

private Observable<List<Item>> getDetails(List<Item> items) { 
    return Observable.from(items) 
       .observeOn(Schedulers.io()) 
       .flatMap(Checked.f1(item -> 
        Observable.zip(
         Observable.just(item), 
          Observable.just(Jsoup.connect(item.link).get()) 
          .observeOn(Schedulers.io()) 
          .map(document -> document.select("div[itemprop=image] > img").first()), 
          (itemInner, element) -> { 
           itemInner.image = element.attr("src"); 
           return itemInner; 
          } 
        ) 
       )) 
       .toList(); 
} 

はその後、main関数を再フォーマット:私が正しくあなたの目的を持っ

newsAppService.getDumpData() 
    .flatMap(rss -> 
     Observable.zip(
      Observable.<Rss>just(rss), 
      getDetails(rss.channel.items), 
      (rssInner, items) -> { 
       rssInner.channel.items = items; 
       return rss; 
      }).onErrorResumeNext((throwable -> Observable.just(rss)) 
     ) 
    ) 
    .observeOn(Schedulers.io()) 
    .subscribeOn(AndroidSchedulers.mainThread()); 

希望を。私はそれをテストすることができないので、それは動作しませんが、私はあなたがそのアイデアを得ることを願っています。その理由は、私はあなたが現在item解析されたり、observeOnですべての同時実行を取り除くとsubscribeOnを使用取得するために必要なすべてのrss

+0

おかげで多く、それは完璧に動作しますが、私は非常によく、コードを理解しない例外をスローすることができます変換からそれを期待し、私はジップドキュメントを読んでます、あなたは私の – Mamadou