2017-05-23 26 views
1

私はFuture[Option[Elephant]]を返して、ネットワーク上でElephantを探す関数を持っています。戻り値はFutureなので、ネットワークコールが非同期に発生している間に関数がすぐに戻ることができます。Scalaでのテール再帰的な機能的ポーリング

def checkForElephant : Future[Option[Elephant]] = ??? 

は、私は何をしたいことはpollForElephantと呼ばれる関数を記述です:Someは象が発見されたことを意味している間は、Noneはまだ利用できませんを意味しているためOptionが含まれています。

def pollForElephant : Future[Elephant] = ??? 

この機能さえあった場合、Elephantが発見されるまでcheckForElephantを呼び出し、すぐに成功する要素が最初のチェックで発見された場合、それ以降は再び10秒ごとにチェックしますプロセス上Futureを返す必要がありますゾウはいないし、永遠に試してみる必要があります。

これを行う簡単な方法は、単にポーリングして、全体の不倫上Futureを作成するためにFutureドメイン外の再帰関数を記述し、同期するチェックを強制することです:

import scala.annotation.tailrec 
import scala.concurrent.{Await,Future,blocking} 
import scala.concurrent.duration._ 
import scala.concurrent.ExecutionContext.Implicits.global 

class Elephant; 

def checkForElephant : Future[Option[Elephant]] = ??? 

def synchronousCheckForElephant : Option[Elephant] = blocking { 
    Await.result(checkForElephant, Duration.Inf) 
} 

@tailrec 
def poll(last : Option[Elephant]) : Elephant = { 
    last match { 
    case Some(elephant) => elephant 
    case None => { 
     blocking { 
     Thread.sleep(10.seconds.toMillis) 
     } 
     poll(synchronousCheckForElephant) 
    } 
    } 
} 

def pollForElephant : Future[Elephant] = Future { 
    poll(synchronousCheckForElephant) 
} 

このドメインがFutureから始まり、強制的に同期させてから元に戻って、ひどく控えめなようです。私はFutureからすべてを行うことができるはずだと思った。だから、私はこれを試してみました:上記のコメントが言うよう

残念ながら
import scala.annotation.tailrec 
import scala.concurrent.{Await,Future,blocking} 
import scala.concurrent.duration._ 
import scala.concurrent.ExecutionContext.Implicits.global 

class Elephant; 

def checkForElephant : Future[Option[Elephant]] = ??? 

// oops! this is not @tailrec 
def poll(last : Future[Option[Elephant]]) : Future[Elephant] = { 
    last.flatMap { mbElephant => 
    mbElephant match { 
     case Some(elephant) => Future.successful(elephant) 
     case None => { 
     blocking { 
      Thread.sleep(10.seconds.toMillis) 
     } 
     poll(checkForElephant) 
     } 
    } 
    } 
} 

def pollForElephant : Future[Elephant] = poll(checkForElephant) 

poll(...)関数は末尾再帰的ではありません。象は到着するまでに時間がかかるかもしれませんが、私は無期限に待つことになっていますが、スタックが壊れる可能性があります。

そして、全体がちょっと変わっています。私はちょうど理由が簡単な同期アプローチに戻すべきですか? Futureに滞在している間に私が意味することをする安全な方法はありますか?

+2

参照:https://stackoverflow.com/questions/16973838/how-do- i-make-a-function-with-future-tail-recursive – PH88

+0

@ PH88ありがとうございました。それは有益で面白いです。私はそれについて少し考えなければならない。 –

答えて

1

私はflatMapの中にcheckForElephantの中に新しいFutureとそれゆえ新しいスタックを作成するので、テール再帰的に呼び出す必要はありません。ここで私はあなたのcheckForElephantを模擬しようとしたシンプルなコードは次のとおりです。checkForElephant =へ
コールスリーピングcheckForElephant => falseを

コール>:

type Elephant = String 
val rnd = new Random() 

def checkForElephant: Future[Option[Elephant]] = Future({ 
    val success = rnd.nextDouble() < 0.2 
    println(s"Call to checkForElephant => $success") 
    if (success) Some(Thread.currentThread().getStackTrace().mkString("\n")) else None 
}) 

def poll(last: Future[Option[Elephant]]): Future[Elephant] = { 
    last flatMap { 
     case Some(elephant) => Future.successful(elephant) 
     case None => { 
     blocking { 
      println("Sleeping") 
      Thread.sleep(100.millisecond.toMillis) 
     } 
     poll(checkForElephant) 
     } 
    } 
} 

def pollForElephant: Future[Elephant] = poll(checkForElephant) 

val result = Await.result(pollForElephant, Duration.Inf) 
println(result) 

