A CDC-Based ElasticSearch Index Synchronization Mechanism
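Overview
ElasticSearch, a search engine built on Lucene, is widely used in application systems such as e-commerce, news, and information sites. When building an application on ElasticSearch, one essential step is importing data into an ElasticSearch index to create documents. While a system is still small, logstash can handle index synchronization. Its advantage is low development effort: write a simple index template and a sync SQL statement, and an index synchronization program is up and running. As the data volume grows, however, the index changes very frequently and the drawbacks of logstash show:
(1) Deletion is not supported. Rows can only be soft-deleted by changing a field, so over time ElasticSearch accumulates a large amount of useless data that slows down search.
(2) SQL paging is inefficient and queries are slow. logstash pages by running one large subquery and then fetching pages from that subquery. When the database holds a lot of data, a single page query can take several hundred seconds, and synchronizing tens of millions of rows can take a day.
We therefore dropped logstash and used canal to build an ElasticSearch index synchronization mechanism based on CDC (change data capture).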
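System Architecture Design
As the figure shows, the index synchronization system consists of several parts, described one by one below.
(1) Database
The database that holds the source data.
(2) Canal
Canal is Alibaba's open-source tool for subscribing to and consuming incremental MySQL data. It works by posing as a MySQL slave and sending the dump protocol to the MySQL master. On receiving the dump request, the master starts pushing the binary log to the slave, and canal parses the binary log objects.
(3) Canal Client
The Canal Client is a program we implement ourselves. It fetches the binlog entries that Canal has parsed from the Canal Server and applies the relevant business logic. In the CDC-based index synchronization system described here, the Canal Client subscribes to the binlog of the search-related database tables. Whenever search-related data changes, it sends a message to Rabbit saying that the data has changed, which tells the sync Worker to synchronize the data from MySQL to ES.
(4) RabbitMQ
The message queue. Kafka or another message queue can be used instead, depending on the business requirements.
(5) Index sync Worker
The Worker consumes messages from the queue, fetches the corresponding data from MySQL according to each message, and synchronizes it to ElasticSearch.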
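The data flow described above (announce a change, queue it, re-read the row, mirror it into the index) can be modelled in a few lines. This is only an in-memory sketch: the maps standing in for MySQL and ES, the queue standing in for RabbitMQ, and the names PipelineSketch, publishChange and syncOne are all assumptions made for illustration, not part of the real system.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** In-memory model of the pipeline: source DB -> change message -> queue -> worker -> index. */
class PipelineSketch {
    // Hypothetical stand-ins for MySQL and ElasticSearch (the real ones are remote services).
    static final Map<String, String> database = new ConcurrentHashMap<>();
    static final Map<String, String> searchIndex = new ConcurrentHashMap<>();
    // Hypothetical stand-in for the RabbitMQ queue; a message carries only the id of the changed row.
    static final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Role of the Canal Client: announce that a row changed, without sending the row itself. */
    static void publishChange(String rowId) {
        queue.add(rowId);
    }

    /** Role of the Worker: take one message, re-read the current row, mirror it into the index. */
    static boolean syncOne() {
        String rowId = queue.poll();
        if (rowId == null) {
            return false; // nothing to do
        }
        String row = database.get(rowId);
        if (row == null) {
            searchIndex.remove(rowId); // the row is gone, so drop the document
        } else {
            searchIndex.put(rowId, row);
        }
        return true;
    }

    public static void main(String[] args) {
        database.put("42", "red shoes");
        publishChange("42");
        syncOne();
        System.out.println(searchIndex);
    }
}
```

Because the message carries only an id and the Worker always reads the current row, replaying or duplicating a message is harmless in this model: the index simply ends up matching the source again.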
Canal Client Implementation
The Canal Client fetches binlog entries from the Canal Server and processes them according to business needs. The key code below shows how the Canal Client is implemented.
(1) Add the Canal client dependency to the pom.
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
(2) Initialize the Canal connection
CanalConfig holds the Canal configuration. CanalConnector comes from the canal-client package; we use it to connect to the server, fetch the binlog, and close the connection. The service is built on SpringBoot, so init is called when the CanalClientInitializer bean is created, and preDestroy is called when the service shuts down and CanalClientInitializer is destroyed.
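import java.net.InetSocketAddress;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class CanalClientInitializer {
    CanalConfig canalConfig;
    CanalConnector connector;
    CanalDataProcessor canalDataProcessor;

    public CanalClientInitializer(@Autowired CanalConfig canalConfig, @Autowired CanalDataProcessor canalDataProcessor) {
        this.canalConfig = canalConfig;
        this.canalDataProcessor = canalDataProcessor;
    }

    @PostConstruct
    public void init() throws InterruptedException {
        connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()),
                canalConfig.getDestination(), "", "");
        // Establish the connection
        connector.connect();
        // Subscribe to the relevant tables
        connector.subscribe(canalConfig.getSyncTable());
        canalDataProcessor.process(connector);
    }

    @PreDestroy
    public void preDestroy() {
        log.info("stop the canal client");
        canalDataProcessor.stopProcess();
    }
}
(3) CanalDataProcessor fetches and processes the binlog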
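import java.util.concurrent.TimeUnit;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class CanalDataProcessor {
    // volatile: written by the shutdown thread, read by the processing thread
    volatile boolean isRunning;
    RabbitTemplate rabbitTemplate;
    TableChangeProcessor tableChangeProcessor;

    public CanalDataProcessor(@Autowired RabbitTemplate rabbitTemplate, @Autowired TableChangeProcessor processor) {
        this.rabbitTemplate = rabbitTemplate;
        this.tableChangeProcessor = processor;
    }

    @Async
    public void process(CanalConnector connector) throws InterruptedException {
        isRunning = true;
        while (isRunning) {
            try {
                // Fetch a batch of messages
                Message message = connector.getWithoutAck(100, 10L, TimeUnit.SECONDS);
                // Business logic
                processMessage(message);
                // The message was processed successfully: ack it so the Canal Server knows it is done
                connector.ack(message.getId());
            } catch (Exception e) {
                log.error("failed to process canal message", e);
                // The message was not processed: roll back so it can be fetched again next time
                connector.rollback();
                Thread.sleep(1000);
            }
        }
        connector.disconnect();
    }

    public void stopProcess() {
        isRunning = false;
    }

    private void processMessage(Message message) {
        for (Entry entry : message.getEntries()) {
            try {
                tableChangeProcessor.process(entry);
            } catch (Exception e) {
                log.error("failed to process binlog entry", e);
                continue;
            }
        }
    }
}
(4) TableChangeProcessor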
TableChangeProcessor contains the concrete business logic: it processes the Message, picks out the data changes that are relevant to search, and sends the corresponding messages to the message queue.
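The article does not show TableChangeProcessor itself, so the following is only a sketch of the kind of decision it might make, not the author's implementation. It assumes a simplified input (table name, primary key, set of updated column names) instead of Canal's real entry types, and the names TableChangeSketch, SEARCH_COLUMNS, toEvent and the column list are all hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Sketch: turn a row change into a queue message only when it touches search-related columns. */
class TableChangeSketch {
    static final int CHANGE_TYPE_PRODUCT = 1; // hypothetical type code

    /** Hypothetical message payload: which row changed and what kind of change it was. */
    static final class ChangeEvent {
        final String changeId;
        final int changeType;

        ChangeEvent(String changeId, int changeType) {
            this.changeId = changeId;
            this.changeType = changeType;
        }
    }

    // Assumed configuration: per table, the columns that feed the search index.
    static final Map<String, Set<String>> SEARCH_COLUMNS = new HashMap<>();
    static {
        SEARCH_COLUMNS.put("products", new HashSet<>(Arrays.asList("name", "price", "status")));
    }

    /** Returns an event to publish, or empty when the change does not affect search. */
    static Optional<ChangeEvent> toEvent(String table, String primaryKey, Set<String> updatedColumns) {
        Set<String> relevant = SEARCH_COLUMNS.get(table);
        if (relevant == null) {
            return Optional.empty(); // table is not indexed at all
        }
        for (String column : updatedColumns) {
            if (relevant.contains(column)) {
                return Optional.of(new ChangeEvent(primaryKey, CHANGE_TYPE_PRODUCT));
            }
        }
        return Optional.empty(); // only search-irrelevant columns changed
    }

    public static void main(String[] args) {
        System.out.println(toEvent("products", "1", Collections.singleton("product_weight")).isPresent());
        System.out.println(toEvent("products", "1", Collections.singleton("price")).isPresent());
    }
}
```

In a real client the table name and updated columns would be read from the parsed binlog entry; keeping the decision in a pure function like this makes it easy to unit-test without a Canal Server.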
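Notes
(1) Ignore changes to fields that are irrelevant to search, to avoid unnecessary index updates and reduce server load. For example, if product_weight (the product weight) changes in the Products table but weight plays no part in search, ignore that change.
(2) Do not write data that will never appear in search results to ES, such as delisted products in an e-commerce system. In addition, when a product is delisted, listen for that change and notify the index sync Worker to delete the document from ES. This keeps the total number of documents in ES down and improves search efficiency.
(3) Consider the case where Rabbit is down or the queue is full and messages cannot be written. First, add retries when sending messages to Rabbit. Second, if sending still fails after several retries, throw an exception so that the canal message stream rolls back. The Canal message for this change can then be fetched again next time, and the change is not lost.
(4) Note that Canal currently supports only a single Client. High availability requires ZooKeeper: one Client is the working Client and the others are cold standbys. When the working Client dies, a standby Client sees the change in ZooKeeper and grabs the lock to become the working Client.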
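Note (3) can be sketched as a small retry helper: try a bounded number of times, then rethrow the last failure so the caller can roll back. This is a minimal, library-free sketch; PublishRetrySketch, withRetry and the linear backoff are assumptions for illustration, and a real project might rely on the retry support of its messaging or Spring stack instead.

```java
import java.util.function.Supplier;

/** Sketch: bounded retries that surface the last failure instead of swallowing it. */
class PublishRetrySketch {

    /**
     * Runs the action up to maxAttempts times and returns its result.
     * If every attempt fails, the last exception is rethrown so the caller can roll back.
     */
    static <T> T withRetry(int maxAttempts, long backoffMillis, Supplier<T> action) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and maybe try again
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis * attempt); // simple linear backoff
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break; // stop retrying when interrupted
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = withRetry(3, 10, () -> {
            if (++calls[0] < 2) {
                throw new IllegalStateException("queue unavailable");
            }
            return "sent";
        });
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

For the rollback to happen, the rethrown exception has to reach the loop that calls connector.rollback(). In the CanalDataProcessor shown earlier, processMessage catches and logs exceptions per entry, so a send failure raised inside tableChangeProcessor.process would need to propagate past that catch block for the batch to be rolled back.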
Canal Worker Implementation
The index sync Worker takes from the message queue the search-related database change messages sent by the Canal Client. For example, when a search-related field in the product table changes, the Canal Client sends a message like this:
{
    "change_id": "694212527059369984",
    "change_type": 1, // the product changed
    "change_time": "1600741397"
}
Listen for queue messages in the Worker:
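import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ProductChangeQueueListener {
    @Autowired
    @Qualifier("snake")
    ObjectMapper om;

    @Autowired
    ChangeEventHandlerFactory changeEventHandlerFactory;

    @RabbitListener(queues = RabbitConfig.PRODUCT_QUEUE_NAME, containerFactory = "customRabbitListenerContainerFactory")
    public void onChange(Message message) {
        ChangeEvent event = parse(message);
        if (event == null) {
            return;
        }
        changeEventHandlerFactory.handle(event);
    }

    private ChangeEvent parse(Message message) {
        ChangeEvent event = null;
        try {
            // Decode explicitly as UTF-8 rather than relying on the platform default charset
            event = om.readValue(new String(message.getBody(), StandardCharsets.UTF_8), ChangeEvent.class);
        } catch (Exception e) {
            log.error("sync failed: could not parse the message", e);
        }
        return event;
    }
}
ChangeEventHandlerFactory is the factory class for event handlers. Below is one event handler implementation. It listens for events whose changeType is CHANGE_TYPE_OUT_PRODUCT, fetches the changed data from the database, builds an ES IndexRequest, and puts the request into RequestBulkBuffer to await batch synchronization to ES. Some readers may wonder why the data is not taken directly from Canal. The main reason is that a Canal message contains data from a single table only, whereas an index document may combine data from several tables, so the data still has to be fetched from MySQL. If the index document contains single-table data only, consider carrying the modified data in the ChangeEvent so that the index sync Worker does not have to fetch it from MySQL again, which makes the Worker more efficient.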
@Component
@Slf4j
public class OutProductEventHandler implements ChangeEventHandler {
    @Autowired
    ProductDao productDao;

    @Autowired
    RequestBulkBuffer buffer;

    @Autowired
    OutProductChangeRequestBuilder builder;

    @Override
    @Retryable
    public boolean handle(ChangeEvent changeEvent) {
        if (!match(changeEvent)) {
            return false;
        }
        // Re-read the current row (joined with its store) from MySQL
        Tuple dataTuple = productDao.getProductWithStore(changeEvent.getChangeId());
        if (dataTuple == null) {
            return true;
        }
        Product product = dataTuple.get(QProduct.product);
        Store store = dataTuple.get(QStore.store);
        IndexRequest request = null;
        try {
            request = builder.convertToUpdateQuery(getTimestampNow(), product, store);
        } catch (Exception e) {
            log.error("failed to build the index request", e);
        }
        if (request == null) {
            return true;
        }
        // Hand the request to the buffer; a separate consumer sends it to ES in bulk
        buffer.add(request);
        return true;
    }

    @Override
    public boolean match(ChangeEvent changeEvent) {
        return ChangeEvent.CHANGE_TYPE_OUT_PRODUCT == changeEvent.getChangeType();
    }
}
In the OutProductEventHandler class above, we do not use RestHighLevelClient to update the document in the ES index directly. Instead, the IndexRequest is held temporarily in RequestBulkBuffer. RequestBulkBuffer uses CircularFifoBuffer as its storage data structure.
@Component
public class RequestBulkBuffer {
    CircularFifoBuffer buffer;

    public RequestBulkBuffer(CircularFifoBuffer buffer) {
        this.buffer = buffer;
    }

    public void add(DocWriteRequest<?> request) {
        buffer.add(request);
    }
}
CircularFifoBuffer is a modified circular queue implementation. It allows multi-threaded writes. In our scenario it supports, and only needs to support, a single thread that reads, processes, and then removes the processed data. When the circular queue is full, writer threads are blocked by means of a semaphore. The section on how the Worker prevents data loss, later in this article, explains why.
import java.util.Arrays;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.ArrayUtils; // Apache Commons Lang 3 assumed
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Allows multi-threaded writes.
 * Allows only a single thread to read -> process -> remove.
 */
public class CircularFifoBuffer {
    private final Logger logger = LoggerFactory.getLogger(CircularFifoBuffer.class.getName());
    private final Object[] elements;
    // volatile so that the reader thread sees updates made by writer threads
    private volatile int start = 0;
    private volatile int end = 0;
    private volatile boolean full = false;
    private final int maxElements;
    private final ReentrantLock addLock;
    private final Semaphore semaphore;

    public CircularFifoBuffer(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("The size must be greater than 0");
        }
        elements = new Object[size];
        maxElements = elements.length;
        addLock = new ReentrantLock();
        semaphore = new Semaphore(size);
    }

    public int size() {
        int size = 0;
        if (end < start) {
            size = maxElements - start + end;
        } else if (end == start) {
            size = (full ? maxElements : 0);
        } else {
            size = end - start;
        }
        return size;
    }

    public boolean isEmpty() {
        return size() == 0;
    }

    public boolean isFull() {
        return size() == maxElements;
    }

    public int maxSize() {
        return maxElements;
    }

    public void clear() {
        full = false;
        start = 0;
        end = 0;
        Arrays.fill(elements, null);
        // Restore every permit so writers can fill the emptied buffer again
        semaphore.drainPermits();
        semaphore.release(maxElements);
    }

    public boolean add(Object element) {
        if (null == element) {
            throw new NullPointerException("Attempted to add null object to buffer");
        }
        try {
            // Wait for a free slot before taking the lock, so a blocked writer never holds the lock
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("RingBuffer: thread interrupted, add failed");
            return false;
        }
        addLock.lock();
        try {
            elements[end++] = element;
            if (end >= maxElements) {
                end = 0;
            }
            if (end == start) {
                full = true;
            }
        } finally {
            // Always release the lock, even if the write fails
            addLock.unlock();
        }
        return true;
    }

    public Object get() {
        if (isEmpty()) {
            return null;
        }
        return elements[start];
    }

    public Object remove() {
        if (isEmpty()) {
            return null;
        }
        Object element = elements[start];
        if (null != element) {
            elements[start++] = null;
            if (start >= maxElements) {
                start = 0;
            }
            full = false;
            // Free one slot for a blocked writer
            semaphore.release();
        }
        return element;
    }

    /**
     * @param size the maximum number of elements to return
     */
    public Object[] get(int size) {
        int queueSize = size();
        if (queueSize == 0) { // empty
            return new Object[0];
        }
        int realFetchSize = queueSize >= size ? size : queueSize;
        if (end > start) {
            return Arrays.copyOfRange(elements, start, start + realFetchSize);
        } else {
            if (maxElements - start >= realFetchSize) {
                return Arrays.copyOfRange(elements, start, start + realFetchSize);
            } else {
                // The requested range wraps around the end of the array
                return ArrayUtils.addAll(
                        Arrays.copyOfRange(elements, start, maxElements),
                        Arrays.copyOfRange(elements, 0, realFetchSize - (maxElements - start)));
            }
        }
    }

    public Object[] getAll() {
        return get(size());
    }

    public Object[] remove(int size) {
        if (isEmpty()) {
            return new Object[0];
        }
        int queueSize = size();
        int realFetchSize = queueSize >= size ? size : queueSize;
        Object[] retArr = new Object[realFetchSize];
        for (int i = 0; i < realFetchSize; i++) {
            retArr[i] = remove();
        }
        return retArr;
    }
}
The class below is the consumer of the buffer. It repeatedly takes a batch of data from the buffer and uses RestHighLevelClient to synchronize it to ES in bulk. When the Worker starts, it creates a thread that calls startConsume; the thread ends when the service shuts down.
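import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.DocWriteResponse.Result;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;

@Slf4j
public class RequestBulkConsumer {
    private static final int DEFAULT_BULK_SIZE = 2000;
    private CircularFifoBuffer buffer;
    private EsBulkRequestService service;
    // volatile: stop() is called from a different thread than startConsume()
    private volatile boolean isRunning = false;
    private int bulkSize = DEFAULT_BULK_SIZE;

    public RequestBulkConsumer(CircularFifoBuffer buffer, RestHighLevelClient client) {
        this.buffer = buffer;
        this.service = new EsBulkRequestService(client);
    }

    public void setBulkSize(int size) {
        this.bulkSize = size;
    }

    public int getBulkSize() {
        return bulkSize;
    }

    public boolean isRunning() {
        return isRunning;
    }

    public void startConsume() {
        if (isRunning) {
            return;
        }
        isRunning = true;
        while (true) {
            if (!isRunning) {
                break;
            }
            // Peek at up to bulkSize items; they stay in the buffer until the bulk request succeeds
            Object[] items = buffer.get(bulkSize);
            if (items.length == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    break;
                }
            } else {
                List<DocWriteRequest<?>> requests = convert(items);
                try {
                    BulkResponse response = service.request(requests);
                    processResponse(response);
                    buffer.remove(items.length);
                    if (items.length < bulkSize) {
                        Thread.sleep(3000);
                    }
                } catch (InterruptedException e) {
                    break;
                } catch (IOException e) {
                    // ES unreachable: keep the items in the buffer so the batch is retried
                    log.error("bulk request to ES failed, will retry", e);
                } catch (Exception e) {
                    // Unexpected failure: drop the batch so it does not block the buffer forever
                    log.error("bulk request failed, dropping batch", e);
                    buffer.remove(items.length);
                }
            }
        }
    }

    private List<DocWriteRequest<?>> convert(Object[] items) {
        return Stream.of(items).map(i -> {
            if (i instanceof DocWriteRequest) {
                return (DocWriteRequest<?>) i;
            } else {
                return null;
            }
        }).filter(Objects::nonNull).collect(Collectors.toList());
    }

    public void stop() {
        isRunning = false;
    }

    private void processResponse(BulkResponse bulkResponse) {
        BulkItemResponse[] itemResponseArr = bulkResponse.getItems();
        for (BulkItemResponse resp : itemResponseArr) {
            if (resp.isFailed()) {
                // Failed items carry no response object. A version conflict only means
                // that a newer version is already indexed, so it is safe to skip.
                if (resp.status() == RestStatus.CONFLICT) {
                    continue;
                }
                log.error("index operation failed: {}, {}", resp.getId(), resp.getFailureMessage());
                continue;
            }
            DocWriteResponse docWriteResponse = resp.getResponse();
            if (docWriteResponse instanceof IndexResponse) {
                IndexResponse indexResponse = (IndexResponse) docWriteResponse;
                if (indexResponse.getResult() != Result.CREATED && indexResponse.getResult() != Result.UPDATED) {
                    log.error("index update failed: {}, {}", indexResponse.getId(), resp.getFailureMessage());
                }
            } else if (docWriteResponse instanceof DeleteResponse) {
                DeleteResponse deleteResponse = (DeleteResponse) docWriteResponse;
                if (deleteResponse.getResult() != Result.DELETED) {
                    log.error("index delete failed: {}, {}", deleteResponse.getId(), resp.getFailureMessage());
                }
            }
        }
    }
}
The classes above are the main classes of the Worker. In an index synchronization system, high availability is not the most important concern, because our search is a near-real-time system that only needs eventual consistency. What we mainly need to avoid is losing data changes. The following explains how the Worker avoids data loss.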
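Avoiding Data Loss
(1) If Rabbit goes down, that is fine. While Rabbit is down the Canal Client cannot consume the binlog, and it resumes processing data changes after Rabbit restarts. The Worker only needs to reconnect once Rabbit is back.
(2) If MySQL goes down, the Worker cannot fetch data from the database, message processing fails, and messages pile up in Rabbit. Once MySQL is back online, message processing resumes and no data is lost.
(3) If ES goes down, the bulk-processing thread fails when consuming data from the buffer, and the producers fill the buffer. Because CircularFifoBuffer blocks producer threads with a semaphore when it is full, messages again pile up in Rabbit. Once ES is back online, message processing resumes and no data is lost.
(4) If the Rabbit queue fills up, configure Rabbit to write messages to disk when memory is exhausted and give it a larger disk; this is probably what Rabbit does by default. Then set up alerting so that the backlog is dealt with promptly once it reaches a certain size. In practice this is unlikely to happen.
(5) Version conflicts. Suppose a row in the product table, say product A, changes twice within the same second, so the queue holds two consecutive messages. The two messages may be consumed on two different threads, and because of network conditions, machine performance, and similar factors, the earlier change may be written to ES after the later one, leaving ES inconsistent with MySQL. We therefore use ES external version numbers when updating the index. The timestamp taken when the data is read from MySQL serves as the version number. A document can be changed only when the timestamp is greater than or equal to its current version; otherwise ES reports a version conflict.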
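The rule in point (5) can be illustrated without ES. The sketch below models only the acceptance check "apply the write if the incoming version is greater than or equal to the stored one"; ExternalVersionSketch and its methods are hypothetical names, and real ES rejects the stale write with a version conflict error rather than returning false.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: an in-memory model of "external version, greater than or equal" write semantics. */
class ExternalVersionSketch {
    /** One stored document together with the version it was written with. */
    private static final class Doc {
        final long version;
        final String source;

        Doc(long version, String source) {
            this.version = version;
            this.source = source;
        }
    }

    private final Map<String, Doc> docs = new HashMap<>();

    /** Applies the write only if version >= the stored version; false models a version conflict. */
    synchronized boolean index(String id, long version, String source) {
        Doc current = docs.get(id);
        if (current != null && version < current.version) {
            return false; // stale write: a newer version is already stored
        }
        docs.put(id, new Doc(version, source));
        return true;
    }

    /** Returns the stored source for the id, or null if there is none. */
    synchronized String get(String id) {
        Doc current = docs.get(id);
        return current == null ? null : current.source;
    }

    public static void main(String[] args) {
        ExternalVersionSketch index = new ExternalVersionSketch();
        // The later read (version 200) happens to arrive first, the earlier read (100) arrives late.
        System.out.println(index.index("A", 200L, "second change"));
        System.out.println(index.index("A", 100L, "first change"));
        System.out.println(index.get("A"));
    }
}
```

Note that with equal versions the later arrival wins under this rule, so the coarser the timestamp, the larger the window in which two reads share a version. A finer-grained timestamp would narrow that window.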
private IndexRequest convertToUpdateQuery(Long timestamp, OutStoreProduct outStoreProduct) throws JsonProcessingException {
    // The routing key is required, so bail out before building the request
    if (StringUtils.isEmpty(outStoreProduct.getTooEbaoProductId())) {
        log.error("product {} has an empty ebaoProductId and cannot be synchronized", outStoreProduct.getId());
        return null;
    }
    IndexRequest indexRequest = new IndexRequest(indexName, "doc", outStoreProduct.getId());
    indexRequest.source(om.writeValueAsString(outStoreProduct), XContentType.JSON)
            // External versioning: the write is accepted only if the timestamp >= the stored version
            .versionType(VersionType.EXTERNAL_GTE)
            .version(timestamp)
            .routing(outStoreProduct.getTooEbaoProductId());
    return indexRequest;
}
About Full Synchronization
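The above implements only incremental synchronization. When the index is initialized, a full synchronization is needed to load the data from the database into the ES index. We can add an endpoint to the Worker whose logic splits the synchronization into batches and publishes them as tasks to the message queue; the other workers complete the corresponding task when they receive a message. For example, we can publish one synchronization task per store, and a worker synchronizes one store's data for each message it receives.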
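The per-store fan-out described above might look roughly like this. It is a sketch under stated assumptions: FullSyncSketch, SyncTask, the type code 100 and the Consumer used in place of a real queue publisher are all hypothetical, and the article does not say how the author's endpoint is actually written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Sketch: split a full synchronization into one queue task per store. */
class FullSyncSketch {
    static final int CHANGE_TYPE_FULL_SYNC_STORE = 100; // hypothetical type code

    /** Hypothetical task message: "synchronize every product of this store". */
    static final class SyncTask {
        final String storeId;
        final int changeType;

        SyncTask(String storeId, int changeType) {
            this.storeId = storeId;
            this.changeType = changeType;
        }
    }

    /** Publishes one task per store through the given publisher and returns the number published. */
    static int publishFullSync(List<String> storeIds, Consumer<SyncTask> publisher) {
        int published = 0;
        for (String storeId : storeIds) {
            publisher.accept(new SyncTask(storeId, CHANGE_TYPE_FULL_SYNC_STORE));
            published++;
        }
        return published;
    }

    public static void main(String[] args) {
        // A list stands in for the message queue here.
        List<SyncTask> queue = new ArrayList<>();
        int count = publishFullSync(Arrays.asList("store-1", "store-2"), queue::add);
        System.out.println(count + " tasks queued");
    }
}
```

Splitting by store keeps each task small and lets several workers share the load. Since the tasks travel through the same queue as incremental changes, the same retry and versioning safeguards would apply to them.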
Summary
In summary, this system is an efficient, near-real-time index synchronization system that keeps the data in ES consistent with MySQL.