2016-04-24 12 views
1

私は初めてlogstashを使用しています。amavisd-newのJSONレポートを検索および分析に使用しようとしています。 Amavisd-newはredisにjsonのログを書き込むことができます。私はすべてを完全にインポートしており、これを通してすべての方法を学び始めました。logstash - 含まれている配列に基づいてイベントを2つに分割する

amavisのJSONレポートの形式は次のようになります。「受信者」には配列があり、受信者ごとに1つのエントリがあります。

受信者ごとに1つのイベントを2つに分割し、他のすべてのフィールドを同じにしたいが、各受信者の配列の "action"、 "ccat_main"、 "queued_as"などのフィールドを置き換えたいメンバーをメインのメンバーにする

2人の受信者を持つ1つの着信イベントは、logstashで2人の別々のログイベント(各人に1つ)をもたらすという考えがあります。

私はイベントのための分割を見ましたが、私はこれを行う方法を見ていない - どこでも適切な例を見つけることができないようです。実ワード例については、だから、

、この与えられた:私はこれらのような二つの異なるイベントで終わるしたい

{ 
    "@timestamp" => "2014-05-06T09:29:47.048Z", 
    "time_unix" => 1399368587.048, 
    "time_iso_week_date" => "2014-W19-2", 
    "partition" => "19", 
    "type" => "amavis", 
    "host" => "mailer.example.net", 
    "queued_as" => ["3gNFyR4Mfjzc3", "3gNFyR4n6Lzc4"], 
    "recipients" => [ 
     { "action" => "PASS", 
     "ccat_main" => "Clean", 
     "queued_as" => "3gNFyR4Mfjzc3", 
     "rcpt_is_local" => false, 
     "rcpt_to" => "[email protected]", 
     "smtp_code" => "250", 
     "smtp_response" => "250 2.0.0 from MTA(smtp:[::1]:10013): 250 2.0.0 Ok: queued as 3gNFyR4Mfjzc3", 
     "spam_score" => -2.0 
     }, 
     { "action" => "PASS", 
     "ccat_main" => "Clean", 
     "mail_id_related" => "men7HTERZaOF", 
     "penpals_age" => 1114599, 
     "queued_as" => "3gNFyR4n6Lzc4", 
     "rcpt_is_local" => true, 
     "rcpt_to" => "[email protected]", 
     "smtp_code" => "250", 
     "smtp_response" => "250 2.0.0 from MTA(smtp:[::1]:10013): 250 2.0.0 Ok: queued as 3gNFyR4n6Lzc4", 
     "spam_score" => -5.272 
     } 
    ], 
    "smtp_code" => ["250"], 
    } 

を:

{ 
    "@timestamp" => "2014-05-06T09:29:47.048Z", 
    "time_unix" => 1399368587.048, 
    "time_iso_week_date" => "2014-W19-2", 
    "partition" => "19", 
    "type" => "amavis", 
    "host" => "mailer.example.net", 
    "queued_as" => ["3gNFyR4Mfjzc3", "3gNFyR4n6Lzc4"], 
    "action" => "PASS", 
    "ccat_main" => "Clean", 
    "queued_as" => "3gNFyR4Mfjzc3", 
    "rcpt_is_local" => false, 
    "rcpt_to" => "[email protected]", 
    "smtp_code" => "250", 
    "smtp_response" => "250 2.0.0 from MTA(smtp:[::1]:10013): 250 2.0.0 Ok: queued as 3gNFyR4Mfjzc3", 
    "spam_score" => -2.0 
    "smtp_code" => ["250"], 
    } 

{ 
    "@timestamp" => "2014-05-06T09:29:47.048Z", 
    "time_unix" => 1399368587.048, 
    "time_iso_week_date" => "2014-W19-2", 
    "partition" => "19", 
    "type" => "amavis", 
    "host" => "mailer.example.net", 
    "queued_as" => ["3gNFyR4Mfjzc3", "3gNFyR4n6Lzc4"], 
    "recipients" => [ 
    "action" => "PASS", 
    "ccat_main" => "Clean", 
    "mail_id_related" => "men7HTERZaOF", 
    "penpals_age" => 1114599, 
    "queued_as" => "3gNFyR4n6Lzc4", 
    "rcpt_is_local" => true, 
    "rcpt_to" => "[email protected]", 
    "smtp_code" => "250", 
    "smtp_response" => "250 2.0.0 from MTA(smtp:[::1]:10013): 250 2.0.0 Ok: queued as 3gNFyR4n6Lzc4", 
    "spam_score" => -5.272 
    "smtp_code" => ["250"], 
    } 

編集:

さて、私は単純にスプリットフィルターを使用しました。しかし、私を混乱させることが1つあります。

単一の受信者があるとき、それは右を通じてブロックを渡す - のように見えるkibanaで結果:

recipients  { 
    "action": "PASS", 
    "bypass_banned_checks": true, 
    "bypass_spam_checks": true, 
    "ccat_main": "Clean", 
    "queued_as": "3qv7Km4Ybpz14Kyh", 
    "rcpt_is_local": true, 
    "rcpt_to": "[email protected]", 
    "rid": "552213780", 
    "smtp_code": "250", 
    "smtp_response": "250 2.0.0 from MTA(smtp:[127.0.0.1]:10025): 250 2.0.0 Ok: queued as 3qv7Km4Ybpz14Kyh" 
} 

しかし、2人以上の受信者がある場合には、新しいイベントがそれぞれ自分で、次のようになり適切な情報:

recipients.action  PASS 
recipients.ccat_main  CleanTag 
recipients.queued_as  3qv7Ly4Pqvz4wyS 
recipients.rcpt_is_local  true 
recipients.rcpt_to  [email protected] 
recipients.rid  552278239 
recipients.smtp_code  250 
recipients.smtp_response  250 2.0.0 from MTA(smtp:[127.0.0.1]:10025): 250 2.0.0 Ok: queued as 3qv7Ly4Pqvz4wyS 
recipients.whitelisted  true 

2つの違いは何ですか?私は受信者のフィールドを値のハッシュとして保つことを望んでいると思うので、分割されたイベントを単一のイベントと一致させる最良の方法は何ですか?

答えて

1

これはsplit filterです。各コピーで、フィールドの名前を適切なレベルに変更するか、各コピーで必要でないフィールドを削除します。

0

ここで私は何をしたのですか。これにより、配列のメンバーが分割されているかどうかが一貫しています。

おそらくもっと簡単な方法ですが、これは今のところ私をカバーしています。もし私が別のものを考え出すなら、私は戻って改訂するでしょう。

filter { 
    split { 
     field => "recipients" 
     target => "recipcopy" 
     remove_field => "recipients" 
    } 
} 

filter { 
    if [recipients] { 
     ruby { 
      code => "event['recipcopy'] = event['recipients'][0]" 
      remove_field => "recipients" 
     } 
    } 
} 

filter { 
    if [recipcopy] { 
     mutate { 
      rename => { "recipcopy" => "recipients" } 
     } 
    } 
} 
関連する問題