0
私はカフカからJSONメッセージを取り、Elasticsearchにそれらを送信するために非常に単純なパイプラインを持っている:入力文書フィールドをelasticsearch_idフィールドにマップする方法はありますか?
{"TransactionID": "5440772161", "InvoiceNo": 5440772, "StockCode": 22294, "Description": "HEART FILIGREE DOVE SMALL", "Quantity": 4, "InvoiceDate": 1507777440000, "UnitPrice": 1.25, "CustomerID": 14825, "Country": "United Kingdom", "LineNo": 16, "InvoiceTime": "03:04:00", "StoreID": 1}
{"TransactionID": "5440772191", "InvoiceNo": 5440772, "StockCode": 21733, "Description": "RED HANGING HEART T-LIGHT HOLDER", "Quantity": 4, "InvoiceDate": 1507777440000, "UnitPrice": 2.95, "CustomerID": 14825, "Country": "United Kingdom", "LineNo": 19, "InvoiceTime": "03:04:00", "StoreID": 1}
は私が設定できます。
input {
kafka {
bootstrap_servers => "kafka04-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka05-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka01-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka03-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka02-prod01.messagehub.services.eu-de.bluemix.net:9093"
topics => [ "transactions_load" ]
}
}
filter {
json {
source => "message"
}
mutate{
remove_field => ["kafka"]
remove_field => ["@version"]
remove_field => ["@timestamp"]
remove_field => ["message"]
remove_tag => ["multiline"]
}
}
output {
elasticsearch {
hosts => [
"xxxxx.ibm-343.composedb.com:16915",
"xxxxx.ibm-343.composedb.com:16915"
]
ssl => true
user => "logstash_kafka"
password => "*****"
index => "pos_transactions"
}
}
JSONレコード
は、各レコードを一意に識別するTransactionID
フィールドを持っていますlogstash
TransactionID
を
_id
フィールドとして使用して、同じトランザクションのレコードを重複して処理すると、これらの更新は等しくなります。