私はKafkaトピックからJSONメッセージを読み込み、Elasticsearchインデックスに送信するためにLogstash 2.4を使用しています。Logstashフィルタを使用したKafkaトピックからのJSONメッセージの操作
JSON形式は以下の通りです - 私が欲しい、
{
"_index" : "kafka_reloads",
"_type" : "logs",
"_id" : "AVfcyTU4SyCFNFP2z5-l",
"_score" : 1.0,
"_source" : {
"schema" : {
"type" : "struct",
"fields" : [ {
"type" : "string",
"optional" : false,
"field" : "reloadID"
}, {
"type" : "string",
"optional" : false,
"field" : "externalAccountID"
}, {
"type" : "int64",
"optional" : false,
"name" : "org.apache.kafka.connect.data.Timestamp",
"version" : 1,
"field" : "reloadDate"
}, {
"type" : "int32",
"optional" : false,
"field" : "reloadAmount"
}, {
"type" : "string",
"optional" : true,
"field" : "reloadChannel"
} ],
"optional" : false,
"name" : "reload"
},
"payload" : {
"reloadID" : "155559213",
"externalAccountID" : "9831200014",
"reloadDate" : 1449529746000,
"reloadAmount" : 140,
"reloadChannel" : "C1"
},
"@version" : "1",
"@timestamp" : "2016-10-19T11:56:09.973Z",
}
}
をしかし -
{
"schema":
{
"type": "struct",
"fields": [
{
"type":"string",
"optional":false,
"field":"reloadID"
},
{
"type":"string",
"optional":false,
"field":"externalAccountID"
},
{
"type":"int64",
"optional":false,
"name":"org.apache.kafka.connect.data.Timestamp",
"version":1,
"field":"reloadDate"
},
{
"type":"int32",
"optional":false,
"field":"reloadAmount"
},
{
"type":"string",
"optional":true,
"field":"reloadChannel"
}
],
"optional":false,
"name":"reload"
},
"payload":
{
"reloadID":"328424295",
"externalAccountID":"9831200013",
"reloadDate":1446242463000,
"reloadAmount":240,
"reloadChannel":"C1"
}
}
私のconfigファイル内の任意のフィルタがなければ、ESインデックスから対象文書は、以下のように見えます「ペイロード」フィールドの値の部分だけがターゲットJSON本体としてESインデックスに移動します。だから私は以下のように設定ファイルのフィルタ「を変異させる」を使用してみました - このフィルタで
input {
kafka {
zk_connect => "zksrv-1:2181,zksrv-2:2181,zksrv-4:2181"
group_id => "logstash"
topic_id => "reload"
consumer_threads => 3
}
}
filter {
mutate {
remove_field => [ "schema","@version","@timestamp" ]
}
}
output {
elasticsearch {
hosts => ["datanode-6:9200","datanode-2:9200"]
index => "kafka_reloads"
}
}
を、ES文書は現在、以下のように見える -
{
"_index" : "kafka_reloads",
"_type" : "logs",
"_id" : "AVfch0yhSyCFNFP2z59f",
"_score" : 1.0,
"_source" : {
"payload" : {
"reloadID" : "850846698",
"externalAccountID" : "9831200013",
"reloadDate" : 1449356706000,
"reloadAmount" : 30,
"reloadChannel" : "C1"
}
}
}
しかし、実際にはそれはする必要があります以下のように -
{
"_index" : "kafka_reloads",
"_type" : "logs",
"_id" : "AVfch0yhSyCFNFP2z59f",
"_score" : 1.0,
"_source" : {
"reloadID" : "850846698",
"externalAccountID" : "9831200013",
"reloadDate" : 1449356706000,
"reloadAmount" : 30,
"reloadChannel" : "C1"
}
}
これを行う方法はありますか?誰かが私にこれを助けることができますか?
私は、フィルタの下にしようとした -
filter {
json {
source => "payload"
}
}
しかし、それは私のようなエラー与えている -
を解析エラーJSON {:ソース=> "ペイロード"、:生=> {」 "reloadID" => "572584696"、 "externalAccountID" => "9831200011"、 "reloadDate" => 1449093851000、 "reloadAmount" => 180、 "reloadChannel" => "C1"}、例外=> java.lang.ClassCastException :org.jruby.RubyHashをorg.jruby.RubyIOにキャストすることはできません::level =>:warn}
ご協力いただければ幸いです。
おかげ ゴータムゴーシュ
優秀!!それは完璧に働いた..おかげでたくさんの友達! –
素晴らしいです、うれしかった! – Val
すごいもの、ありがとう@Val。これは、Oracle GoldenGateを使用している人にとっては大いに役立つでしょう - > Kafka - > Logstash - > –