2017-08-16 6 views
1

SparkからElasticsearchにすぐにJSON文字列をインデックス登録することは可能ですか?私はScalaの中間的なクラス、つまりPOJOSを望んでいません。SparkのJson StringsをElasticsearchに即時インデックスする

私はSpark、Scala、およびElastic 5.5を使用しています。

私のコードは次のようになります。

val s = xml 

    .map { x => 
     import org.json.XML 

     XML.toJSONObject(x).toString 

    }.top(1) 

    spark.sparkContext.makeRDD(s).saveToEs("test/article") 

は、しかし、私は入れません:

org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [127.0.0.1:9200] returned Bad Request(400) - failed to parse; Bailing out.. 
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138) 

でも、私は、インデックスすることができますがKibana、または他のESのクライアントを使用してJSON文字列。

このコードでは、RDDをXMLコンテンツの文字列でJSONに変換した後、ESでインデックスを作成しようとしています。

答えて

1

を:

import org.apache.spark.SparkContext 
import org.elasticsearch.spark.rdd.EsSpark 
val jsonField = .....//some json          
val rdd = sc.makeRDD(jsonField)    
EsSpark.saveToEs(rdd, "spark/docs") 

あなたのオブジェクトは、書き込み動作の前にJSON型ではない場合には、あなたの人生を簡素化し、JSON形式に変換する方法を使用することができます。詳細については

val persitedObject = .....//some json             
sparkContext.makeRDD(persitedObject) 
         .saveJsonToEs("spark/docs") 

はをチェック。

関連する問題