MySQL到Elasticsearch数据同步
一、使用logstash進行同步
下載logstash,地址:https://www.elastic.co/downloads/logstash,我這里使用的7.11.1版本,解壓后文件如下:
其中mysql文件夾是自己導的,因為使用的是mysql8.0以及mysql-connector-java-8.0.20這個jar包
到導入的ealsticsearch的mysql表的結構如下:
bin文件夾下創建Logstash配置文件,文件名為mysql.conf
內容配置如下:
更多詳細的配置:
- jdbc_driver_library: jdbc驅動的路徑,在上一步中已經下載
- jdbc_driver_class: 驅動類的名字,mysql填com.mysql.jdbc.Driver
- jdbc_connection_string: mysql 地址
- jdbc_user: mysql 用戶
- jdbc_password: mysql密碼
- schedule: 執行sql時機,類似 crontab 的調度,上面配置表示每分鐘刷新一次。
- statement: 要執行的sql,以 “:” 開頭是定義的變量,可以通過parameters 來設置變量,這里的sql_last_value是內置的變量,表示上一次sql執行中> -> update_time的值
- statement_filepath:和上面statement參數二選一,存放需要執行的SQL語句的文件位置,適用于多個sql語句的場景。
- use_column_value: 使用遞增列的值
- tracking_column_type: 遞增字段的類型,numeric表示數值類型,
- timestamp 表示時間戳類型
- tracking_column: 遞增字段的名稱,這里使用updatetime這一列,這列的類型是timestamp
- last_run_metadata_path: 同步點文件,這個文件記錄了上次的同步點,重啟時會讀取這個文件,這個文件可以手動修改
- index: 導入到es中的index名,這里我直接設置成了mysql表的名字
- document_id: 導入到es中的文檔id,這個需要設置成主鍵,否則同一條記錄更新后在es中會出現兩條記錄,%{id} 表示引用mysql表中id字段的值
es新建poem索引:(注意跟mysql的數據段名保持一致)
#創建索引 PUT /poem {"settings": {"number_of_shards": 1,"number_of_replicas": 0}, "mappings":{"properties":{"id":{"type":"text"},"name":{"type":"text"},"author":{"type":"text"},"type":{"type": "text"},"content":{"type":"text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"href":{"type": "text"},"authordes":{"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"origin":{"type":"text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"categoryId":{"type": "text"}}} }進入bin目錄,啟動logstash服務,開始同步mysql數據到es
二、使用阿里云開源工具Canal
canal是阿里巴巴旗下的一款開源項目,純Java開發。基于數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了MySQL(也支持mariaDB)。
當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議,mysql master收到dump請求,開始推送binary log給slave(也就是canal)
canal解析binary log對象(原始為byte流)
簡單來說,Canal 會將自己偽裝成 MySQL 從節點(Slave),并從主節點(Master)獲取 Binlog,解析和貯存后供下游消費端使用。Canal 包含兩個組成部分:服務端和客戶端。服務端負責連接至不同的 MySQL 實例,并為每個實例維護一個事件消息隊列;客戶端則可以訂閱這些隊列中的數據變更事件,處理并存儲到數據倉庫中。下面我們來看如何快速搭建起一個 Canal 服務。
MySQL 默認沒有開啟 Binlog,因此我們需要對 my.cnf 文件做以下修改:
server-id = 1 log_bin = /path/to/mysql-bin.log binlog_format = ROW注意 binlog_format 必須設置為 ROW, 因為在 STATEMENT 或 MIXED 模式下, Binlog 只會記錄和傳輸
SQL 語句(以減少日志大小),而不包含具體數據,我們也就無法保存了。
從節點通過一個專門的賬號連接主節點,這個賬號需要擁有全局的 REPLICATION 權限。我們可以使用 GRANT 命令創建這樣的賬號:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';從 GitHub 項目發布頁中下載 Canal 服務端代碼(https://github.com/alibaba/canal/releases )
配置文件在 conf 文件夾下,有以下目錄結構:
canal.deployer/conf/canal.properties
canal.deployer/conf/instanceA/instance.properties
canal.deployer/conf/instanceB/instance.properties
conf/canal.properties 是主配置文件,如其中的 canal.port 用以指定服務端監聽的端口。instanceA/instance.properties 則是各個實例的配置文件,主要的配置項有:
# slaveId 不能與 my.cnf 中的 server-id 項重復 canal.instance.mysql.slaveId = 1234 canal.instance.master.address = 127.0.0.1:3306 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.connectionCharset = UTF-8 # 訂閱實例中所有的數據庫和表 canal.instance.filter.regex = .*\\..*從服務端消費變更消息時,我們需要創建一個 Canal 客戶端,指定需要訂閱的數據庫和表,并開啟輪詢。
參考文章
總結
以上是生活随笔為你收集整理的MySQL到Elasticsearch数据同步的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 服务器 Font family [‘sa
- 下一篇: 2021年上半年移动广告流量观察白皮书