2

私はApache Sparkを初めて使用しており、同時にSparkクラスタ上で複数の長期実行プロセス(ジョブ)を実行する必要があります。しばしば、これらの個々のプロセス(それぞれが独自の仕事です)は互いに通信する必要があります。一応、私はカフカをこれらのプロセスの間のブローカーに使うことを検討しています。だから、ハイレベルのジョブにジョブ通信は次のようになります。Kafkaを使用して長時間実行されるスパークジョブ間の通信

  1. ジョブ#1を
  2. ジョブ#2を使用して(ストリーミング受信機として設定されているいくつかの作業を行い、カフカのトピックにメッセージをパブリッシュStreamingContext)ジョブ#2が
  3. ジョブ#2が、今ではより

を消費したメッセージに基づいて、いくつかの作業を行うことができ、それを消費し、その同じカフカのトピックに、とすぐにメッセージがトピックに公開されているようにストリーミングコンテキストは、Spark Driverノードで動作するリスナーをブロックしています。 2つの意味合い今があることを...

def createKafkaStream(ssc: StreamingContext, 
     kafkaTopics: String, brokers: String): DStream[(String, 
     String)] = { 
    // some configs here 
    KafkaUtils.createDirectStream[String, String, StringDecoder, 
     StringDecoder](ssc, props, topicsSet) 
} 

def consumerHandler(): StreamingContext = { 
    val ssc = new StreamingContext(sc, Seconds(10)) 

    createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD(rdd => { 
     rdd.collect().foreach { msg => 
      // Now do some work as soon as we receive a messsage from the topic 
     } 
    }) 

    ssc 
} 

StreamingContext.getActive.foreach { 
    _.stop(stopSparkContext = false) 
} 

val ssc = StreamingContext.getActiveOrCreate(consumerHandler) 
ssc.start() 
ssc.awaitTermination() 

:これは私がそうのようなストリーミングコンシューマを開始するとことを意味

  1. ドライバーは今、ブロッキングやカフカから消費する作業のために聞いているが。作業(メッセージ)を受信して​​いる場合、それらは実際にそう最初

時に実行されるように使用可能な任意のワーカーノードに送信され、私は上記の言った何かが間違っているか、誤解を招くの場合と

  • 、始めてください私を訂正して!私が多かれ少なかれ正しかったと仮定して、私は単に自分の基準を満たすために、これを達成するためのよりスケーラブルな、あるいは実行可能な方法があるのか​​どうか疑問に思っています。再び、私はSparkノード上で実行されている2つの長時間実行ジョブ(ジョブ#1とジョブ#2)を持っており、そのうちの1つは、もう一方に「仕事を送る」ことができる必要があります。何か案は?

  • +1

    BTW - foreachRDDで 'rdd.collect'を使うと、データセット全体がドライバに返されます。あなたは間違いなくそれを望んでいます。 –

    +0

    ありがとう@Yuval(+1)、消費されている個々のメッセージにアクセスするためのより良い/より効率的な方法はありますか?これは私の意図ではなく、私はAPIの初心者ですので、私のコードを更新してください! – smeeb

    +1

    'rdd.foreach'を使うことができます。 –

    答えて

    2

    私が知る限り、ストリーミングコンテキストは、 がSpark Driverノードで実行されているリスナーをブロックしています。

    StreamingContext(単数)はブロッキングリスナーではありません。ストリーミングジョブの実行グラフを作成するのが仕事です。

    Kafkaから読み込みを開始するときは、10秒ごとに新しいレコードをフェッチするように指定します。今から何が起こるかは、カフカのために使用しているカフカ抽象化、KafkaUtils.createStreamによるレシーバアプローチ、KafkaUtils.createDirectStreamによるレシーバレスアプローチのいずれかによって異なります。

    一般にどちらの方法でも、データはKafkaから消費され、次に処理する各Sparkワーカーにパラレルで処理されます。

    その後、私は単にこの

    を達成するために、よりスケーラブルやパフォーマンス 方法がある場合は、このアプローチは非常にスケーラブルであると思いまして。レシーバレスアプローチを使用する場合、各カフカパーティションは、特定のRDD内のスパークパーティションにマップされます。 Kafkaのパーティションの量を増やすか、またはSpark内のデータを再パーティション化することによって並列性を高めることができます(DStream.repartitionを使用)。この設定をテストして、パフォーマンス要件に合っているかどうかを判断することをお勧めします。

    +0

    ありがとうございます@Yuval(+1)、あなたのためのいくつかのフォローアップの質問ありがとう! **(1)**最初にSparkのKafkaトピックに対して「競合する消費者」を設定するためには、クラスタごとに1消費者が必要であることを確認できますか?これは受信機と受信機なしの両方の設定で当てはまりますか? **(2)**レシーバレスレシーバを使用する場合の一般的なガイドラインは何ですか? – smeeb

    +0

    **(3)**「カフカからの読み込みを開始すると、10秒ごとに新しいレコードを取得するように指定しています...」というとき、これはどこに設定されていますか? 10秒以外に設定することはできますか?最後に、**(4)**レシーバーアプローチを使用するときにマップされるカフカパーティションは何ですか?もう一度ありがとう! – smeeb

    +1

    @ smeeb 1)*「競合する消費者」*とはどういう意味ですか? 2)私は一般に直接ストリーミングアプローチを使用することを推奨します。これはSpark 1.3.0で導入されており、多くの利点があります。私はそれを読むことをお勧めします。 3)ここで設定されています: 'val ssc = new StreamingContext(sc、Seconds(10))'。 4)レシーバベースのアプローチでは、パーティションのマッピングはありません。 Kafkaから同時に読み込みたい場合、複数のコンシューマを接続する必要があります。つまり、複数の 'KafkaUtils.createStream'呼び出しを実行してそれらを結合する必要があります。 –

    関連する問題