Canal Mysql binlog 同步至 ElasticSearch 详细介绍
文章目錄
- 數(shù)據(jù)同步ElasticSearch
- 單表基本配置
- 適配器映射文件詳細(xì)介紹(單表、多表映射介紹)
- 單表映射索引示例sql
- 單表映射索引示例sql帶函數(shù)或運算操作
- 多表映射(一對一, 多對一)索引示例sql
- 多表映射(一對多)索引示例sql
- 其它類型的sql示例
- 注意事項
本文詳細(xì)介紹Canal 配置保存 ElasticSearch
Canal從零配置使用參考:https://blog.csdn.net/zhangshenghang/article/details/120361721
數(shù)據(jù)同步ElasticSearch
我們接著在之前配置Hbase基礎(chǔ)上直接修改配置,實現(xiàn)同時同步ElasticSearch
單表基本配置
- 1.修改啟動器配置 {canal-apapter}/conf/application.yml
- 2.ElasticSearch 表映射文件
- 3 重啟服務(wù)
寫入數(shù)據(jù)
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now()); INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now()); INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now()); INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now()); INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now()); INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now()); INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now()); INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());查看adapter日志
2021-09-20 13:53:07.279 [pool-1-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":"05fabf89-19d7-11ec-bbe0-708cb6f5eaa6","name":"05fabfb4-19d7-11ec-bbe0-708cb6f5eaa6","age":2,"age_2":null,"message":null,"insert_time":1632117185000}],"database":"test2","destination":"example","es":1632117185000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"testsync","ts":1632117187278,"type":"INSERT"} 2021-09-20 13:53:07.286 [pool-1-thread-1] DEBUG c.a.o.c.client.adapter.hbase.service.HbaseSyncService - DML: {"data":[{"id":"05fabf89-19d7-11ec-bbe0-708cb6f5eaa6","name":"05fabfb4-19d7-11ec-bbe0-708cb6f5eaa6","age":2,"age_2":null,"message":null,"insert_time":1632117185000}],"database":"test2","destination":"example","es":1632117185000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"testsync","ts":1632117187278,"type":"INSERT"} 2021-09-20 13:53:07.287 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":"05fabf89-19d7-11ec-bbe0-708cb6f5eaa6","name":"05fabfb4-19d7-11ec-bbe0-708cb6f5eaa6","age":2,"age_2":null,"message":null,"insert_time":1632117185000}],"database":"test2","destination":"example","es":1632117185000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"testsync","ts":1632117187278,"type":"INSERT"} Affected indexes: testsync2查看ElasticSearch數(shù)據(jù)
至此寫入ElasticSearch、Hbase成功
適配器映射文件詳細(xì)介紹(單表、多表映射介紹)
${adapter}/conf/es7/xxx.yml
dataSourceKey: defaultDS # 源數(shù)據(jù)源的key, 對應(yīng)上面配置的srcDataSources中的值 outerAdapterKey: exampleKey # 對應(yīng)application.yml中es配置的key destination: example # cannal的instance或者M(jìn)Q的topic groupId: # 對應(yīng)MQ模式下的groupId, 只會同步對應(yīng)groupId的數(shù)據(jù) esMapping:_index: mytest_user # es 的索引名稱_type: _doc # es 的type名稱, es7下無需配置此項_id: _id # es 的_id, 如果不配置該項必須配置下面的pk項_id則會由es自動分配 # pk: id # 如果不需要_id, 則需要指定一個屬性為主鍵屬性# sql映射sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,a.c_time as _c_time, c.labels as _labels from user aleft join role b on b.id=a.role_idleft join (select user_id, group_concat(label order by id desc separator ';') as labels from labelgroup by user_id) c on c.user_id=a.id" # objFields: # _labels: array:; # 數(shù)組或者對象屬性, array:; 代表以;字段里面是以;分隔的 # _obj: object # json對象etlCondition: "where a.c_time>='{0}'" # etl 的條件參數(shù)commitBatch: 3000 # 提交批大小sql映射說明:
sql支持多表關(guān)聯(lián)自由組合, 但是有一定的限制:
Elastic Search的mapping 屬性與sql的查詢值將一一對應(yīng)(不支持 select *), 比如: select a.id as _id, a.name, a.email as _email from user, 其中name將映射到es mapping的name field, _email將 映射到mapping的_email field, 這里以別名(如果有別名)作為最終的映射字段. 這里的_id可以填寫到配置文件的 _id: _id映射.
單表映射索引示例sql
select a.id as _id, a.name, a.role_id, a.c_time from user a該sql對應(yīng)的es mapping示例:
{"mytest_user": {"mappings": {"_doc": {"properties": {"name": {"type": "text"},"role_id": {"type": "long"},"c_time": {"type": "date"}}}}} }單表映射索引示例sql帶函數(shù)或運算操作
select a.id as _id, concat(a.name,'_test') as name, a.role_id+10000 as role_id, a.c_time from user a函數(shù)字段后必須跟上別名, 該sql對應(yīng)的es mapping示例:
{"mytest_user": {"mappings": {"_doc": {"properties": {"name": {"type": "text"},"role_id": {"type": "long"},"c_time": {"type": "date"}}}}} }多表映射(一對一, 多對一)索引示例sql
select a.id as _id, a.name, a.role_id, b.role_name, a.c_time from user a left join role b on b.id = a.role_id注:這里join操作只能是left outer join, 第一張表必須為主表!!
該sql對應(yīng)的es mapping示例:
多表映射(一對多)索引示例sql
select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a left join (select user_id, group_concat(label order by id desc separator ';') as labels from labelgroup by user_id) c on c.user_id=a.id注:left join 后的子查詢只允許一張表, 即子查詢中不能再包含子查詢或者關(guān)聯(lián)!!
該sql對應(yīng)的es mapping示例:
{"mytest_user": {"mappings": {"_doc": {"properties": {"name": {"type": "text"},"role_id": {"type": "long"},"c_time": {"type": "date"},"labels": {"type": "text"}}}}} }其它類型的sql示例
- geo type
- 復(fù)合主鍵
- 數(shù)組字段
配置中使用:
objFields:labels: array:;- 對象字段
配置中使用:
objFields:description: object其中a.description字段內(nèi)容為json字符串
- 父子文檔索引
es/customer.yml
es/order.yml
esMapping:_index: customer_type: _doc_id: _idrelations:customer_order:name: orderparent: customer_idsql: "select concat('oid_', t.id) as _id,t.customer_id,t.id as order_id,t.serial_code as order_serial,t.c_time as order_timefrom biz_order t"skips:- customer_idmapping示例:
{"mappings":{"_doc":{"properties":{"id": {"type": "long"},"name": {"type": "text"},"email": {"type": "text"},"order_id": {"type": "long"},"order_serial": {"type": "text"},"order_time": {"type": "date"},"customer_order":{"type":"join","relations":{"customer":"order"}}}}} }注意事項
- 多表映射時,主表數(shù)據(jù)必須插入,如果只插入子表不插入主表,數(shù)據(jù)無法同步到ElasticSearch;相反只插入主表,子表不進(jìn)行插入,數(shù)據(jù)是可以同步到ElasticSearch的
- 多表映射時,如果主表關(guān)聯(lián)id寫入后,子表再進(jìn)行修改之前的關(guān)聯(lián)的id為我們主表寫入的id,數(shù)據(jù)是無法同步到ElasticSearch中的。
總結(jié)
以上是生活随笔為你收集整理的Canal Mysql binlog 同步至 ElasticSearch 详细介绍的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python pip清华源安装库
- 下一篇: Hive - HWI 简单使用