そして、ここでは、実行の1から出力されcheckForElephant =へcheckForElephant => falseを
コールをスリーピング
に偽
睡眠
コール> checkForElephantに
コールスリーピングcheckForElephant => falseを
=>真
れるjava.langに
コールスリーピングcheckForElephant => falseを

コールスリーピングcheckForElephant => falseを

コールスリーピング
偽。糸。(TestApp.scala:97)
so.TestApp $$ anonfun $ so $ TestApp $$ checkForElephant $ 1 $ 1.apply(TestApp.scala:94)
scala.concurrent.impl.Future $ PromiseCompletingRunnable.liftedTree1 $ -1(Future.scala:24)
scala.concurrent.impl.Future $ PromiseCompletingRunnable.run(Future.scala:24 ) scala.concurrent.impl.ExecutionContextImpl $ AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) scala.concurrent.forkjoin.ForkJoinPool $ WorkQueue.pollAndExecAll (ForkJoinPool.java:1253)
scala.concurrent.forkjoin.ForkJoinPool $ WorkQueue.runTask(ForkJoinPool.java:1346)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread .javaファイル:107)

あなたはcheckForElephantNone最初の7回に戻っているという事実にもかかわらず、スタックトレースが浅いことを見ることができます。私はあなたのアプローチについては好きではない何

追記

は、あなたが10秒間睡眠を行うときにあなただけのいくつかのスレッドをブロックしているという事実です。これは私にとってはかなり非効率的なようです。そのような呼び出しをたくさんしたい場合は、Java ScheduledThreadPoolExecutorやAkka Actorsなど、よりスマートなものを使用することを検討してください。

更新

しかし、それは、ヒープ上のオブジェクトとして保持スタックフレームの論理的同等をメモリリークが発生しますか?

いいえ、あなたのcheckForElephantに非常に奇妙なものがない限り、メモリリークを起こすには、いくつかの "root"によってメモリが保持されるべきです。考えられる根は、静的変数、スレッドローカル変数、およびスタックです。私たちが知っているように、スタックは成長しないので、漏れの原因にはなりません。静的なものやスレッドのものを混乱させないなら、安全でなければなりません。

については、スレッド消費、あなたが本当にシステムで一つだけ「象」を持っている場合は、私が大幅にもっと良いものがあるとは思いません。しかし、checkForElephantが実際にはcheckForElephant(id)である場合は、正当な理由でスレッドを大量に消費する可能性があります。最初のステップは、(私はそれのためのScala同等の認識していないよ)PromiseScheduledThreadPoolExecutorを使用している場合がありますこれを改善し、より良いスレッドの使用などのために、いくつかの機能的なスタイルを犠牲にする:

// Just 1 thread should be enough assuming checkForElephant schedules 
// it's Future on some executor rather than current thread 
val scheduledExecutor = new ScheduledThreadPoolExecutor(1) 

def pollForElephant: Future[Elephant] = { 
    def scheduleDelayedPoll(p: Promise[Elephant]) = { 
    scheduledExecutor.schedule(new Runnable { 
     override def run() = poll(p) 
    }, 
     10, TimeUnit.SECONDS) 
    } 

    def poll(p: Promise[Elephant]): Unit = { 
    checkForElephant.onComplete { 
     case s: Success[Option[Elephant]] => if (s.value.isDefined) p.success(s.value.get) else scheduleDelayedPoll(p) 
     case f: Failure[_] => scheduleDelayedPoll(p) 
    } 
    } 

    val p = Promise[Elephant]() 
    poll(p) 
    p.future 
} 

あなたはより多くの負荷を持っている場合は、次のステップcheckForElephantのネットワーク上の要求に対するスレッドをブロックしないために、非ブロックI/Oを使用しています。あなたが実際にWebサービスを使用している場合は、を参照してください。Netty

+0

私はこれが厄介な方法で投票することに同意します。問題のモチベーションの一部は、 'checkForElephant(...)'メソッドが私をとにかく置くので、 'Future'の簡単なツールの中でそれほど厄介な方法がないかどうかです。多分、私はもっと良い解決策を探して、それを 'Future' APIに' Promise'でつなぎます。 –

+0

あなたと@ PH88は正しいです(実験的な確認のおかげで!)このコードが漏れるのはスタックではありません。しかし、ヒープ上にオブジェクトとして保持されているスタックフレームと論理的に等価なメモリがリークしますか?経験的には、スレッドが漏れていないように思えます。フォーク/ジョインプールの1つのスレッドだけがいつでも消費され、平穏に眠っているようです。これはそれほど悪くないかもしれません。 –

+0

@SteveWaldman、もしあなたがこの質問に戻ったら、あなたのコメントのいくつかに取り組んだ私のアップデートを見てください。 – SergGr

関連する問題