2017-04-25 18 views
1

2つのMySQLテーブル(ジョブデータと場所)からジョブ広告をインポートしようとしましたが、ジョブ広告に複数の場所があると問題に直面しています。すべてがOKである場所の問題を無視MySQLからLogstashの1対多のインポート

SELECT id, company, jobtitle, description, priority, DATE_FORMAT(date, '%Y-%m-%d %T') AS date, sa_locations.location AS location_name, sa_locations.lat AS location_lat, sa_locations.lon AS location_lon FROM sa_data JOIN sa_locations ON sa_data.id = sa_locations.id ORDER BY id 

を、私はこのような結果受け取る:私は、このMySQLのクエリを使用しています私が取得しようとしている何

{ 
    "_index" : "jk", 
    "_type" : "jobposting", 
    "_id" : "26362", 
    "_score" : 1.0, 
    "_source" : { 
     "date" : "2017-04-22 00:00:00", 
     "location_name" : "Berlin", 
     "location_lat" : "52.520007", 
     "location_lon" : "13.404954", 
     "@timestamp" : "2017-04-24T07:50:31.660Z", 
     "@version" : "1", 
     "description" : "Some text here", 
     "company" : "Test Company", 
     "id" : 26362, 
     "jobtitle" : "Architect Data Center Network & Security", 
     "priority" : 10, 
}, { 
    "_index" : "jk", 
    "_type" : "jobposting", 
    "_id" : "26363", 
    "_score" : 1.0, 
    "_source" : { 
     "date" : "2017-04-22 00:00:00", 
     "location_name" : "Hamburg", 
     "location_lat" : "53.551085", 
     "location_lon" : "9.993682", 
     "@timestamp" : "2017-04-24T07:50:31.660Z", 
     "@version" : "1", 
     "description" : "Some text here", 
     "company" : "Test Company", 
     "id" : 26363, 
     "jobtitle" : "Architect Data Center Network & Security", 
     "priority" : 10, 
} 

は、このようなものです:

{ 
    "_index" : "jk", 
    "_type" : "jobposting", 
    "_id" : "26362", 
    "_score" : 1.0, 
    "_source" : { 
     "date" : "2017-04-22 00:00:00", 
     "locations" : [ { "name": "Berlin", "lat" : "52.520007", "lon" : "13.04954" }, { "name": "Hamburg", "lat" : "53.551085", "lon" : 
"9.993682" } ] 
     "@timestamp" : "2017-04-24T07:50:31.660Z", 
     "@version" : "1", 
     "description" : "Some text here", 
     "company" : "Test Company", 
     "id" : 26362, 
     "jobtitle" : "Architect Data Center Network & Security", 
     "priority" : 10, 
    } 

したがって、geo_distanceフィルタを使用してBerlinまたはHamburgの近くのジョブを検索する場合、このジョブが表示されるはずです。 logstashを使ってデータをインポートする方法はありますか?

私logstash.confは、次のようになります。

input { 
jdbc { 
jdbc_connection_string => "jdbc:mysql://localhost:3306/jk" 
jdbc_user => "..." 
jdbc_password => "..." 
jdbc_driver_library => "/etc/logstash/mysql-connector-java-5.1.41/mysql-connector-java-5.1.41-bin.jar" 
jdbc_driver_class => "com.mysql.jdbc.Driver" 
statement => "SELECT id, company, jobtitle, description, priority, DATE_FORMAT(date, '%Y-%m-%d %T') AS date, sa_locations.location AS location_name, sa_locations.lat AS location_lat, sa_locations.lon AS location_lon 
FROM sa_data JOIN sa_locations 
ON sa_data.id = sa_locations.id 
ORDER BY id 
} 
} 

#filter { 
# aggregate { 
# task_id => "%{id}" 
# code => " 
# map['location_name'] = event.get('location_name') 
# map['location_lat'] = event.get('location_lat') 
# map['location_lon'] = event.get('location_lon') 
# map['locations'] ||= [] 
# map['locations'] < event.get('location_name')} 
# map['locations'] < event.get('location_lat')} 
# map['locations'] < event.get('location_lon')} 
# event.cancel() 
# " 
# push_previous_map_as_event => true 
# timeout => 3 
# } 
#} 

output { 
elasticsearch { 
index => "jk" 
document_type => "jobposting" 
document_id => "%{id}" 
hosts => ["localhost:9200"] 
} 
} 

フィルターは間違ったアプローチであるように思われました。

+0

DOBを - あなたは最終的にはこの作業を取得することができましたか?私は同様の問題があり、それを動作させることができません:( – Birdy

答えて

2

単一のIDに対して複数の場所がある場合でも集計したいが、現在の設定ではロケーションごとにハッシュの配列を作成する必要はありません(ロケーションデータベースの各行に対して1つのハッシュ)。あなたはこのような何か行うことができます

filter { 
    mutate { 
    rename => { 'location_name' => '[location][name]' } 
    rename => { 'location_lat' => '[location][lat]' } 
    rename => { 'location_long' => '[location][long]' } 
    } 

    aggregate { 
    task_id => '%{id}' 
    code => " 
     map['locations'] ||= [] 
     map['locations'] << event.get('location') 
    " 
    push_previous_map_as_event => true 
    } 
} 
+0

私はこの解決策を試しましたが、このエラーメッセージが表示されます: "メインスレッド>" worker2 "java.lang.ClassCastException:例外リストまたはマップ、 org.logstash.bivalues.StringBiValue – DOB

+0

あなたの設定ファイルは何ですか? – cattastrophe

+0

上記のフィルタを追加して、私はちょうど "))"を ")"に変更し、 "map ['id'] = event.get( ' id ') "、しかし成功なし。 – DOB