2016-06-28 5 views
0

saveToEsを使用してRDD[(Object,Object)] messagesをElasticSearchに保存します。以下のコードではDStream[String] transformedを繰り返し、RDD[String] rddの場合はprepareを使用してRDD[(Object,Object)]を作成します。問題は、私はpom.xmlにアーティファクトelasticsearch-hadoop(バージョン2.3.2)を加えてもsaveToEsは赤でマークされていることである。Spark Streaming + Elasticsearch:シンボルsaveToEsを解決できません

transformed.foreachRDD(rdd => { 
    if (!rdd.isEmpty) { 
    val messages = rdd.map(prepare) 
    messages.saveToEs(ec.getResource().toString) 
    } 
}) 

private def prepare(message:String):(Object,Object) = { 

    val m = JSON.parseFull(message) match { 
     case Some(map) => map.asInstanceOf[Map[String,String]] 
     case None => Map.empty[String,String] 
    } 

    val kw = NullWritable.get 

    val vw = new MapWritable 
    for ((k, v) <- m) vw.put(new Text(k), new Text(v)) 

    (kw, vw)  
} 
+0

あなたはどのようなIDEを使うのですか? – MirMasej

+0

@MirMasej:IntellijIDEA 2016.1.1。奇妙なことに、SparkContextでは 'saveToEs'が動作しますが、StreamingContextを使用したい場合は赤で強調表示されます。 – Klue

+0

まず、コードがコマンドラインからコンパイルされていることを確認してください: 'mvn compile'。もしそうなら、それはIDEでそれをリフレッシュするという質問になるはずです。 – MirMasej

答えて

1

あなたはorg.elasticsearch.spark

<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark_2.10 --> 
<dependency> 
    <groupId>org.elasticsearch</groupId> 
    <artifactId>elasticsearch-spark_2.10</artifactId> 
    <version>2.2.0</version> 
</dependency> 
で定義されているsaveToESメソッドを使用するための適切なパッケージをインポートする必要があります

出典:

https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-write-scala

関連する問題