2017-06-06 8 views
0

Observable<ResultSet>を頂点AsyncSqlClient からHashMapに集めようとしています。Observableを収集する方法

Map<String, Integer> catToIdMap = Maps.newHashMap(); 
asyncSQLClient 
     .getConnectionObservable() 
     .flatMap(sqlConnection -> sqlConnection.queryObservable("select a, b from table") 
       .doOnCompleted(sqlConnection::close) 
       .doOnError(throwable -> log.error("Error while querying.", throwable))) 
     .flatMap(resultSet -> Observable.from(resultSet.getRows())) 
     .toBlocking() 
     .forEach(row -> map.put(row.getString("a"), row.getInteger("b"))); 

しかし、これは永遠にブロックされているようです。

何も結果を出さずに検索した後、手を貸してもらえますか?

+0

あなたはあなたの上司に電話をかけていますか? toBlocking()は動作する必要があります。 –

+0

あなたは詳しく説明できますか? –

答えて

1

@Phenix WangがコメントしたforEach()の方法will block until Observable will completes。無限のアイテムを放出するObservableを持っているか、Observable completeを通知するために間違ってonCompletedを放出しないObservableを持っていると、永遠にブロックされます。それはだ場合、あなたのgetConnectionObservable()メソッドの実装によって引き起こされる可能性がある

は、すべての項目が放出された後、あなたがonCompleted()を呼び出す必要があります例えばObservable.create()を使用して、カスタムObservableを作成します。 (それはreactivenessのすべての目的を破るだとして)いずれにせよ、あなたが生産コードで適切ではないかもしれない、toBlocking()がブロックして待つことになることに注意する必要があり、あなたがreduce()を使用して同じ目的を達成することができます

asyncSQLClient 
      .getConnectionObservable() 
      .flatMap(sqlConnection -> sqlConnection.queryObservable("select a, b from table") 
        .doOnCompleted(sqlConnection::close) 
        .doOnError(throwable -> log.error("Error while querying.", throwable))) 
      .flatMap(resultSet -> Observable.from(resultSet.getRows())) 
      .reduce(Maps.newHashMap(), (map, o) -> map.put(row.getString("a"), row.getInteger("b"))) 
      .subscribe(map -> { 
         //do something with map 
        } 
      ); 

注:onCompletedの問題を解決する必要があります。これは、Observableが完了し、ソースObservableが完了すると信号アイテムを送信します。
さらにscan()(単にscanをreduceに置き換える)を使用してスキャンすると、ソースObservableで放出された各アイテムの排出が得られます。つまり、時間の経過とともにマップするアイテムが蓄積されます。

+0

Observableの 'length'を1に制限するために 'single'演算子を使用できますか? Observableが最初の要素の後に完了したことを知らせるには? –

+0

いいえ、single()も完了(または2回目の排出)を待ってからIllegalArgumentExceptionをスローしますが、first()を使用すると、後で何が起きても最初の排出が得られますが、 – yosriz

+0

あなたが助けを必要とするなら、あなたは 'getConnectionObservable'コードを投稿することができます。 – yosriz

関連する問題