2017-01-18

Duplicate entries in ElasticSearch despite aggregation in Logstash

I am dumping the IMDB database to CSV format. The CSV looks like this:

name, movie, role 
"'El Burro' Van Rankin, Jorge","Serafín (1999)",PLAYED_IN 
"'El Burro' Van Rankin, Jorge","Serafín (1999)",PLAYED_IN 
"'El Burro' Van Rankin, Jorge","Serafín (1999)",PLAYED_IN 
......... 
"A.S., Alwi","Rumah masa depan (1984)",PLAYED_IN 
"A.S., Giri","Sumangali (1940)",PLAYED_IN 
"A.S., Luis","Bob the Drag Queen: Bloodbath (2016)",PLAYED_IN 
"A.S., Pragathi","Suli (2016)",PLAYED_IN 
"A.S.F. Dancers, The","D' Lucky Ones! (2006)",PLAYED_IN 
......... 

My goal is to put the data into Elasticsearch, but I want to aggregate, per actor, the movies they played in, so that I don't end up with duplicate actor entries. The documents should look like this:

{ 
    "_index": "imdb13", 
    "_type": "logs", 
    "_id": "AVmw9JHCrsOFTsZwAmBm", 
    "_score": 13.028783, 
    "_source": { 
     "@timestamp": "2017-01-18T09:42:15.149Z", 
     "movie": [ 
     "Naomi and Ely's No Kiss List (2015)", 
     "Staten Island Summer (2015/II)", 
     "What Happened Last Night (2016)", 
     ... 
     ], 
     "@version": "1", 
     "name": "Abernethy, Kevin" 
    } 
    } 

I am using Logstash to push the data into ElasticSearch, together with the aggregate filter plugin; my configuration file is below. But when I run a simple search on the index, every actor is duplicated, each document holding only one movie in the "movie" field.

input { 
    file { 
        path => "/home/maeln/imdb-data/roles.csv" 
        start_position => "beginning" 
    } 
} 

filter { 
    csv { 
        columns => [ "name", "movie" ] 
        remove_field => ["role", "message", "host", "column3", "path"] 
        separator => "," 
    } 

    aggregate { 
        task_id => "%{name}" 
        code => " 
            map['movie'] ||= [] 
            event.to_hash.each do |key,value| 
                map[key] = value unless map.has_key?(key) 
                map[key] << value if map[key].is_a?(Array) 
            end 
        " 
        push_previous_map_as_event => true 
        timeout => 30 
        timeout_tags => ['aggregated'] 
    } 

    if "aggregated" not in [tags] { 
        drop {} 
    } 
} 

output { 
    elasticsearch { 
        hosts => "localhost:9200" 
        index => "imdb13" 
    } 
} 
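For reference, the Ruby block inside the aggregate filter merges rows correctly when a single map sees every row. A standalone sketch of the same merging logic, outside Logstash, with made-up sample rows:

```ruby
# Two CSV rows for the same actor, represented as plain hashes
# (sample data invented for illustration).
rows = [
  { 'name' => 'Abernethy, Kevin', 'movie' => "Naomi and Ely's No Kiss List (2015)" },
  { 'name' => 'Abernethy, Kevin', 'movie' => 'Staten Island Summer (2015/II)' }
]

map = {}
rows.each do |event|
  # Same logic as the aggregate filter's code block:
  map['movie'] ||= []
  event.each do |key, value|
    map[key] = value unless map.has_key?(key)
    map[key] << value if map[key].is_a?(Array)
  end
end

puts map.inspect
```

With one shared map, the two rows collapse into a single entry whose 'movie' array holds both titles.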

But instead, this is what I get:

{ 
    "took": 4, 
    "timed_out": false, 
    "_shards": { 
    "total": 5, 
    "successful": 5, 
    "failed": 0 
    }, 
    "hits": { 
    "total": 149, 
    "max_score": 13.028783, 
    "hits": [ 
     { 
     "_index": "imdb13", 
     "_type": "logs", 
     "_id": "AVmw9JHCrsOFTsZwAmBm", 
     "_score": 13.028783, 
     "_source": { 
      "@timestamp": "2017-01-18T09:42:15.149Z", 
      "movie": [ 
      "Naomi and Ely's No Kiss List (2015)" 
      ], 
      "@version": "1", 
      "name": "Abernethy, Kevin", 
      "tags": [ 
      "aggregated" 
      ] 
     } 
     }, 
     { 
     "_index": "imdb13", 
     "_type": "logs", 
     "_id": "AVmw9JHCrsOFTsZwAmBq", 
     "_score": 12.998644, 
     "_source": { 
      "@timestamp": "2017-01-18T09:42:15.149Z", 
      "movie": [ 
      "Staten Island Summer (2015/II)" 
      ], 
      "@version": "1", 
      "name": "Abernethy, Kevin", 
      "tags": [ 
      "aggregated" 
      ] 
     } 
     }, 
     { 
     "_index": "imdb13", 
     "_type": "logs", 
     "_id": "AVmw9JHCrsOFTsZwAmBu", 
     "_score": 12.998644, 
     "_source": { 
      "@timestamp": "2017-01-18T09:42:15.150Z", 
      "movie": [ 
      "What Happened Last Night (2016)" 
      ], 
      "@version": "1", 
      "name": "Abernethy, Kevin", 
      "tags": [ 
      "aggregated" 
      ] 
     } 
     }, 
     ..... 

Is there a way to fix this?

Logs produced with logstash's --debug option (partial; the full log is ~1GiB): paste (I put it on pastebin because of stackoverflow's 30000-character limit).

Last lines of the log (only the lines containing logstash.filters.aggregate):

[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] filters/LogStash::Filters::CSV: removing field {:field=>"path"} 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] filters/LogStash::Filters::CSV: removing field {:field=>"role"} 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] Event after csv filter {:event=>2017-01-18T10:34:09.900Z %{host} %{message}} 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] filters/LogStash::Filters::CSV: removing field {:field=>"message"} 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] filters/LogStash::Filters::CSV: removing field {:field=>"path"} 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] filters/LogStash::Filters::CSV: removing field {:field=>"host"} 
[2017-01-18T11:34:09,977][DEBUG][logstash.pipeline  ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.897Z, "movie"=>["Tayong dalawa (2009)"], "@version"=>"1", "name"=>"Anselmuccio, Alex", "tags"=>["aggregated"]}} 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] Event after csv filter {:event=>2017-01-18T10:34:09.915Z %{host} %{message}} 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] filters/LogStash::Filters::CSV: removing field {:field=>"column3"} 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.aggregate] Aggregate create_timeout_event call with task_id 'Anson, Christopher' 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] filters/LogStash::Filters::CSV: removing field {:field=>"path"} 
[2017-01-18T11:34:09,977][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Aggregate: adding tag {"tag"=>"aggregated"} 
[2017-01-18T11:34:09,977][DEBUG][logstash.pipeline  ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.917Z, "movie"=>["Tabi tabi po! (2001)"], "@version"=>"1", "name"=>"Anson, Alvin", "tags"=>["aggregated"]}} 
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.csv  ] Event after csv filter {:event=>2017-01-18T10:34:09.921Z %{host} %{message}} 
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\n\t\t\t\tmap['movie'] ||= []\n\t\t\t\t\tevent.to_hash.each do |key,value|\n\t\t\t\t\tmap[key] = value unless map.has_key?(key)\n\t\t\t\t\tmap[key] << value if map[key].is_a?(Array)\n\t\t\t\tend\n\t\t\t\t"} 
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline  ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.911Z, "movie"=>["21 Jump Street (1987)"], "@version"=>"1", "name"=>"Ansley, Zachary", "tags"=>["aggregated"]}} 
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate create_timeout_event call with task_id 'Anseth, Elias Moussaoui' 
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline  ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.897Z, "movie"=>["Tayong dalawa (2009)"], "@version"=>"1", "name"=>"Anselmuccio, Alex", "tags"=>["aggregated"]}} 
[2017-01-18T11:34:09,978][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Aggregate: adding tag {"tag"=>"aggregated"} 
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline  ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.917Z, "movie"=>["The Death Match: Fighting Fist of Samurai Joe (2013)"], "@version"=>"1", "name"=>"Anson, Alvin", "tags"=>["aggregated"]}} 
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\n\t\t\t\tmap['movie'] ||= []\n\t\t\t\t\tevent.to_hash.each do |key,value|\n\t\t\t\t\tmap[key] = value unless map.has_key?(key)\n\t\t\t\t\tmap[key] << value if map[key].is_a?(Array)\n\t\t\t\tend\n\t\t\t\t"} 
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline  ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.917Z, "movie"=>["The Diplomat Hotel (2013)"], "@version"=>"1", "name"=>"Anson, Alvin", "tags"=>["aggregated"]}} 
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate create_timeout_event call with task_id 'Anson, Alvin' 
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline  ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.897Z, "movie"=>["Tayong dalawa (2009)"], "@version"=>"1", "name"=>"Anselmuccio, Alex", "tags"=>["aggregated"]}} 
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline  ] filter received {"event"=>{"path"=>"/home/maeln/Projets/oracle-of-bacon/imdb-data/roles.csv", "@timestamp"=>2017-01-18T10:34:09.900Z, "@version"=>"1", "host"=>"maeln-GE70-2PE", "message"=>"\"Ansfelt, Jacob\",\"Manden med de gyldne ører (2009)\",PLAYED_IN"}} 
[2017-01-18T11:34:09,978][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Aggregate: adding tag {"tag"=>"aggregated"} 
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\n\t\t\t\tmap['movie'] ||= []\n\t\t\t\t\tevent.to_hash.each do |key,value|\n\t\t\t\t\tmap[key] = value unless map.has_key?(key)\n\t\t\t\t\tmap[key] << value if map[key].is_a?(Array)\n\t\t\t\tend\n\t\t\t\t"} 

Pastebin: link

Could you add the '--debug' flag on the command line and update the question with the output you get? – Val

@Val Done, but since the logs are huge I only included a partial one. – Maeln

Can you grep only the lines containing 'logstash.filters.aggregate'? – Val

Answer

The issue is related to the fact that, as soon as a line is read, it is handed over to one of the filter+output worker threads.

If you have several CPUs, several of those threads process lines in parallel, so the output order is no longer guaranteed. More importantly, each aggregate filter's map is local to its thread, so lines relating to the same actor (even consecutive ones) can be handled in parallel by different threads, each producing its own aggregated event for that actor.
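The effect can be sketched outside Logstash (a simplified model with made-up rows, not Logstash's actual scheduler): each simulated worker keeps a private map, so rows for the same actor that land on different workers yield separate aggregated events.

```ruby
rows = [
  { 'name' => 'Abernethy, Kevin', 'movie' => "Naomi and Ely's No Kiss List (2015)" },
  { 'name' => 'Abernethy, Kevin', 'movie' => 'Staten Island Summer (2015/II)' }
]

# Two simulated workers, each with its own private aggregation map;
# rows are distributed round-robin (a deliberate simplification).
workers = [{}, {}]
rows.each_with_index do |event, i|
  map = workers[i % 2]
  (map['movie'] ||= []) << event['movie']
  map['name'] ||= event['name']
end

events = workers.reject(&:empty?)
puts events.length
```

The same actor ends up in two "aggregated" events, each carrying a single movie, which matches the duplicated documents seen in the question.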

If you run logstash with the -w 1 option, only one worker thread will be created, but this will decrease the throughput.
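A minimal invocation along those lines (assuming the configuration above is saved as roles.conf; the file name is made up):

```shell
# Force a single pipeline worker so one aggregate map sees every line.
# Throughput drops, but events for the same actor are no longer split.
bin/logstash -w 1 -f roles.conf
```

In Logstash 5.x the same setting should also be available in logstash.yml as `pipeline.workers: 1`; worth verifying against the docs for your version.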
