2016-10-25 1 views
2

私はscalikejdbcを使用して巨大なテーブルにアクセスしています。私が理解していることは、すべての行をマップしたり、繰り返したりする前に、すべての行をメモリにフェッチすることです。Scalikejdbc結果セットイテレータ

現在、私は非常にシンプルなrxscala Observableを使用して実装しています。しかし、受信者は、SQLを読み込むよりも遅いし、私はバッファリングのためOutOfMemoryを取得します。私はSQL.foreach方法を知ってる

def fetchProductsAsObservable(
    sql: SQL[Nothing,NoExtractor], 
    extractor: (WrappedResultSet) => ProductItem) 
) = 
    Observable[ProductItem](o => 
     try { 
      sql.foreach(row => o.onNext(extractor(row))) 
      o.onCompleted() 
     } catch { 
     case e: Throwable => o.onError(e) 
     } 
    ) 

が、それはコールバックメソッドと戻り単位を取得します。ここでは、観察可能性があるので、私の現在のプロデューサーです。 私の背景は.NETにあります。 scalikejdbcを使ってscalaの簡単なイテレータを正しく実装する方法を理解できません。これは並列処理のために与えることができますか?

+0

コードの一部を見せてもらえますか? – mfirry

+0

確かに、質問を編集しました –

答えて

-1

あなたの質問はScalikeJDBCの動作とは関係なく、RxJavaに関係しています。 Observableは熱源によって調達されるので、背圧戦略を使用する必要があります。

+0

遅れていただきありがとうございます。私の実際の背圧戦略はイテレータを使うことでした。 ScalaRxは、.NETの脳からスカラに移植できる唯一のものだったからです。当時、私は少し反応的な流れを書いていました。そしてscalikejdbc 3.xは自分自身のscalikejdbc-streamsライブラリを追加しました。これも私の場合に役立ちませんでした。 –