2016-01-19 11 views
6

を使用して、複雑なネストされたelasticsearchドキュメントの更新のは、Oracleスキーマは、テーブルと列を次のようしていると仮定しよう:はlogstashおよびJDBC

 

    Country 
     country_id; (Primary Key) 
     country_name; 

    Department 
     department_id; (Primary Key) 
     department_name; 
     country_id; (Foreign key to Country:country_id) 

    Employee 
     employee_id; (Primary Key) 
     employee_name; 
     department_id; (Foreign key to Department:department_id) 

そして私は、ルート要素が国であり、それは すべての部門が含まれている私のElasticsearchの文書を持っていますその国には各部門のすべての従業員が含まれています。

だから、文書構造は次のようになります。

 

    { 
     "mappings": { 
     "country": { 
      "properties": { 
      "country_id": { "type": "string"}, 
      "country_name": { "type": "string"},   
      "department": { 
       "type": "nested", 
       "properties": { 
       "department_id": { "type": "string"}, 
       "department_name": { "type": "string"}, 
       "employee": { 
        "type": "nested", 
        "properties": { 
        "employee_id": { "type": "string"}, 
        "employee_name": { "type": "string"} 
        } 
       } 
       } 
      } 
      } 
     } 
     } 
    }   

私は各テーブル上で実行している別の入力JDBCクエリを持ってできるようにしたいといつでも、彼らは/更新を作成する必要があります/ elasticsearch文書に データを削除基本テーブル内のデータが追加/更新/削除されます。

これは例示的な問題であり、実際のテーブルとデータ構造はより複雑です。だから私は解決策を探していません これに限られます。

これを達成する方法はありますか?

ありがとうございました。

+0

私はしかし、あなたはすでにこれを解決したかもしれない推測しているを使用します必要なデータをドキュメント構造形式(国、部署、従業員)に結合し、単一のJDBCクエリとして、最も低い一意のレベルとしてelasticsearchドキュメントIDを作成できるようにするためのOracleビュー(この中のemployee_idケース)とそこに変更を管理する? –

答えて

0

レベル1の場合は、aggregate filterを使用してまっすぐに進みます。参照するには、それらの間に共通のIDが必要です。

filter {  

    aggregate { 
    task_id => "%{id}" 

    code => "  
     map['id'] = event.get('id') 
     map['department'] ||= [] 
     map['department'] << event.to_hash.each do |key,value| { key => value } end  
    " 
    push_previous_map_as_event => true 
    timeout => 150000 
    timeout_tags => ['aggregated']  
    } 

    if "aggregated" not in [tags] { 
    drop {} 
    } 
} 

重要:出力アクションが

output { 
     elasticsearch { 
      action => "update" 
      ... 
      } 
     } 

を更新する必要があるレベル2を解決する1つの方法は、すでにインデックス化文書を照会し、ネストされた記録でそれを更新することです。再度aggregate filterを使用して、 ;ドキュメントの共通IDがあるので、正しいドキュメントを検索して挿入することができます。

filter {  
    #get the document from elastic based on id and store it in 'emp' 
    elasticsearch { 
      hosts => ["${ELASTICSEARCH_HOST}/${INDEX_NAME}/${INDEX_TYPE}"] 
      query => "id:%{id}" 
      fields => { "employee" => "emp" } 
     } 



    aggregate { 
    task_id => "%{id}" 
    code => "  
       map['id'] = event.get('id') 
       map['employee'] = [] 
       employeeArr = [] 
       temp_emp = {} 

       event.to_hash.each do |key,value|      
        temp_emp[key] = value 
       end  

       #push the objects into an array 
       employeeArr.push(temp_emp) 

       empArr = event.get('emp')     

       for emp in empArr 
        emp['employee'] = employeeArr      
        map['employee'].push(emp) 
       end 
    " 
    push_previous_map_as_event => true 
    timeout => 150000 
    timeout_tags => ['aggregated'] 

    } 

    if "aggregated" not in [tags] { 
    drop {} 
    } 

} 

output { 

elasticsearch { 
     action => "update" #important 
     ... 
     } 
} 

また、Rubyのコードをデバッグするためには、あなたが利用できなかった、出力に以下

output{ 
    stdout { codec => dots } 
} 
関連する問題