2017-12-27 17 views
1

目的は、データベースからデータをストリーミングし、このデータチャンクで何らかの計算を実行することです(この計算は、あるケースクラスの未来を返します)ユーザに提供する。現在、私はデータをストリーミングし、計算を実行せずに応答を送信できます。しかし、私はこの計算を実行できず、結果をストリーミングすることができません。Slickストリーミングデータの変換とAkka Httpを使用したチャンクレスポンスの送信

これは私が実装したルートです。

def streamingDB1 = 
path("streaming-db1") { 
    get { 
    val src = Source.fromPublisher(db.stream(getRds)) 
    complete(src) 
    } 
} 

関数getRdsは、ケースクラス(slickを使用)にマップされたテーブルの行を返します。ここで、各行を入力として受け取り、別のケースクラスのFutureを返す関数computeについて考えてみましょう。私は、変数SRCでこの機能を実装し、ユーザーにこの計算の(ストリーム)としてチャンクの応答を送信することができますどのように

def compute(x: Tweet) : Future[TweetNew] = ? 

ような何か。

答えて

1

あなたはmapAsyncを使用して、ソースを変換できます。必要に応じて

val src = 
    Source.fromPublisher(db.stream(getRds)) 
     .mapAsync(parallelism = 3)(compute) 

complete(src) 

は、並列処理のレベルを調整します。

+0

これは機能しません。私はcurlコマンドを実行してエンドポイントにヒットします。ただし、接続が閉じられます。 – user3294786

関連する問題