あなたは常に役者システムでグローバルな怠惰のvalを定義することができます。
object Execution {
implicit lazy val actorSystem: ActorSystem = ActorSystem()
implicit lazy val materializer: Materializer = ActorMaterializer()
}
次に、あなたはちょうどあなたがアッカストリームを使用したいクラスのいずれかでそれをインポートします。
import Execution._
val stream: DStream[...] = ...
stream.foreachRDD { rdd =>
...
rdd.foreachPartition { records =>
val (queue, done) = Source.queue(...)
.via(Producer.flow(...))
.toMat(Sink.ignore)(Keep.both)
.run() // implicitly pulls `Execution.materializer` from scope,
// which in turn will initialize `Execution.actorSystem`
... // push records to the queue
// wait until the stream is completed
Await.result(done, 10.minutes)
}
}
上記は擬似コードのようなものですが、一般的な考え方を伝えるべきだと思います。
このようにして、システムはすべての実行者JVMで必要なときに一度だけ初期化されます。また、あなたは、JVMが終了すると、それが自動的にシャットダウンするために俳優システム「鬼神の」を作ることができます。
object Execution {
private lazy val config = ConfigFactory.parseString("akka.daemonic = on")
.withFallback(ConfigFactory.load())
implicit lazy val actorSystem: ActorSystem = ActorSystem("system", config)
implicit lazy val materializer: Materializer = ActorMaterializer()
}
当社は、スパークの仕事でこれをやっているし、それが完璧に動作します。
これはブロードキャスト変数なしでも機能し、あらゆる種類のSparkジョブ、ストリーミングなどで使用できます。システムはシングルトンオブジェクトで定義されているため、JVMインスタンスごとに1回だけ初期化されることが保証されています(さまざまなクラスローダーを使用しますが、Sparkのコンテキストでは重要ではありません)。同じJVMに(おそらく別のスレッドで)、アクターシステムを1回だけ初期化します。 lazy val
は初期化のスレッド安全性を保証し、ActorSystem
はスレッドセーフであるため、この点でも問題は発生しません。
バニラカフカプロデューサーとAkkaストリームを使用することの間のパフォーマンスの比較を確認することは興味深いでしょう。私は誰が他の人よりもはるかに速く行く強い理由を見ることができません。 – maasg
私は、本質的にはよりダイナミックで速いと思われるリアクティブストリーム機能のメリットがあると私は想定しています。ブロックしにくい。どのように背圧をかけ、元のプロデューサーと一緒にすべてを実装するのか分かりません。 – MaatDeamon
Kafkaプロデューサはスループットを最適化するためのバッファリングメカニズムを内部的に実装しています。書面では、言及されている利点のいずれも見られません。背圧の必要はありません。カフカは、通常、適切な寸法を与えて荷重を受け入れるのに十分速いです。ブロッキング:バッファリングの問題ではありません。書き込みを完了して作業を続行し、オフセットをコミットする必要があると思います。遠方から見ると、勝てない最適化のように見えます。しかし、私は数字が意見に勝つと思うので、比較を見るのは面白いでしょう。 – maasg