[Spring Cloud: Building an Ad System Step by Step] 15. Listening to the Binlog with an Open-Source Component to Prepare for Incremental Indexing...
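MySQL Binlog Overview
- What is the binlog?
A binary log that records events describing changes to the database, including statements that could potentially have changed data (for example, a DELETE that matched no rows). It is stored on disk in binary form.
- What is the binlog used for?
It has three main uses:
- Data replication (master-slave synchronization)
MySQL's master-slave protocol lets a slave replicate data by listening to the master's binlog, which keeps the data consistent.
- Data recovery
Data can be restored with the mysqlbinlog tool.
- Incremental backup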
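- Binlog variables
- log_bin (whether the binlog is enabled; check it with show variables like 'log_bin';)
- binlog_format (the binlog format; check it with show variables like 'binlog_format';)
There are three formats in total:
- ROW: records only the details of the modified rows, without the context of the SQL statement. (Because it records the change to every row and needs no context, it avoids the cases in which calls to certain procedures, functions, or triggers cannot be replicated correctly. It can be replicated in every case, speeds up log replay on the slave, and keeps the slave's data consistent.)
- STATEMENT: every SQL statement that modifies data is recorded. (Only the statement and its execution context are recorded rather than every row change, so when a statement modifies many rows the binlog is much smaller than with ROW, which saves I/O and improves performance. It can also be used for real-time restoration, and the master and slave may run different versions, as long as the slave's version is higher than the master's.)
- MIXED: a combination of the two; MySQL logs statements in STATEMENT format by default and switches to ROW for statements that are unsafe to replicate that way.
Note that the row events used later in this article (WRITE_ROWS, UPDATE_ROWS, DELETE_ROWS) are only written when row-based logging is in effect, so ROW is the format to use here.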
- Binlog management
- show master logs; lists all binlog files
- show master status; shows the name of the latest binlog file and the position where its last event ends
- flush logs; flushes the binlog and starts a new binlog file with the next sequence number
- reset master; deletes all binlog files
- Binlog-related SQL: show binlog events [in 'log_name'] [from position] [limit [offset,] row_count]
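- Common binlog events
- QUERY - operations not tied to row data, such as begin, drop table, truncate table, etc.
- TABLE_MAP - describes the table targeted by the next operation, storing its database name and table name
- XID - marks a transaction commit
- WRITE_ROWS - inserted rows, i.e. an insert
- UPDATE_ROWS - updated rows, i.e. an update
- DELETE_ROWS - deleted rows, i.e. a delete
Each event has two parts, a header and data. The header carries information such as when the event was created and which server it came from; the data part carries the event-specific content, such as the actual row changes.
Tip: the binlog does not record table column names; row events identify columns only by their position in the table.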
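Because column names are missing and a row event carries only a numeric tableId, a listener has to remember the most recent TABLE_MAP event for each tableId to know which database and table a WRITE_ROWS, UPDATE_ROWS or DELETE_ROWS event belongs to. Below is a minimal, JDK-only sketch of that bookkeeping. TableMapCache and TableRef are hypothetical names, not part of mysql-binlog-connector-java; with that library the id and names would come from TableMapEventData (getTableId(), getDatabase(), getTable()), and the tableId used here is made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: remembers which database/table each binlog tableId refers to.
class TableMapCache {

    // Database and table name taken from a TABLE_MAP event.
    static final class TableRef {
        final String database;
        final String table;

        TableRef(String database, String table) {
            this.database = database;
            this.table = table;
        }

        @Override
        public String toString() {
            return database + "." + table;
        }
    }

    private final Map<Long, TableRef> byId = new HashMap<>();

    // Call for every TABLE_MAP event; it precedes the row events it describes.
    void onTableMap(long tableId, String database, String table) {
        byId.put(tableId, new TableRef(database, table));
    }

    // Call for a row event; returns null if no TABLE_MAP was seen for this id.
    TableRef resolve(long tableId) {
        return byId.get(tableId);
    }

    public static void main(String[] args) {
        TableMapCache cache = new TableMapCache();
        cache.onTableMap(42L, "advertisement", "ad_unit"); // hypothetical tableId
        System.out.println(cache.resolve(42L)); // advertisement.ad_unit
        System.out.println(cache.resolve(7L));  // null
    }
}
```

Overwriting the entry on every TABLE_MAP event keeps the cache correct even if the server later assigns a table a different id.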
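In the implementation that follows, we disguise our own system as a fake MySQL slave and use the open-source tool mysql-binlog-connector-java to listen to the binlog.
The open-source tool mysql-binlog-connector-java
- Source code: GitHub (shyiko/mysql-binlog-connector-java)
- Using the component
1. Add the dependency
2. Create a test class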
package com.sxzhongf.ad.service;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;

import java.io.IOException;

/**
 * BinlogServiceTest: a test of MySQL binlog monitoring
 * {@code
 * With MySQL 8, if connecting fails with "Client does not support authentication protocol
 * requested by server; consider upgrading MySQL client", fix it with:
 * USE mysql;
 * ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'password';
 * FLUSH PRIVILEGES;
 * }
 *
 * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
 */
public class BinlogServiceTest {

    /**
     * --------Update-----------
     * UpdateRowsEventData{tableId=90, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5, 6, 7}, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
     * {before=[11, 10, Test Bin Log, 1, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019], after=[11, 10, zhangpan test Binlog, 1, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019]}
     * ]}
     *
     * --------Insert-----------
     * WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
     * [10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
     * ]}
     */
    public static void main(String[] args) throws IOException {
        // Build the BinaryLogClient with the MySQL connection info
        BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "root", "12345678");

        // Optionally set the binlog file and position to start reading from; if they are not set,
        // the client starts from the server's current (latest) binlog position
        // client.setBinlogFilename("binlog.000035");
        // client.setBinlogPosition();

        // Register a listener on the client to receive and parse binlog events.
        // Each event is one binlog change and consists of a header and data
        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof UpdateRowsEventData) {
                System.out.println("--------Update-----------");
                System.out.println(data.toString());
            } else if (data instanceof WriteRowsEventData) {
                System.out.println("--------Insert-----------");
                System.out.println(data.toString());
            } else if (data instanceof DeleteRowsEventData) {
                System.out.println("--------Delete-----------");
                System.out.println(data.toString());
            }
        });
        // Blocks and keeps streaming events until the client disconnects
        client.connect();
    }
}

Run it:
Aug 08, 2019 9:13:32 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to 127.0.0.1:3306 at binlog.000038/951 (sid:65535, cid:336)
...
Then execute the SQL: update ad_user set user_status=1 where user_id=10;
Keep in mind that our goal is to listen for changes to MySQL tables and parse them into the format we want, namely our own Java objects. The output above gives a rough idea of what the returned information looks like. Now that we know the basics of listening to the binlog with BinaryLogClient, the next step is to define a listener that implements our own business logic.
Since we only need the contents of each Event, it is enough to implement the com.github.shyiko.mysql.binlog.BinaryLogClient.EventListener interface in a custom listener. Based on an event's contents, the listener decides whether the current event needs to be handled, and how.
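The "decide whether and how to handle this event" step can be sketched as a small registry keyed by database and table: events for registered tables are dispatched to a callback and everything else is skipped. This is a hypothetical, JDK-only sketch (ListenerRegistry, RowListener and the "db:table" key format are assumptions). It presumes the database/table name and the operation have already been resolved; in the real component this logic would sit inside an implementation of BinaryLogClient.EventListener#onEvent(Event).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: only registered tables are processed, everything else is ignored.
class ListenerRegistry {

    // Hypothetical per-table callback, invoked with the operation and the parsed rows.
    interface RowListener {
        void onRows(String operation, List<Map<String, String>> rows);
    }

    private final Map<String, RowListener> listeners = new HashMap<>();

    private static String key(String database, String table) {
        return database + ":" + table;
    }

    void register(String database, String table, RowListener listener) {
        listeners.put(key(database, table), listener);
    }

    // Returns true if a listener handled the event, false if the table is not registered.
    boolean dispatch(String database, String table, String operation,
                     List<Map<String, String>> rows) {
        RowListener listener = listeners.get(key(database, table));
        if (listener == null) {
            return false;
        }
        listener.onRows(operation, rows);
        return true;
    }

    public static void main(String[] args) {
        ListenerRegistry registry = new ListenerRegistry();
        List<String> seen = new ArrayList<>();
        registry.register("advertisement", "ad_plan",
                (op, rs) -> seen.add(op + ":" + rs.size()));

        List<Map<String, String>> rows =
                Collections.singletonList(Collections.singletonMap("plan_id", "10"));
        registry.dispatch("advertisement", "ad_plan", "ADD", rows); // handled
        registry.dispatch("advertisement", "ad_user", "ADD", rows); // ignored
        System.out.println(seen); // [ADD:1]
    }
}
```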
Building the template file for parsing the binlog
The fundamental reason we build incremental data by listening to the binlog is to decouple the ad delivery system from the ad search system. Since the search system does not define any database or table information, we define a template file and parse it to obtain the database and table information we need. Because the binlog stream does not distinguish between databases and tables, the template lets us specify exactly which parts we want to listen to.
{
  "database": "advertisement",
  "tableList": [
    {
      "tableName": "ad_plan",
      "level": 2,
      "insert": [{"column": "plan_id"}, {"column": "user_id"}, {"column": "plan_status"}, {"column": "start_date"}, {"column": "end_date"}],
      "update": [{"column": "plan_id"}, {"column": "user_id"}, {"column": "plan_status"}, {"column": "start_date"}, {"column": "end_date"}],
      "delete": [{"column": "plan_id"}]
    },
    {
      "tableName": "ad_unit",
      "level": 3,
      "insert": [{"column": "unit_id"}, {"column": "unit_status"}, {"column": "position_type"}, {"column": "plan_id"}],
      "update": [{"column": "unit_id"}, {"column": "unit_status"}, {"column": "position_type"}, {"column": "plan_id"}],
      "delete": [{"column": "unit_id"}]
    },
    {
      "tableName": "ad_creative",
      "level": 2,
      "insert": [{"column": "creative_id"}, {"column": "type"}, {"column": "material_type"}, {"column": "height"}, {"column": "width"}, {"column": "audit_status"}, {"column": "url"}],
      "update": [{"column": "creative_id"}, {"column": "type"}, {"column": "material_type"}, {"column": "height"}, {"column": "width"}, {"column": "audit_status"}, {"column": "url"}],
      "delete": [{"column": "creative_id"}]
    },
    {
      "tableName": "relationship_creative_unit",
      "level": 3,
      "insert": [{"column": "creative_id"}, {"column": "unit_id"}],
      "update": [],
      "delete": [{"column": "creative_id"}, {"column": "unit_id"}]
    },
    {
      "tableName": "ad_unit_district",
      "level": 4,
      "insert": [{"column": "unit_id"}, {"column": "province"}, {"column": "city"}],
      "update": [],
      "delete": [{"column": "unit_id"}, {"column": "province"}, {"column": "city"}]
    },
    {
      "tableName": "ad_unit_hobby",
      "level": 4,
      "insert": [{"column": "unit_id"}, {"column": "hobby_tag"}],
      "update": [],
      "delete": [{"column": "unit_id"}, {"column": "hobby_tag"}]
    },
    {
      "tableName": "ad_unit_keyword",
      "level": 4,
      "insert": [{"column": "unit_id"}, {"column": "keyword"}],
      "update": [],
      "delete": [{"column": "unit_id"}, {"column": "keyword"}]
    }
  ]
}
The template above specifies one database, advertisement; more databases can easily be added for listening. Under that database, we listen for the insert, update and delete (CUD) operations of several tables, together with the fields each operation needs.
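- Implementing the template -> Java entity
- Define the entity corresponding to the template file
- The table information in the corresponding JSON
- The object holding the table information that was read (its main purpose is to map field indexes to field names)
- Parse the template file into Java objects
- Build the field index -> field name mapping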
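The entity code for these steps is not reproduced here, so the following is a minimal sketch of what it could look like, inferred from the keys in template.json and from how TemplateHolder uses the results (ParseCustomTemplate.parse(...), getDatabase(), getTableTemplateMap(), getOpTypeFieldSetMap(), getPosMap()). JsonTable and Column are hypothetical names, and the JSON-to-BinlogTemplate step (JSON.parseObject in TemplateHolder) is replaced by building the objects by hand so the sketch runs on the JDK alone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of ParseCustomTemplate, shaped after how TemplateHolder uses it.
class ParseCustomTemplate {
    private String database;
    private final Map<String, TableTemplate> tableTemplateMap = new HashMap<>();

    String getDatabase() { return database; }

    Map<String, TableTemplate> getTableTemplateMap() { return tableTemplateMap; }

    // Converts the JSON-shaped template into one TableTemplate per table.
    static ParseCustomTemplate parse(BinlogTemplate source) {
        ParseCustomTemplate result = new ParseCustomTemplate();
        result.database = source.database;
        for (JsonTable t : source.tableList) {
            TableTemplate table = new TableTemplate(t.tableName, t.level);
            table.getOpTypeFieldSetMap().put(OperationTypeEnum.ADD, names(t.insert));
            table.getOpTypeFieldSetMap().put(OperationTypeEnum.UPDATE, names(t.update));
            table.getOpTypeFieldSetMap().put(OperationTypeEnum.DELETE, names(t.delete));
            result.tableTemplateMap.put(t.tableName, table);
        }
        return result;
    }

    private static List<String> names(List<Column> columns) {
        List<String> out = new ArrayList<>();
        for (Column c : columns) {
            out.add(c.column);
        }
        return out;
    }

    public static void main(String[] args) {
        JsonTable plan = new JsonTable();
        plan.tableName = "ad_plan";
        plan.level = 2;
        plan.insert.add(new Column("plan_id"));
        plan.insert.add(new Column("user_id"));
        plan.delete.add(new Column("plan_id"));

        BinlogTemplate source = new BinlogTemplate();
        source.database = "advertisement";
        source.tableList.add(plan);

        TableTemplate t = parse(source).getTableTemplateMap().get("ad_plan");
        System.out.println(t.getOpTypeFieldSetMap().get(OperationTypeEnum.ADD)); // [plan_id, user_id]
    }
}

enum OperationTypeEnum { ADD, UPDATE, DELETE, OTHER }

// Mirrors the top level of template.json.
class BinlogTemplate {
    String database;
    List<JsonTable> tableList = new ArrayList<>();
}

// Hypothetical name: one entry of "tableList".
class JsonTable {
    String tableName;
    Integer level;
    List<Column> insert = new ArrayList<>();
    List<Column> update = new ArrayList<>();
    List<Column> delete = new ArrayList<>();
}

// Hypothetical name: one {"column": <name>} object.
class Column {
    String column;

    Column(String column) { this.column = column; }
}

// Per-table result that TemplateHolder reads and fills in.
class TableTemplate {
    private final String tableName;
    private final Integer level;
    private final Map<OperationTypeEnum, List<String>> opTypeFieldSetMap = new HashMap<>();
    // binlog column index -> column name, filled later by TemplateHolder.loadMeta()
    private final Map<Integer, String> posMap = new HashMap<>();

    TableTemplate(String tableName, Integer level) {
        this.tableName = tableName;
        this.level = level;
    }

    String getTableName() { return tableName; }
    Integer getLevel() { return level; }
    Map<OperationTypeEnum, List<String>> getOpTypeFieldSetMap() { return opTypeFieldSetMap; }
    Map<Integer, String> getPosMap() { return posMap; }
}
```

Keeping the field lists in a map keyed by OperationTypeEnum lets TemplateHolder fetch the insert, update and delete fields of a table with one lookup each.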
First, let's look at the actual binlog output:
--------Insert-----------
WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
[10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
--------Update-----------
UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
{before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}
As you can see, includedColumns in the update event contains only the positions {0, 1, 2, 3, 4, 5}. How can we tell which column each position stands for? That is the mapping we implement next. Before implementing it, let's query the database for the position of each column in our table:
sql> SELECT table_schema,table_name,column_name,ordinal_position FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = 'advertisement' AND TABLE_NAME='ad_user';
The result shows that ordinal_position runs from 1 to 6, whereas the indexes in the binlog output above run from 0 to 5, so a column's binlog index is its ordinal_position minus 1.
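To make the off-by-one concrete: ORDINAL_POSITION is 1-based while the binlog's column indexes are 0-based, so the column at ordinal_position k appears at index k - 1. The sketch below builds that index -> name map from (COLUMN_NAME, ORDINAL_POSITION) pairs and keeps only the columns the template asks for, mirroring the filtering done in TemplateHolder. The ad_user column names are an assumption for illustration, since the article does not show the table definition.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class PosMapDemo {

    // columns: COLUMN_NAME -> ORDINAL_POSITION (1-based), as read from information_schema.COLUMNS
    // wanted:  column names listed in the template for any operation
    // returns: binlog column index (0-based) -> column name
    static Map<Integer, String> buildPosMap(Map<String, Integer> columns, Set<String> wanted) {
        Map<Integer, String> posMap = new TreeMap<>();
        for (Map.Entry<String, Integer> e : columns.entrySet()) {
            if (wanted.contains(e.getKey())) {
                posMap.put(e.getValue() - 1, e.getKey());
            }
        }
        return posMap;
    }

    public static void main(String[] args) {
        // Assumed layout of ad_user, for illustration only
        Map<String, Integer> columns = new LinkedHashMap<>();
        columns.put("user_id", 1);
        columns.put("user_name", 2);
        columns.put("token", 3);
        columns.put("user_status", 4);
        columns.put("create_time", 5);
        columns.put("update_time", 6);

        Set<String> wanted = new HashSet<>(Arrays.asList("user_id", "user_status"));
        System.out.println(buildPosMap(columns, wanted)); // {0=user_id, 3=user_status}
    }
}
```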
Now let's implement it, using JdbcTemplate to query the database metadata:
@Slf4j
@Component
public class TemplateHolder {

    private ParseCustomTemplate template;

    private final JdbcTemplate jdbcTemplate;

    // Looks up the position of every column of one table
    private static final String SQL_SCHEMA = "SELECT TABLE_SCHEMA,TABLE_NAME,COLUMN_NAME,ORDINAL_POSITION FROM information_schema.COLUMNS " +
            "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?";

    @Autowired
    public TemplateHolder(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * Load the template as soon as the container initializes this bean
     */
    @PostConstruct
    private void init() {
        loadJSON("template.json");
    }

    /**
     * Returns the parsed template of the given table
     */
    public TableTemplate getTable(String tableName) {
        return template.getTableTemplateMap().get(tableName);
    }

    /**
     * Load the JSON file describing which tables and columns to listen to in the binlog
     */
    private void loadJSON(String path) {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        // try-with-resources closes the stream once parsing is done
        try (InputStream inputStream = classLoader.getResourceAsStream(path)) {
            BinlogTemplate binlogTemplate = JSON.parseObject(
                    inputStream,
                    Charset.defaultCharset(),
                    BinlogTemplate.class
            );
            this.template = ParseCustomTemplate.parse(binlogTemplate);
            loadMeta();
        } catch (IOException ex) {
            log.error(ex.getMessage(), ex);
            throw new RuntimeException("fail to parse json file");
        }
    }

    /**
     * Load table metadata:
     * build the mapping from binlog column index to column name
     */
    private void loadMeta() {
        for (Map.Entry<String, TableTemplate> entry : template.getTableTemplateMap().entrySet()) {
            TableTemplate table = entry.getValue();

            List<String> updateFields = table.getOpTypeFieldSetMap().get(OperationTypeEnum.UPDATE);
            List<String> insertFields = table.getOpTypeFieldSetMap().get(OperationTypeEnum.ADD);
            List<String> deleteFields = table.getOpTypeFieldSetMap().get(OperationTypeEnum.DELETE);

            jdbcTemplate.query(SQL_SCHEMA, new Object[]{template.getDatabase(), table.getTableName()}, (rs, i) -> {
                int pos = rs.getInt("ORDINAL_POSITION");
                String colName = rs.getString("COLUMN_NAME");

                // Keep only columns the template mentions for some operation;
                // ORDINAL_POSITION is 1-based, binlog column indexes are 0-based
                if ((null != updateFields && updateFields.contains(colName))
                        || (null != insertFields && insertFields.contains(colName))
                        || (null != deleteFields && deleteFields.contains(colName))) {
                    table.getPosMap().put(pos - 1, colName);
                }
                return null;
            });
        }
    }
}
Implementing the binlog listener
- Define the Java objects that parsed events are converted into
- Define the binlog client (BinaryLogClient)
- Register an event listener (com.github.shyiko.mysql.binlog.BinaryLogClient.EventListener) with the client
- Listen to the binlog and collect the MySQL binlog data
- Parse the data into the binlog data object BinlogRowData for later incremental index processing
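A minimal sketch of the last step, turning the rows of one row event into a BinlogRowData-style object with the help of posMap. It assumes each row array holds only the values of the included columns, in column order, so the k-th value belongs to the k-th set bit of includedColumns (a java.util.BitSet in mysql-binlog-connector-java); with MySQL's default full row image every bit is set and the two indexes coincide. The fields of BinlogRowData are not shown in the article, so the class below is a hypothetical shape, and the posMap entries are made up.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RowConverter {

    // Maps one row array to column name -> value, keeping only columns found in posMap.
    // Assumes row[k] holds the value of the k-th set bit in includedColumns.
    static Map<String, String> toColumnMap(Serializable[] row, BitSet includedColumns,
                                           Map<Integer, String> posMap) {
        Map<String, String> result = new HashMap<>();
        int valueIndex = 0;
        for (int col = includedColumns.nextSetBit(0); col >= 0;
             col = includedColumns.nextSetBit(col + 1)) {
            String name = posMap.get(col);
            if (name != null && valueIndex < row.length) {
                Serializable value = row[valueIndex];
                result.put(name, value == null ? null : value.toString());
            }
            valueIndex++;
        }
        return result;
    }

    // Puts every row of an insert or delete event into "after";
    // an update event would also fill "before" from each pair's before image.
    static BinlogRowData convert(String tableName, String operation, BitSet includedColumns,
                                 List<Serializable[]> rows, Map<Integer, String> posMap) {
        BinlogRowData data = new BinlogRowData(tableName, operation);
        for (Serializable[] row : rows) {
            data.after.add(toColumnMap(row, includedColumns, posMap));
        }
        return data;
    }

    public static void main(String[] args) {
        // Hypothetical index -> name map; real positions come from information_schema
        Map<Integer, String> posMap = new HashMap<>();
        posMap.put(0, "unit_id");
        posMap.put(3, "unit_status");

        BitSet included = new BitSet();
        included.set(0, 4); // columns 0..3 are present in the event

        List<Serializable[]> rows = new ArrayList<>();
        rows.add(new Serializable[]{10, 11, "demo unit", 1});

        BinlogRowData data = convert("ad_unit", "ADD", included, rows, posMap);
        System.out.println(data.after.get(0).get("unit_id")); // 10
    }
}

// Hypothetical shape of the object handed on to incremental indexing.
class BinlogRowData {
    final String tableName;
    final String operation;
    final List<Map<String, String>> before = new ArrayList<>();
    final List<Map<String, String>> after = new ArrayList<>();

    BinlogRowData(String tableName, String operation) {
        this.tableName = tableName;
        this.operation = operation;
    }
}
```

Columns that are not in posMap (that is, not listed in the template) are dropped, so downstream index code only sees the fields it declared.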
Because we need to convert the binlog EventType into our own operation type OperationTypeEnum, we add a conversion method to OperationTypeEnum:
public enum OperationTypeEnum {
    ...

    // EventType is com.github.shyiko.mysql.binlog.event.EventType
    public static OperationTypeEnum convert(EventType type) {
        switch (type) {
            case EXT_WRITE_ROWS:
                return ADD;
            case EXT_UPDATE_ROWS:
                return UPDATE;
            case EXT_DELETE_ROWS:
                return DELETE;
            default:
                return OTHER;
        }
    }
}
We also need a Java class that holds the column names of each table, to make handling CUD operations on these tables easier later on:
package com.sxzhongf.ad.mysql.constant;

import java.util.HashMap;
import java.util.Map;

/**
 * Constant: the column names of each table, to make later CUD handling of the tables easier
 *
 * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
 */
public class Constant {

    private static final String DATABASE_NAME = "advertisement";

    public static class AD_PLAN_TABLE_INFO {
        public static final String TABLE_NAME = "ad_plan";
        public static final String COLUMN_PLAN_ID = "plan_id";
        public static final String COLUMN_USER_ID = "user_id";
        public static final String COLUMN_PLAN_STATUS = "plan_status";
        public static final String COLUMN_START_DATE = "start_date";
        public static final String COLUMN_END_DATE = "end_date";
    }

    public static class AD_CREATIVE_TABLE_INFO {
        public static final String TABLE_NAME = "ad_creative";
        public static final String COLUMN_CREATIVE_ID = "creative_id";
        public static final String COLUMN_TYPE = "type";
        public static final String COLUMN_MATERIAL_TYPE = "material_type";
        public static final String COLUMN_HEIGHT = "height";
        public static final String COLUMN_WIDTH = "width";
        public static final String COLUMN_AUDIT_STATUS = "audit_status";
        public static final String COLUMN_URL = "url";
    }

    public static class AD_UNIT_TABLE_INFO {
        public static final String TABLE_NAME = "ad_unit";
        public static final String COLUMN_UNIT_ID = "unit_id";
        public static final String COLUMN_UNIT_STATUS = "unit_status";
        public static final String COLUNN_POSITION_TYPE = "position_type";
        public static final String COLUNN_PLAN_ID = "plan_id";
    }

    public static class RELATIONSHIP_CREATIVE_UNIT_TABLE_INFO {
        public static final String TABLE_NAME = "relationship_creative_unit";
        public static final String COLUMN_CREATIVE_ID = "creative_id";
        public static final String COLUMN_UNIT_ID = "unit_id";
    }

    public static class AD_UNIT_DISTRICT_TABLE_INFO {
        public static final String TABLE_NAME = "ad_unit_district";
        public static final String COLUMN_UNIT_ID = "unit_id";
        public static final String COLUMN_PROVINCE = "province";
        public static final String COLUMN_CITY = "city";
    }

    public static class AD_UNIT_KEYWORD_TABLE_INFO {
        public static final String TABLE_NAME = "ad_unit_keyword";
        public static final String COLUMN_UNIT_ID = "unit_id";
        public static final String COLUMN_KEYWORD = "keyword";
    }

    public static class AD_UNIT_HOBBY_TABLE_INFO {
        public static final String TABLE_NAME = "ad_unit_hobby";
        public static final String COLUMN_UNIT_ID = "unit_id";
        public static final String COLUMN_HOBBY_TAG = "hobby_tag";
    }

    // key -> table name
    // value -> database name
    public static Map<String, String> table2db;

    static {
        table2db = new HashMap<>();
        table2db.put(AD_PLAN_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
        table2db.put(AD_CREATIVE_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
        table2db.put(AD_UNIT_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
        table2db.put(RELATIONSHIP_CREATIVE_UNIT_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
        table2db.put(AD_UNIT_DISTRICT_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
        table2db.put(AD_UNIT_HOBBY_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
        table2db.put(AD_UNIT_KEYWORD_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
    }
}
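The convert method in OperationTypeEnum only matches the EXT_* event types. MySQL 5.6 and later write version 2 row events by default, which the library reports as EXT_WRITE_ROWS, EXT_UPDATE_ROWS and EXT_DELETE_ROWS, so that is usually enough; a server writing version 1 row events would produce WRITE_ROWS, UPDATE_ROWS and DELETE_ROWS instead. The sketch below is a hypothetical variant that accepts both. To run without the library, it swaps in a stand-in EventType enum containing only a few of the real enum's constant names.

```java
class OperationTypeDemo {

    // Stand-in for com.github.shyiko.mysql.binlog.event.EventType (subset of its constants only)
    enum EventType {
        WRITE_ROWS, UPDATE_ROWS, DELETE_ROWS,
        EXT_WRITE_ROWS, EXT_UPDATE_ROWS, EXT_DELETE_ROWS,
        TABLE_MAP, QUERY, XID
    }

    enum OperationTypeEnum {
        ADD, UPDATE, DELETE, OTHER;

        // Maps both v1 and v2 row events; every other event type becomes OTHER
        static OperationTypeEnum convert(EventType type) {
            switch (type) {
                case WRITE_ROWS:
                case EXT_WRITE_ROWS:
                    return ADD;
                case UPDATE_ROWS:
                case EXT_UPDATE_ROWS:
                    return UPDATE;
                case DELETE_ROWS:
                case EXT_DELETE_ROWS:
                    return DELETE;
                default:
                    return OTHER;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(OperationTypeEnum.convert(EventType.EXT_WRITE_ROWS)); // ADD
        System.out.println(OperationTypeEnum.convert(EventType.TABLE_MAP));      // OTHER
    }
}
```

Grouping the v1 and v2 constants under shared case labels keeps the rest of the pipeline unaware of which row-event version the server writes.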
Reprinted from: https://www.cnblogs.com/zhangpan1244/p/11329817.html