使用Logstash,JDBC将数据聚合并索引到Elasticsearch中
介紹
在我以前的帖子在這里和這里我展示了如何使用JDBC和Elasticsearch JDBC進(jìn)口商庫從SQL數(shù)據(jù)庫索引數(shù)據(jù)到Elasticsearch。 在這里的第一篇文章中,我提到了使用導(dǎo)入程序庫的一些缺點,這些缺點我已在此處復(fù)制:
- 不支持ES版本5及更高版本
- 嵌套對象數(shù)組中可能存在重復(fù)的對象。 但是重復(fù)數(shù)據(jù)刪除可以在應(yīng)用程序?qū)舆M(jìn)行處理。
- 對最新ES版本的支持可能會延遲。
使用Logstash及其以下插件可以克服以上所有缺點:
- JDBC Input插件 –用于使用JDBC從SQL DB讀取數(shù)據(jù)
- 聚合過濾器插件 –用于將SQL DB中的行聚合到嵌套對象中。
我將使用最新的ES版,即5.63可以從Elasticsearch網(wǎng)站下載這里 。 我們將使用此處可用的映射創(chuàng)建索引world_v2。
$ curl -XPUT --header "Content-Type: application/json" http://localhost:9200/world_v2 -d @world-index.json或使用Postman REST客戶端,如下所示:
要確認(rèn)索引已成功創(chuàng)建,請在瀏覽器中打開此URL http:// localhost:9200 / world_v2,以得到類似于以下內(nèi)容的內(nèi)容:
創(chuàng)建Logstash配置文件
我們應(yīng)該選擇等效的logstash版本,即5.6.3,可以從此處下載。 然后,我們需要使用以下命令安裝JDBC輸入插件,聚合過濾器插件和Elasticsearch輸出插件:
bin/logstash-plugin install logstash-input-jdbc bin/logstash-plugin install logstash-filter-aggregate bin/logstash-plugin install logstash-output-elasticsearch我們需要將以下內(nèi)容復(fù)制到bin目錄中,以便能夠運行我們將在接下來定義的配置:
我們將上述內(nèi)容復(fù)制到Logstash的bin目錄或您將擁有l(wèi)ogstash配置文件的任何目錄中,這是因為我們在配置中使用這兩個文件的相對路徑來引用這兩個文件。 下面是Logstash配置文件:
input {jdbc {jdbc_connection_string => "jdbc:mysql://localhost:3306/world"jdbc_user => "root"jdbc_password => "mohamed"# The path to downloaded jdbc driverjdbc_driver_library => "mysql-connector-java-5.1.6.jar"jdbc_driver_class => "Java::com.mysql.jdbc.Driver"# The path to the file containing the querystatement_filepath => "world-logstash.sql"} } filter {aggregate {task_id => "%{code}"code => "map['code'] = event.get('code')map['name'] = event.get('name')map['continent'] = event.get('continent')map['region'] = event.get('region')map['surface_area'] = event.get('surface_area')map['year_of_independence'] = event.get('year_of_independence')map['population'] = event.get('population')map['life_expectancy'] = event.get('life_expectancy')map['government_form'] = event.get('government_form')map['iso_code'] = event.get('iso_code')map['capital'] = {'id' => event.get('capital_id'), 'name' => event.get('capital_name'),'district' => event.get('capital_district'),'population' => event.get('capital_population')}map['cities_list'] ||= []map['cities'] ||= []if (event.get('cities_id') != nil)if !( map['cities_list'].include? event.get('cities_id') ) map['cities_list'] << event.get('cities_id')map['cities'] << {'id' => event.get('cities_id'), 'name' => event.get('cities_name'),'district' => event.get('cities_district'),'population' => event.get('cities_population')}endendmap['languages_list'] ||= []map['languages'] ||= []if (event.get('languages_language') != nil)if !( map['languages_list'].include? event.get('languages_language') )map['languages_list'] << event.get('languages_language')map['languages'] << {'language' => event.get('languages_language'), 'official' => event.get('languages_official'),'percentage' => event.get('languages_percentage')}endendevent.cancel()"push_previous_map_as_event => truetimeout => 5}mutate { remove_field => ["cities_list", "languages_list"]} } output {elasticsearch {document_id => "%{code}"document_type => "world"index => "world_v2"codec => "json"hosts => ["127.0.0.1:9200"]} }我們將配置文件放置在logstash的bin目錄中。 我們使用以下命令運行l(wèi)ogstash管道:
$ logstash -w 1 -f world-logstash.conf我們使用1個工作程序,因為當(dāng)匯總發(fā)生時,多個工作人員可能會破壞匯總,這是基于具有共同國家/地區(qū)代碼的事件序列。 成功完成Logstash管道后,我們將看到以下輸出:
在瀏覽器中打開以下URL http:// localhost:9200 / world_v2 / world / IND ,以查看在Elasticsearch中索引的印度的信息,如下所示:
翻譯自: https://www.javacodegeeks.com/2017/10/aggregate-index-data-elasticsearch-using-logstash-jdbc.html
總結(jié)
以上是生活随笔為你收集整理的使用Logstash,JDBC将数据聚合并索引到Elasticsearch中的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spark减少内存消耗_将内存消耗减少2
- 下一篇: 海关备案手续流程(海关备案手续)