Duplicate entries in ElasticSearch despite aggregation with Logstash

I am dumping the IMDB database into CSV format. The CSV looks like this:
name, movie, role
"'El Burro' Van Rankin, Jorge","Serafín (1999)",PLAYED_IN
"'El Burro' Van Rankin, Jorge","Serafín (1999)",PLAYED_IN
"'El Burro' Van Rankin, Jorge","Serafín (1999)",PLAYED_IN
.........
"A.S., Alwi","Rumah masa depan (1984)",PLAYED_IN
"A.S., Giri","Sumangali (1940)",PLAYED_IN
"A.S., Luis","Bob the Drag Queen: Bloodbath (2016)",PLAYED_IN
"A.S., Pragathi","Suli (2016)",PLAYED_IN
"A.S.F. Dancers, The","D' Lucky Ones! (2006)",PLAYED_IN
.........
My goal is to put the data into ElasticSearch, but I want to aggregate the movies each actor has played in, so I don't want duplicate actor entries. Each document should look like this:
{
  "_index": "imdb13",
  "_type": "logs",
  "_id": "AVmw9JHCrsOFTsZwAmBm",
  "_score": 13.028783,
  "_source": {
    "@timestamp": "2017-01-18T09:42:15.149Z",
    "movie": [
      "Naomi and Ely's No Kiss List (2015)",
      "Staten Island Summer (2015/II)",
      "What Happened Last Night (2016)",
      ...
    ],
    "@version": "1",
    "name": "Abernethy, Kevin"
  }
}
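In plain Ruby, the grouping I am after looks like this (a standalone sketch over a few inline sample rows from the dataset, not my actual pipeline):

```ruby
require 'csv'

# Sample rows taken from the dataset; group them by actor name and
# collect every movie title into a single document per actor.
data = <<~CSV
  name,movie,role
  "Abernethy, Kevin","Naomi and Ely's No Kiss List (2015)",PLAYED_IN
  "Abernethy, Kevin","Staten Island Summer (2015/II)",PLAYED_IN
  "A.S., Giri","Sumangali (1940)",PLAYED_IN
CSV

docs = CSV.parse(data, headers: true)
          .group_by { |row| row['name'] }
          .map do |name, rows|
            { 'name' => name, 'movie' => rows.map { |r| r['movie'] } }
          end

docs.each { |d| puts "#{d['name']}: #{d['movie'].inspect}" }
```

Running this yields one entry per actor, with all of their movies in a single array, which is exactly the shape I want in the index.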
I am using Logstash to push the data into ElasticSearch, with the aggregate filter plugin. My configuration file is as follows:
input {
  file {
    path => "/home/maeln/imdb-data/roles.csv"
    start_position => "beginning"
  }
}
filter {
  csv {
    columns => ["name", "movie"]
    remove_field => ["role", "message", "host", "column3", "path"]
    separator => ","
  }
  aggregate {
    task_id => "%{name}"
    code => "
      map['movie'] ||= []
      event.to_hash.each do |key,value|
        map[key] = value unless map.has_key?(key)
        map[key] << value if map[key].is_a?(Array)
      end
    "
    push_previous_map_as_event => true
    timeout => 30
    timeout_tags => ['aggregated']
  }
  if "aggregated" not in [tags] {
    drop {}
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "imdb13"
  }
}
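For reference, this is what the `code` block in the aggregate filter is meant to do, as a standalone Ruby sketch over two hypothetical events with the same task_id (the actor name); titles are taken from the index output below:

```ruby
# Two events for the same actor, as the csv filter would produce them.
events = [
  { 'name' => 'Abernethy, Kevin', 'movie' => "Naomi and Ely's No Kiss List (2015)" },
  { 'name' => 'Abernethy, Kevin', 'movie' => 'Staten Island Summer (2015/II)' },
]

map = {}
events.each do |event|
  # Same logic as the filter's code block: scalar fields are set once,
  # while the 'movie' array accumulates one title per event.
  map['movie'] ||= []
  event.each do |key, value|
    map[key] = value unless map.has_key?(key)
    map[key] << value if map[key].is_a?(Array)
  end
end

puts map.inspect
```

Run on its own, this produces a single map with both titles in the `movie` array, which is the behavior I expected from the filter.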
But when I run a simple search on the index, every actor is duplicated, each document with only one movie in the "movie" field:
{
  "took": 4,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 149,
    "max_score": 13.028783,
    "hits": [
      {
        "_index": "imdb13",
        "_type": "logs",
        "_id": "AVmw9JHCrsOFTsZwAmBm",
        "_score": 13.028783,
        "_source": {
          "@timestamp": "2017-01-18T09:42:15.149Z",
          "movie": [
            "Naomi and Ely's No Kiss List (2015)"
          ],
          "@version": "1",
          "name": "Abernethy, Kevin",
          "tags": [
            "aggregated"
          ]
        }
      },
      {
        "_index": "imdb13",
        "_type": "logs",
        "_id": "AVmw9JHCrsOFTsZwAmBq",
        "_score": 12.998644,
        "_source": {
          "@timestamp": "2017-01-18T09:42:15.149Z",
          "movie": [
            "Staten Island Summer (2015/II)"
          ],
          "@version": "1",
          "name": "Abernethy, Kevin",
          "tags": [
            "aggregated"
          ]
        }
      },
      {
        "_index": "imdb13",
        "_type": "logs",
        "_id": "AVmw9JHCrsOFTsZwAmBu",
        "_score": 12.998644,
        "_source": {
          "@timestamp": "2017-01-18T09:42:15.150Z",
          "movie": [
            "What Happened Last Night (2016)"
          ],
          "@version": "1",
          "name": "Abernethy, Kevin",
          "tags": [
            "aggregated"
          ]
        }
      },
      .....
Is there a way to fix this?
Logs produced by logstash with the --debug option (only a partial excerpt — the full log is ~1 GiB): paste (I put it on pastebin because of Stack Overflow's 30,000-character limit).
Last lines of the log (only the lines containing logstash.filters.aggregate):
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"path"}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"role"}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] Event after csv filter {:event=>2017-01-18T10:34:09.900Z %{host} %{message}}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"message"}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"path"}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"host"}
[2017-01-18T11:34:09,977][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.897Z, "movie"=>["Tayong dalawa (2009)"], "@version"=>"1", "name"=>"Anselmuccio, Alex", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] Event after csv filter {:event=>2017-01-18T10:34:09.915Z %{host} %{message}}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"column3"}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.aggregate] Aggregate create_timeout_event call with task_id 'Anson, Christopher'
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"path"}
[2017-01-18T11:34:09,977][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Aggregate: adding tag {"tag"=>"aggregated"}
[2017-01-18T11:34:09,977][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.917Z, "movie"=>["Tabi tabi po! (2001)"], "@version"=>"1", "name"=>"Anson, Alvin", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.csv ] Event after csv filter {:event=>2017-01-18T10:34:09.921Z %{host} %{message}}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\n\t\t\t\tmap['movie'] ||= []\n\t\t\t\t\tevent.to_hash.each do |key,value|\n\t\t\t\t\tmap[key] = value unless map.has_key?(key)\n\t\t\t\t\tmap[key] << value if map[key].is_a?(Array)\n\t\t\t\tend\n\t\t\t\t"}
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.911Z, "movie"=>["21 Jump Street (1987)"], "@version"=>"1", "name"=>"Ansley, Zachary", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate create_timeout_event call with task_id 'Anseth, Elias Moussaoui'
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.897Z, "movie"=>["Tayong dalawa (2009)"], "@version"=>"1", "name"=>"Anselmuccio, Alex", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Aggregate: adding tag {"tag"=>"aggregated"}
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.917Z, "movie"=>["The Death Match: Fighting Fist of Samurai Joe (2013)"], "@version"=>"1", "name"=>"Anson, Alvin", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\n\t\t\t\tmap['movie'] ||= []\n\t\t\t\t\tevent.to_hash.each do |key,value|\n\t\t\t\t\tmap[key] = value unless map.has_key?(key)\n\t\t\t\t\tmap[key] << value if map[key].is_a?(Array)\n\t\t\t\tend\n\t\t\t\t"}
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.917Z, "movie"=>["The Diplomat Hotel (2013)"], "@version"=>"1", "name"=>"Anson, Alvin", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate create_timeout_event call with task_id 'Anson, Alvin'
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.897Z, "movie"=>["Tayong dalawa (2009)"], "@version"=>"1", "name"=>"Anselmuccio, Alex", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] filter received {"event"=>{"path"=>"/home/maeln/Projets/oracle-of-bacon/imdb-data/roles.csv", "@timestamp"=>2017-01-18T10:34:09.900Z, "@version"=>"1", "host"=>"maeln-GE70-2PE", "message"=>"\"Ansfelt, Jacob\",\"Manden med de gyldne ører (2009)\",PLAYED_IN"}}
[2017-01-18T11:34:09,978][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Aggregate: adding tag {"tag"=>"aggregated"}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\n\t\t\t\tmap['movie'] ||= []\n\t\t\t\t\tevent.to_hash.each do |key,value|\n\t\t\t\t\tmap[key] = value unless map.has_key?(key)\n\t\t\t\t\tmap[key] << value if map[key].is_a?(Array)\n\t\t\t\tend\n\t\t\t\t"}
Pastebin: link
Can you add the '--debug' flag on the command line and update the question with the output you get? – Val
@Val Done, but since the logs are huge I only included a partial excerpt. – Maeln
Can you grep only the lines containing 'logstash.filters.aggregate'? – Val