0

私はSparkとScalaの初心者です。スパークストリーミングを使用してリアルタイムストリーミングデータ/ログを処​​理するにはどうすればよいですか?

私は、Kafka Publisherから1分ごとに[1GB程度のJSONログライン/分を取り込む]ネットワークログを読み込み、最後にElasticSearchに集計値を保存することができるREAL TIME Spark Consumerを実装したいと考えています。

集約はコンポジットキー[クライアントMAC、クライアントIP、サーバーMAC、サーバーIPなど]を使用していくつかの値[like bytes_in、bytes_outなど]に基づいています。私が書いた

スパーク消費者は次のとおりです。

object LogsAnalyzerScalaCS{ 
    def main(args : Array[String]) { 
      val sparkConf = new SparkConf().setAppName("LOGS-AGGREGATION") 
      sparkConf.set("es.nodes", "my ip address") 
      sparkConf.set("es.port", "9200") 
      sparkConf.set("es.index.auto.create", "true") 
      sparkConf.set("es.nodes.discovery", "false") 

      val elasticResource = "conrec_1min/1minute" 
      val ssc = new StreamingContext(sparkConf, Seconds(30)) 
      val zkQuorum = "my zk quorum IPs:2181" 
      val consumerGroupId = "LogsConsumer" 
      val topics = "Logs" 
      val topicMap = topics.split(",").map((_,3)).toMap 
      val json = KafkaUtils.createStream(ssc, zkQuorum, consumerGroupId, topicMap) 
      val logJSON = json.map(_._2) 
      try{ 
      logJSON.foreachRDD(rdd =>{ 
       if(!rdd.isEmpty()){ 
        val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) 
        import sqlContext.implicits._ 
        val df = sqlContext.read.json(rdd) 
        val groupedData = 
((df.groupBy("id","start_time_formated","l2_c","l3_c", 
"l4_c","l2_s","l3_s","l4_s")).agg(count("f_id") as "total_f", sum("p_out") as "total_p_out",sum("p_in") as "total_p_in",sum("b_out") as "total_b_out",sum("b_in") as "total_b_in", sum("duration") as "total_duration")) 
        val dataForES = groupedData.withColumnRenamed("start_time_formated", "start_time") 
        dataForES.saveToEs(elasticResource) 
        dataForES.show(); 
       } 
       }) 
      } 
      catch{ 
      case e: Exception => print("Exception has occurred : "+e.getMessage) 
      } 
      ssc.start() 
      ssc.awaitTermination() 
     } 

object SQLContextSingleton { 
    @transient private var instance: org.apache.spark.sql.SQLContext = _ 
    def getInstance(sparkContext: SparkContext): org.apache.spark.sql.SQLContext = { 
     if (instance == null) { 
     instance = new org.apache.spark.sql.SQLContext(sparkContext) 
     } 
     instance 
    } 
    } 
} 

私はすべての私のアプローチでは正確であるかどうかを知りたいのですが、すべての最初に[私は1分ログの集約が必要考慮]?

このコードを使用して問題があるようです:

  1. この消費者は、カフカブローカーから30秒ごとに をデータを取得し、それゆえ、その30 秒のデータのためにElasticsearchする最終集計を保存します ユニークキー[少なくとも1分間に2つのエントリ]のElasticsearchの行数を増やします。 UIツール[木場屋]がさらに集計する必要があるとします。 のポーリング時間を30秒から60秒に増やすと、 には多くの時間がかかり、したがってリアルタイムでは残りません。
  2. 私は、ElasticSearchでキーごとに1行だけ 行を保存するように実装したいと思います。したがって、 カフカブローカーから[1分単位]を取得する私のデータセットに新しいキー値を取得していない時間まで、 という集計を実行したいとします。 私はこれを groupByKey()とupdateStateByKey()関数を使用して達成できることがわかりましたが、私はこれをどのように私のケースで使用することができるかを確認することができません [JSONを変換する必要があります フラットな値を持つログ行の文字列を使用して、 これらの関数]を使用しますか?これらの関数を使用する場合は、 を最終的な集計値をElasticSearchに保存する必要がありますか?
  3. これを達成する他の方法はありますか?

あなたの迅速なヘルプは高く評価されます。

よろしく、 Bhupesh

+0

アプローチがよさそうです。スループットを向上させるために、エグゼキュータを追加して起動することはできますか? – maasg

+0

応答ありがとうございます。私が記載したポイントで確認してください。 –

答えて

0
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.streaming.kafka010._ 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.streaming.{Seconds, StreamingContext} 

object Main { 
def main(args: Array[String]): Unit = { 


val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]") 
val ssc = new StreamingContext(conf, Seconds(15)) 

val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092", 
    "key.deserializer" -> classOf[StringDeserializer], 
    "value.deserializer" -> classOf[StringDeserializer], 
    "group.id" -> "group1", 
    "auto.offset.reset" -> "earliest", 
    "enable.auto.commit" -> (false: java.lang.Boolean) 
)//,localhost:9094,localhost:9095" 

val topics = Array("test") 
val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](topics, kafkaParams) 
) 

val out = stream.map(record => 
    record.value 
) 

val words = out.flatMap(_.split(" ")) 
val count = words.map(word => (word, 1)) 
val wdc = count.reduceByKey(_+_) 

val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate()) 

wdc.foreachRDD{rdd=> 
     val es = sqlContext.createDataFrame(rdd).toDF("word","count") 
     import org.elasticsearch.spark.sql._ 
     es.saveToEs("wordcount/testing") 
    es.show() 
} 

ssc.start() 
ssc.awaitTermination() 

} 
} 

To see full example and sbt

関連する問題