私はscalikejdbcを使用して巨大なテーブルにアクセスしています。私が理解していることは、すべての行をマップしたり、繰り返したりする前に、すべての行をメモリにフェッチすることです。Scalikejdbc結果セットイテレータ
現在、私は非常にシンプルなrxscala Observableを使用して実装しています。しかし、受信者は、SQLを読み込むよりも遅いし、私はバッファリングのためOutOfMemoryを取得します。私はSQL.foreach方法を知ってる
def fetchProductsAsObservable(
sql: SQL[Nothing,NoExtractor],
extractor: (WrappedResultSet) => ProductItem)
) =
Observable[ProductItem](o =>
try {
sql.foreach(row => o.onNext(extractor(row)))
o.onCompleted()
} catch {
case e: Throwable => o.onError(e)
}
)
が、それはコールバックメソッドと戻り単位を取得します。ここでは、観察可能性があるので、私の現在のプロデューサーです。 私の背景は.NETにあります。 scalikejdbcを使ってscalaの簡単なイテレータを正しく実装する方法を理解できません。これは並列処理のために与えることができますか?
コードの一部を見せてもらえますか? – mfirry
確かに、質問を編集しました –