私はApache Sparkを初めて使用しており、同時にSparkクラスタ上で複数の長期実行プロセス(ジョブ)を実行する必要があります。しばしば、これらの個々のプロセス(それぞれが独自の仕事です)は互いに通信する必要があります。一応、私はカフカをこれらのプロセスの間のブローカーに使うことを検討しています。だから、ハイレベルのジョブにジョブ通信は次のようになります。Kafkaを使用して長時間実行されるスパークジョブ間の通信
- ジョブ#1を
- ジョブ#2を使用して(ストリーミング受信機として設定されているいくつかの作業を行い、カフカのトピックにメッセージをパブリッシュ
StreamingContext
)ジョブ#2が - ジョブ#2が、今ではより
を消費したメッセージに基づいて、いくつかの作業を行うことができ、それを消費し、その同じカフカのトピックに、とすぐにメッセージがトピックに公開されているようにストリーミングコンテキストは、Spark Driverノードで動作するリスナーをブロックしています。 2つの意味合い今があることを...
def createKafkaStream(ssc: StreamingContext,
kafkaTopics: String, brokers: String): DStream[(String,
String)] = {
// some configs here
KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, props, topicsSet)
}
def consumerHandler(): StreamingContext = {
val ssc = new StreamingContext(sc, Seconds(10))
createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD(rdd => {
rdd.collect().foreach { msg =>
// Now do some work as soon as we receive a messsage from the topic
}
})
ssc
}
StreamingContext.getActive.foreach {
_.stop(stopSparkContext = false)
}
val ssc = StreamingContext.getActiveOrCreate(consumerHandler)
ssc.start()
ssc.awaitTermination()
:これは私がそうのようなストリーミングコンシューマを開始するとことを意味
- ドライバーは今、ブロッキングやカフカから消費する作業のために聞いているが。作業(メッセージ)を受信している場合、それらは実際にそう最初
時に実行されるように使用可能な任意のワーカーノードに送信され、私は上記の言った何かが間違っているか、誤解を招くの場合と
BTW - foreachRDDで 'rdd.collect'を使うと、データセット全体がドライバに返されます。あなたは間違いなくそれを望んでいます。 –
ありがとう@Yuval(+1)、消費されている個々のメッセージにアクセスするためのより良い/より効率的な方法はありますか?これは私の意図ではなく、私はAPIの初心者ですので、私のコードを更新してください! – smeeb
'rdd.foreach'を使うことができます。 –