これは意味ですか?
カフカ - >スパークストリーミング - > elasticsearchデシベル
val sqlContext = new SQLContext(sc)
//kafka group
val group_id = "receiveScanner"
// kafka topic
val topic = Map("testStreaming"-> 1)
// zk connect
val zkParams = Map(
"zookeeper.connect" ->"localhost",
"zookeeper.connection.timeout.ms" -> "10000",
"group.id" -> group_id)
// Kafka
val kafkaConsumer = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,zkParams,topic,StorageLevel.MEMORY_ONLY_SER)
val receiveData = kafkaConsumer.map(_._2)
// printer kafka data
receiveData.print()
receiveData.foreachRDD{ rdd=>
val transform = rdd.map{ line =>
val data = Json.parse(line)
// play json parse
val id = (data \ "id").asOpt[Int] match { case Some(x) => x; case None => 0}
val name = (data \ "name" ).asOpt[String] match { case Some(x)=> x ; case None => "" }
val age = (data \ "age").asOpt[Int] match { case Some(x) => x; case None => 0}
val address = (data \ "address" ).asOpt[String] match { case Some(x)=> x ; case None => "" }
Row(id,name,age,address)
}
val transfromrecive = sqlContext.createDataFrame(transform,schameType)
import org.apache.spark.sql.functions._
import org.elasticsearch.spark.sql._
//filter age < 20 , to ES database
transfromrecive.where(col("age").<(20)).orderBy(col("age").asc)
.saveToEs("member/user",Map("es.mapping.id" -> "id"))
}
}
/** * データフレームschame * */
def schameType = StructType(
StructField("id",IntegerType,false)::
StructField("name",StringType,false)::
StructField("age",IntegerType,false)::
StructField("address",StringType,false)::
Nil
)
Elasticsearchは '実time'システムではありません。 __Source__:https://www.elastic.co/guide/en/elasticsearch/reference/current/_basic_concepts.html#_near_realtime_nrt – avr
ドキュメントでは、検索可能になるまでに1秒かかると言われていますストリーミングスパークに変換するには?そうでなければ、Kafka-> spark Streaming(ML) - >弾力的検索が優れていると思いますか? ありがとうございます。 –
「1秒」を超えるものは、リアルタイムではありません。アプリケーションを「リアルタイム」にしたい場合、elasticsearchはどこに(スパークストリーミングやそれ以前に)どこに置いても機能しないかもしれません。 – avr