0
saveToEs
を使用してRDD[(Object,Object)] messages
をElasticSearchに保存します。以下のコードではDStream[String] transformed
を繰り返し、RDD[String] rdd
の場合はprepare
を使用してRDD[(Object,Object)]
を作成します。問題は、私はpom.xml
にアーティファクトelasticsearch-hadoop
(バージョン2.3.2)を加えてもsaveToEs
は赤でマークされていることである。Spark Streaming + Elasticsearch:シンボルsaveToEsを解決できません
transformed.foreachRDD(rdd => {
if (!rdd.isEmpty) {
val messages = rdd.map(prepare)
messages.saveToEs(ec.getResource().toString)
}
})
private def prepare(message:String):(Object,Object) = {
val m = JSON.parseFull(message) match {
case Some(map) => map.asInstanceOf[Map[String,String]]
case None => Map.empty[String,String]
}
val kw = NullWritable.get
val vw = new MapWritable
for ((k, v) <- m) vw.put(new Text(k), new Text(v))
(kw, vw)
}
あなたはどのようなIDEを使うのですか? – MirMasej
@MirMasej:IntellijIDEA 2016.1.1。奇妙なことに、SparkContextでは 'saveToEs'が動作しますが、StreamingContextを使用したい場合は赤で強調表示されます。 – Klue
まず、コードがコマンドラインからコンパイルされていることを確認してください: 'mvn compile'。もしそうなら、それはIDEでそれをリフレッシュするという質問になるはずです。 – MirMasej