3

私たちはkafkaからデータを取得し、MongoDBにデータを挿入する前に(各メッセージで)いくつかの変換を行うスパークストリーミングアプリケーションを提供しています。我々は、メッセージをバルクでカフカにプッシュし、スパークストリーミングアプリケーションからの(各メッセージの)確認応答を待つミドルウェアアプリケーションを持っています。メッセージをKafkaに送信した後、一定期間(5秒)以内にミドルウェアによって肯定応答が受信されない場合、ミドルウェアアプリケーションはメッセージを再送する。スパークストリーミングアプリケーションは、約50〜100件のメッセージ(1回のバッチで)を受信し、5秒以内にすべてのメッセージに対する確認応答を送信することができます。しかし、ミドルウェア・アプリケーションが100以上のメッセージをプッシュすると、ミドルウェア・アプリケーションは、通知を送信するスパーク・ストリーミングの遅延のためにメッセージを再送する結果となる。現在の実装では、承認を送信するたびにプロデューサを作成しますが、これには3-4秒かかります。スパークストリーミングでkafkaプロデューサを再利用

package com.testing 

import org.apache.spark.streaming._ 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.streaming.{ Seconds, StreamingContext } 
import org.apache.spark.{ SparkConf, SparkContext } 
import org.apache.spark.streaming.kafka._ 
import org.apache.spark.sql.{ SQLContext, Row, Column, DataFrame } 
import java.util.HashMap 
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord } 
import scala.collection.mutable.ArrayBuffer 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types._ 

import org.joda.time._ 
import org.joda.time.format._ 

import org.json4s._ 
import org.json4s.JsonDSL._ 
import org.json4s.jackson.JsonMethods._ 
import com.mongodb.util.JSON 

import scala.io.Source._ 
import java.util.Properties 
import java.util.Calendar 

import scala.collection.immutable 
import org.json4s.DefaultFormats 


object Sample_Streaming { 

    def main(args: Array[String]) { 

    val sparkConf = new SparkConf().setAppName("Sample_Streaming") 
     .setMaster("local[4]") 

    val sc = new SparkContext(sparkConf) 
    sc.setLogLevel("ERROR") 

    val sqlContext = new SQLContext(sc) 
    val ssc = new StreamingContext(sc, Seconds(1)) 

    val props = new HashMap[String, Object]() 


    val bootstrap_server_config = "127.0.0.100:9092" 
    val zkQuorum = "127.0.0.101:2181" 



    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server_config) 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 

    val TopicMap = Map("sampleTopic" -> 1) 
    val KafkaDstream = KafkaUtils.createStream(ssc, zkQuorum, "group", TopicMap).map(_._2) 

     val schemaDf = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource") 
     .option("spark.mongodb.input.uri", "connectionURI") 
     .option("spark.mongodb.input.collection", "schemaCollectionName") 
     .load() 

     val outSchema = schemaDf.schema 
     var outDf = sqlContext.createDataFrame(sc.emptyRDD[Row], outSchema) 

    KafkaDstream.foreachRDD(rdd => rdd.collect().map { x => 
     { 
     val jsonInput: JValue = parse(x) 


     /*Do all the transformations using Json libraries*/ 

     val json4s_transformed = "transformed json" 

     val rdd = sc.parallelize(compact(render(json4s_transformed)) :: Nil) 
     val df = sqlContext.read.schema(outSchema).json(rdd) 

     df.write.option("spark.mongodb.output.uri", "connectionURI") 
        .option("collection", "Collection") 
        .mode("append").format("com.mongodb.spark.sql").save() 

     val producer = new KafkaProducer[String, String](props) 
     val message = new ProducerRecord[String, String]("topic_name", null, "message_received") 

     producer.send(message) 
     producer.close() 


     } 

    } 

    ) 

    // Run the streaming job 
    ssc.start() 
    ssc.awaitTermination() 
    } 

} 

そこで我々はforeachRDDの外プロデューサを作成し、全体のバッチ間隔のためにそれを再利用する別のアプローチを試みた(以下のコードです)。これは、承認を送信するたびにプロデューサーを作成していないため、これは役に立ちましたようです。しかし何らかの理由で、スパークのUIでアプリケーションを監視すると、ストリーミングアプリケーションのメモリ消費は着実に増加しています。私たちは、spark-submitで--num-executors 1オプションを使用して、糸によって開始されるエグゼキュータの数を制限しようとしました。

object Sample_Streaming { 

    def main(args: Array[String]) { 

    val sparkConf = new SparkConf().setAppName("Sample_Streaming") 
     .setMaster("local[4]") 

    val sc = new SparkContext(sparkConf) 
    sc.setLogLevel("ERROR") 

    val sqlContext = new SQLContext(sc) 
    val ssc = new StreamingContext(sc, Seconds(1)) 

    val props = new HashMap[String, Object]() 


    val bootstrap_server_config = "127.0.0.100:9092" 
    val zkQuorum = "127.0.0.101:2181" 



    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server_config) 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 

