2017-08-29 10 views
1

カフカにデータを書き込む際に最も効率的であると思っています。Akka Streamを使用してRDDパーティションをKafkaに書き込むことに興味があります。Akkaストリームをスパークジョブ内からkafkaに書き込む

問題は、私はエグゼキュータごとにアクターシステムを作成する方法が必要であり、ばかばかしいパーティションではないことです。 1つのJVM上の1つのノード上に8つのactorSystemsが存在する可能性があります。しかし、パーティションごとにストリームを持つことは問題ありません。

すでに誰でも行っていますか?

私の理解では、俳優システムはシリアル化できないため、 には実行者ごとにブロードキャスト変数があります。

解決策を見つけた経験があり、テストしていただければ分かりますか?

その他私はいつもhttps://index.scala-lang.org/benfradet/spark-kafka-writer/spark-kafka-0-10-writer/0.3.0?target=_2.11にフォールバックすることができますが、それが最も効率的な方法であるかどうかはわかりません。

+0

バニラカフカプロデューサーとAkkaストリームを使用することの間のパフォーマンスの比較を確認することは興味深いでしょう。私は誰が他の人よりもはるかに速く行く強い理由を見ることができません。 – maasg

+0

私は、本質的にはよりダイナミックで速いと思われるリアクティブストリーム機能のメリットがあると私は想定しています。ブロックしにくい。どのように背圧をかけ、元のプロデューサーと一緒にすべてを実装するのか分かりません。 – MaatDeamon

+0

Kafkaプロデューサはスループットを最適化するためのバッファリングメカニズムを内部的に実装しています。書面では、言及されている利点のいずれも見られません。背圧の必要はありません。カフカは、通常、適切な寸法を与えて荷重を受け入れるのに十分速いです。ブロッキング:バッファリングの問題ではありません。書き込みを完了して作業を続行し、オフセットをコミットする必要があると思います。遠方から見ると、勝てない最適化のように見えます。しかし、私は数字が意見に勝つと思うので、比較を見るのは面白いでしょう。 – maasg

答えて

1

あなたは常に役者システムでグローバルな怠惰のvalを定義することができます。

object Execution { 
    implicit lazy val actorSystem: ActorSystem = ActorSystem() 
    implicit lazy val materializer: Materializer = ActorMaterializer() 
} 

次に、あなたはちょうどあなたがアッカストリームを使用したいクラスのいずれかでそれをインポートします。

import Execution._ 

val stream: DStream[...] = ... 

stream.foreachRDD { rdd => 
    ... 
    rdd.foreachPartition { records => 
    val (queue, done) = Source.queue(...) 
     .via(Producer.flow(...)) 
     .toMat(Sink.ignore)(Keep.both) 
     .run() // implicitly pulls `Execution.materializer` from scope, 
       // which in turn will initialize `Execution.actorSystem` 

    ... // push records to the queue 

    // wait until the stream is completed 
    Await.result(done, 10.minutes) 
    } 
} 

上記は擬似コードのようなものですが、一般的な考え方を伝えるべきだと思います。

このようにして、システムはすべての実行者JVMで必要なときに一度だけ初期化されます。また、あなたは、JVMが終了すると、それが自動的にシャットダウンするために俳優システム「鬼神の」を作ることができます。

object Execution { 
    private lazy val config = ConfigFactory.parseString("akka.daemonic = on") 
    .withFallback(ConfigFactory.load()) 
    implicit lazy val actorSystem: ActorSystem = ActorSystem("system", config) 
    implicit lazy val materializer: Materializer = ActorMaterializer() 
} 

当社は、スパークの仕事でこれをやっているし、それが完璧に動作します。

これはブロードキャスト変数なしでも機能し、あらゆる種類のSparkジョブ、ストリーミングなどで使用できます。システムはシングルトンオブジェクトで定義されているため、JVMインスタンスごとに1回だけ初期化されることが保証されています(さまざまなクラスローダーを使用しますが、Sparkのコンテキストでは重要ではありません)。同じJVMに(おそらく別のスレッドで)、アクターシステムを1回だけ初期化します。 lazy valは初期化のスレッド安全性を保証し、ActorSystemはスレッドセーフであるため、この点でも問題は発生しません。

+0

ありがとう。もう少し理解してください。だからあなたはまったく放送変数を使用しないのですか?コンテキスト・コードをもう少し与えることができますか?スパーク操作でExecutionオブジェクトをどのように使用しますか?スパークストリーミングでそれを使用したいのですが、RDDごとに新しい俳優システムを導入するのか、それともJVMを導入するのかは分かりません。あなたは少し明確にすることができますか?あなたの文脈、スパークストリーミングジョブまたは通常のスパークジョブは何ですか? – MaatDeamon

+0

私は自分の答えを拡大しました。うまくいけばあなたの質問に今すぐ答えます。 –

+0

どのようにストリームを停止しますか?レコードのサイズでテイクを使用しますか? – MaatDeamon

関連する問題