pyspark - error when writing a dstream to elasticsearch

2017-07-25

I am having a problem indexing data from Spark Streaming (pyspark) into Elasticsearch. The data is a dstream; below is what it looks like:

(u'01B', 0) 
(u'1A5', 1) 
.... 

Here is the Elasticsearch index I am using (index = clus, type = data):

GET /clus/_mapping/data
{
    "clus": {
        "mappings": {
            "data": {
                "properties": {
                    "content": {
                        "type": "text"
                    }
                }
            }
        }
    }
}

Here is my code:

from elasticsearch import Elasticsearch
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

ES_HOST = {
    "host": "localhost",
    "port": 9200
}

INDEX_NAME = 'clus'
TYPE_NAME = 'data'
ID_FIELD = 'responseID'

# create ES client
es = Elasticsearch(hosts=[ES_HOST])

# create the index before writing to it
if not es.indices.exists(INDEX_NAME):
    request_body = {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0
        }
    }
    res = es.indices.create(index=INDEX_NAME, body=request_body)

# configuration for the elasticsearch-hadoop connector
es_write_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": INDEX_NAME + "/" + TYPE_NAME
}

sc = SparkContext(appName="PythonStreamingKafka")
ssc = StreamingContext(sc, 30)

# .....
# loading data to put in elastic: lines4

lines4.foreachRDD(lambda rdd: rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf))

ssc.start()
ssc.awaitTermination()

Here is the error:

17/07/25 15:31:31 ERROR Executor: Exception in task 2.0 in stage 11.0 (TID 23)
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [127.0.0.1:9200] returned Bad Request(400) - failed to parse; Bailing out..
    at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251)
    at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203)
    at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220)
    at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242)
    at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:267)
    at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.doClose(EsOutputFormat.java:214)
    at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.close(EsOutputFormat.java:196)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1119)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1295)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

17/07/25 15:31:31 ERROR Executor: Exception in task 0.0 in stage 11.0 (TID 21)
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [127.0.0.1:9200] returned Bad Request(400) - failed to parse; Bailing out..
    (same stack trace as above)

17/07/25 15:31:31 ERROR Executor: Exception in task 1.0 in stage 11.0 (TID 22)
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [127.0.0.1:9200] returned Bad Request(400) - failed to parse; Bailing out..
    (same stack trace as above)


Could you add the full package imports to your script so the error can be reproduced locally? Also, have you verified that your elasticsearch is reachable from the hadoop cluster? – MedAli

Answers

It looks like the error is in the way you create the index. When creating the index, you need to send the desired mapping in the body. Here is a working example:

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])

# create the index with the mapping it should have;
# the create API takes "mappings" at the top level (no index-name wrapper)
index_name = "clus"
index_mapping = {
    "mappings": {
        "data": {
            "properties": {
                "content": {
                    "type": "text"
                }
            }
        }
    }
}

if not es.indices.exists(index_name):
    res = es.indices.create(index=index_name, body=index_mapping)
    print(res)

You should get {u'acknowledged': True} as the response, confirming that the index was created. Then loop over your data dstream with foreachRDD, applying a function that converts each record to the JSON structure {"content": str((u'1A5', 1))} and indexes it:

doc = {"content": str((u'1A5', 1))} 
res = es.index(index="clus", doc_type='data', body=doc) 
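
For completeness, a minimal sketch of how that indexing call could be applied to the stream. It assumes the lines4 dstream from the question, and it opens a client inside each partition, since an Elasticsearch client cannot be pickled and shipped from the driver to the executors:

def index_partition(records):
    # the client must be created on the executor side:
    # Elasticsearch client objects are not serializable
    es = Elasticsearch(["http://localhost:9200"])
    for record in records:
        doc = {"content": str(record)}
        es.index(index="clus", doc_type="data", body=doc)

lines4.foreachRDD(lambda rdd: rdd.foreachPartition(index_partition))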

As a side note, indexing the data as a stringified list like (u'1A5', 1) is not recommended: it will be hard to use in other contexts, such as Kibana visualizations.
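
Building on that side note, a hedged sketch of the alternative: split each tuple into named fields before indexing (the field names "word" and "count" are illustrative, not from the original data):

def to_doc(record):
    # turn (u'1A5', 1) into a structured document instead of one opaque string;
    # "word" and "count" are illustrative field names
    key, count = record
    return {"word": key, "count": count}

# e.g. es.index(index="clus", doc_type="data", body=to_doc((u'1A5', 1)))

Documents with named fields like these are what Kibana can filter and aggregate on, which is the point of the recommendation above.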