    val TopicMap = Map("sampleTopic" -> 1) 
    val KafkaDstream = KafkaUtils.createStream(ssc, zkQuorum, "group", TopicMap).map(_._2) 

     val schemaDf = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource") 
     .option("spark.mongodb.input.uri", "connectionURI") 
     .option("spark.mongodb.input.collection", "schemaCollectionName") 
     .load() 

     val outSchema = schemaDf.schema 
    val producer = new KafkaProducer[String, String](props) 
    KafkaDstream.foreachRDD(rdd => 
      { 

      rdd.collect().map (x => 
      { 

       val jsonInput: JValue = parse(x) 


       /*Do all the transformations using Json libraries*/ 

       val json4s_transformed = "transformed json" 

       val rdd = sc.parallelize(compact(render(json4s_transformed)) :: Nil) 
       val df = sqlContext.read.schema(outSchema).json(rdd) 

       df.write.option("spark.mongodb.output.uri", "connectionURI") 
         .option("collection", "Collection") 
         .mode("append").format("com.mongodb.spark.sql").save() 


       val message = new ProducerRecord[String, String]("topic_name", null, "message_received") 

       producer.send(message) 
       producer.close() 


      } 

      ) 
     } 

    ) 

    // Run the streaming job 
    ssc.start() 
    ssc.awaitTermination() 
    } 

} 

私の質問は以下のとおりです。私は現在、我々は手動でそれが私たちのクラスタ内で使用可能なメモリを使い果たす5分ごとまで、アプリケーションを監視している、スパークアプリケーションのメモリ消費量を監視するにはどうすればよい

  1. (2ノードそれぞれ16GB)?

  2. 業界でスパークストリーミングとカフカを使用する際のベストプラクティスは何ですか?

答えて

3

カフカは、プロデューサーとコンシューマーの配送保証を提供します。プロデューサーと消費者の間で「トップ・オーバー」認知メカニズムを実装するのは残酷です。プロデューサが正常に動作し、障害が発生した場合にコンシューマがリカバリできることを確認し、2-endのデリバリが確実に行われるようにします。

なぜパフォーマンスが悪いのか不思議ではありません。処理は、外部DBへの書き込みまで要素ごとに順番に実行されています。これはが間違ったであり、メモリ消費の問題を修正する前に対処する必要があります。私たちはcase classのインスタンスですべての文字列中心のJSON変換を置き換えることができれば、このプロセスをさらに向上させることができ

val producer = // create producer 

val jsonDStream = kafkaDstream.transform{rdd => rdd.map{elem => 
    val json = parse(elem) 
    render(doAllTransformations(json)) // output should be a String-formatted JSON object 
    } 
} 

jsonDStream.foreachRDD{ rdd => 
    val df = sqlContext.read.schema(outSchema).json(rdd) // transform the complete collection, not element by element 
    df.write.option("spark.mongodb.output.uri", "connectionURI") // write in bulk, not one by one 
    .option("collection", "Collection") 
    .mode("append").format("com.mongodb.spark.sql").save() 
    val msg = //create message 
    producer.send(msg) 
    producer.flush() // force send. *DO NOT Close* otherwise it will not be able to send any more messages 
} 

:よう

このプロセスを向上させることができました。

+0

あなたのご意見ありがとうございます、私はあなたの提案を実装し、あなたにご連絡します – Sid

+0

私はあなたの提案を分析するために戻ってきた、彼らは私を啓発した。我々はforeachRDDの外ですべての変換を動かす過程にある。あなたの提案に応じて、謝辞を削除してチェックポイントに置き換えることも考えています。私たちがやっていることの一つは、複数の話題から調達することです。メッセージが壊れている可能性があります(探している情報がありません)。その場合、そのメッセージを除外します。私は、要素の存在に基づいてDStream RDDを(JSON Stringで)フィルタリングする方法のいくつかの例を見つけることを求めました。あなたはこれについていくつかの光を当てることができますか? – Sid

+0

私はそれを理解することができました。 JValue =パース(x)の ヴァル・検証=(JSONの\ "キー1")= JNothing && (:ここで私はRDD ヴァルfilteredDStream = kafkaDstream.filter {X => ヴァルJSONをフィルタリングすることができる午前方法です! !JSON \ "KEY2")= JNothing && (JSONの\ "KEY3")= JNothing 検証} ヴァルtransformedVibrationStream = filteredDStream.transform {RDD => rdd.map {ELEM => ヴァルのJSON: JValue = parse(elem) /*すべての変換*/ コンパクト(render(json_transfor) med)) } – Sid

関連する問題