2016-04-16 4 views
0

StormのElasticsearchに文書を索引したいが、Elasticsearchに索引付けする文書を取得できませんでした。私のトポロジでelasticsearch-hadoopライブラリを使用して嵐からelasticsearchにタプルを索引付けすることはできません

私はこのようなJSON発するKafkaSpout持っている{「tweetId」:1、「テキスト」:「こんにちは」} EsBoltに嵐タプルを書き込みelasticsearch、HadoopのライブラリーからネイティブのボルトがありますElasticsearch(docはこちらです:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html) これは私のEsBoltのためのconfigsです:

Map conf = new HashMap(); 
conf.put("es.nodes","127.0.0.1"); 
conf.put("es.port","9200"); 
conf.put("es.resource","twitter/tweet"); 
conf.put("es.index.auto.create","no"); 
conf.put("es.input.json", "true"); 
conf.put("es.mapping.id", "tweetId"); 
EsBolt elasticsearchBolt = new EsBolt("twitter/tweet", conf); 

最初の2つの構成が、デフォルトでは、これらの値を持っているが、私は明示的に設定することにしました。私は同じ結果を得て、彼らなしで試してみました。

そして、これは私が私のトポロジを構築する方法である:

TopologyBuilder builder = new TopologyBuilder(); 

builder.setSpout(TWEETS_DATA_KAFKA_SPOUT_ID, kafkaSpout, kafkaSpoutParallelism) 
     .setNumTasks(kafkaSpoutNumberOfTasks); 


builder.setBolt(ELASTICSEARCH_BOLT_ID, elasticsearchBolt, elasticsearchBoltParallelism) 
     .setNumTasks(elasticsearchBoltNumberOfTasks) 
     .shuffleGrouping(TWEETS_DATA_KAFKA_SPOUT_ID); 

return builder.createTopology(); 

私はローカルトポロジーを実行する前に、私は、このインデックスにElasticsearchの「さえずり」のインデックスとマッピング「つぶやき」を作成します。

{ 
    "twitter": { 
     "mappings": { 
     "tweet": { 
      "properties": { 
       "text": { 
        "type": "string" 
       }, 
       "tweetId": { 
        "type": "string" 
       } 
      } 
     } 
     } 
    } 
} 
私はローカルトポロジーを実行

、これはタプルを処理するとき、私は私のコンソールで得るものです:私は、新しく作成されたタイプ(カール-XGET「http://localhost:9200/twitter/_mapping/tweet」)のマッピングを取得した場合 これは私が得るものです:

Processing received message FOR 6 TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}] 

Emitting: elasticsearch-bolt __ack_ack [-8010897758788654352 -6240339405307942979] 

TRANSFERING tuple TASK: 2 TUPLE: source: elasticsearch-bolt:6, stream: __ack_ack, id: {}, [-8010897758788654352 -6240339405307942979] 

BOLT ack TASK: 6 TIME: TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}] 

Execute done TUPLE source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}] TASK: 6 DELTA: 

タプルは処理されているようです。しかし私はElasticsearchで索引付けされた文書を持っていません。

私はEsBoltの設定をしたときに何か問題があったと思います。あなたはフラッシュのサイズに達すると

答えて

0

ドキュメントのみインデックス付けされ、es.storm.bolt.flush.entries.size

代わりで指定された、あなたはキューのフラッシュをトリガTICKの周波数を設定することがあります。デフォルトでは

config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5); 

、ES-Hadoopのダニのフラッシュを、es.storm.bolt.tick.tuple.flushパラメータごととして。

関連する問題