StormのElasticsearchに文書を索引したいが、Elasticsearchに索引付けする文書を取得できませんでした。私のトポロジでelasticsearch-hadoopライブラリを使用して嵐からelasticsearchにタプルを索引付けすることはできません
私はこのようなJSON発するKafkaSpout持っている{「tweetId」:1、「テキスト」:「こんにちは」} EsBoltに嵐タプルを書き込みelasticsearch、HadoopのライブラリーからネイティブのボルトがありますElasticsearch(docはこちらです:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html) これは私のEsBoltのためのconfigsです:
Map conf = new HashMap();
conf.put("es.nodes","127.0.0.1");
conf.put("es.port","9200");
conf.put("es.resource","twitter/tweet");
conf.put("es.index.auto.create","no");
conf.put("es.input.json", "true");
conf.put("es.mapping.id", "tweetId");
EsBolt elasticsearchBolt = new EsBolt("twitter/tweet", conf);
最初の2つの構成が、デフォルトでは、これらの値を持っているが、私は明示的に設定することにしました。私は同じ結果を得て、彼らなしで試してみました。
そして、これは私が私のトポロジを構築する方法である:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(TWEETS_DATA_KAFKA_SPOUT_ID, kafkaSpout, kafkaSpoutParallelism)
.setNumTasks(kafkaSpoutNumberOfTasks);
builder.setBolt(ELASTICSEARCH_BOLT_ID, elasticsearchBolt, elasticsearchBoltParallelism)
.setNumTasks(elasticsearchBoltNumberOfTasks)
.shuffleGrouping(TWEETS_DATA_KAFKA_SPOUT_ID);
return builder.createTopology();
私はローカルトポロジーを実行する前に、私は、このインデックスにElasticsearchの「さえずり」のインデックスとマッピング「つぶやき」を作成します。
{
"twitter": {
"mappings": {
"tweet": {
"properties": {
"text": {
"type": "string"
},
"tweetId": {
"type": "string"
}
}
}
}
}
}
私はローカルトポロジーを実行
、これはタプルを処理するとき、私は私のコンソールで得るものです:私は、新しく作成されたタイプ(カール-XGET「http://localhost:9200/twitter/_mapping/tweet」)のマッピングを取得した場合 これは私が得るものです:
Processing received message FOR 6 TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]
Emitting: elasticsearch-bolt __ack_ack [-8010897758788654352 -6240339405307942979]
TRANSFERING tuple TASK: 2 TUPLE: source: elasticsearch-bolt:6, stream: __ack_ack, id: {}, [-8010897758788654352 -6240339405307942979]
BOLT ack TASK: 6 TIME: TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]
Execute done TUPLE source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}] TASK: 6 DELTA:
タプルは処理されているようです。しかし私はElasticsearchで索引付けされた文書を持っていません。
私はEsBoltの設定をしたときに何か問題があったと思います。あなたはフラッシュのサイズに達すると