2017-05-10 6 views
2

ストリーミングスパークする:Elasticsearch私は、ログを分析していると私は、このアーキテクチャを持って

kafka->スパークストリーミング - >弾性検索

私の主な目標は、ストリーミングで機械学習モデルを作成することです。 - >弾性検索

2)Kafka->スパークStreaming-> elasticsearch - >スパークストリーミング(ML)

1)Kafka->スパークストリーミング(ML):私は、私は2つのことを行うことができると思います

- スパークストリーミングではインデックスデータを直接使用するため、2番目のアーキテクチャが最適です。どう思いますか?あれは正しいですか? - スパークストリーミングをelasticsearchにリアルタイムで簡単に接続できますか? - スパークストリーミング(弾性検索後)でモデルを作成する場合は、このモデルをこの場所で使用する必要があります(弾性検索後)か、スパークストリーミングで使用できますか? #使用する==リアルタイムで予測する - elasticsearch後にモデルを作成すると、モデルが静的になる(またはリアルタイムアッチではない)

ありがとう。

+0

Elasticsearchは '実time'システムではありません。 __Source__:https://www.elastic.co/guide/en/elasticsearch/reference/current/_basic_concepts.html#_near_realtime_nrt – avr

+0

ドキュメントでは、検索可能になるまでに1秒かかると言われていますストリーミングスパークに変換するには?そうでなければ、Kafka-> spark Streaming(ML) - >弾力的検索が優れていると思いますか? ありがとうございます。 –

+0

「1秒」を超えるものは、リアルタイムではありません。アプリケーションを「リアルタイム」にしたい場合、elasticsearchはどこに(スパークストリーミングやそれ以前に)どこに置いても機能しないかもしれません。 – avr

答えて

0

これは意味ですか?

カフカ - >スパークストリーミング - > elasticsearchデシベル

val sqlContext = new SQLContext(sc) 

//kafka group 
val group_id = "receiveScanner" 
// kafka topic 
val topic = Map("testStreaming"-> 1) 
// zk connect 
val zkParams = Map(
    "zookeeper.connect" ->"localhost", 
    "zookeeper.connection.timeout.ms" -> "10000", 
    "group.id" -> group_id) 

// Kafka 
val kafkaConsumer = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,zkParams,topic,StorageLevel.MEMORY_ONLY_SER) 
val receiveData = kafkaConsumer.map(_._2) 
// printer kafka data 
receiveData.print() 
receiveData.foreachRDD{ rdd=> 
    val transform = rdd.map{ line => 
    val data = Json.parse(line) 
    // play json parse 
    val id = (data \ "id").asOpt[Int] match { case Some(x) => x; case None => 0} 
    val name = (data \ "name" ).asOpt[String] match { case Some(x)=> x ; case None => "" } 
    val age = (data \ "age").asOpt[Int] match { case Some(x) => x; case None => 0} 
    val address = (data \ "address" ).asOpt[String] match { case Some(x)=> x ; case None => "" } 
    Row(id,name,age,address) 
    } 

    val transfromrecive = sqlContext.createDataFrame(transform,schameType) 
    import org.apache.spark.sql.functions._ 
    import org.elasticsearch.spark.sql._ 
    //filter age < 20 , to ES database 
    transfromrecive.where(col("age").<(20)).orderBy(col("age").asc) 
    .saveToEs("member/user",Map("es.mapping.id" -> "id")) 
} 

} 

/** * データフレームschame * */

def schameType = StructType(
    StructField("id",IntegerType,false):: 
    StructField("name",StringType,false):: 
    StructField("age",IntegerType,false):: 
    StructField("address",StringType,false):: 
    Nil 
) 
関連する問題