2017-10-12 17 views
0

私はカフカからJSONメッセージを取り、Elasticsearchにそれらを送信するために非常に単純なパイプラインを持っている:入力文書フィールドをelasticsearch_idフィールドにマップする方法はありますか?

{"TransactionID": "5440772161", "InvoiceNo": 5440772, "StockCode": 22294, "Description": "HEART FILIGREE DOVE SMALL", "Quantity": 4, "InvoiceDate": 1507777440000, "UnitPrice": 1.25, "CustomerID": 14825, "Country": "United Kingdom", "LineNo": 16, "InvoiceTime": "03:04:00", "StoreID": 1} 
{"TransactionID": "5440772191", "InvoiceNo": 5440772, "StockCode": 21733, "Description": "RED HANGING HEART T-LIGHT HOLDER", "Quantity": 4, "InvoiceDate": 1507777440000, "UnitPrice": 2.95, "CustomerID": 14825, "Country": "United Kingdom", "LineNo": 19, "InvoiceTime": "03:04:00", "StoreID": 1} 

は私が設定できます。

input { 
    kafka { 
     bootstrap_servers => "kafka04-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka05-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka01-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka03-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka02-prod01.messagehub.services.eu-de.bluemix.net:9093" 
     topics => [ "transactions_load" ] 
    } 
} 
filter { 
    json { 
    source => "message" 
    } 
    mutate{ 
    remove_field => ["kafka"] 
    remove_field => ["@version"] 
    remove_field => ["@timestamp"] 
    remove_field => ["message"] 
    remove_tag => ["multiline"] 
    } 
} 
output { 
    elasticsearch { 
     hosts => [ 
       "xxxxx.ibm-343.composedb.com:16915", 
       "xxxxx.ibm-343.composedb.com:16915" 
      ] 
     ssl => true 
     user => "logstash_kafka" 
     password => "*****" 
     index => "pos_transactions" 
    } 
} 

JSONレコード

は、各レコードを一意に識別する TransactionIDフィールドを持っていますlogstash TransactionID_idフィールドとして使用して、同じトランザクションのレコードを重複して処理すると、これらの更新は等しくなります。

答えて

0

私は自分自身の答えを考え出しました。それは他人のために有用である可能性があるので、ここに投稿:

output { 
    elasticsearch { 
     hosts => [ 
       "xxxxx.ibm-343.composedb.com:16915", 
       "xxxxx.ibm-343.composedb.com:16915" 
      ] 
     ssl => true 
     user => "logstash_kafka" 
     password => "*****" 
     index => "pos_transactions" 
     document_id => "%{TransactionID}" 
    } 
} 

document_id => "%{TransactionID}"構成エントリはelasticsearch _id

の着信文書 TransactionIDフィールドを使用しています
関連する問題