mysql同步binlog_利用MySQL的Binlog实现数据同步与订阅(下)
利用MySQL的Binlog實現數據同步與訂閱(下)?blog.yuanpei.me
終于到這個系列的最后一篇,在前兩篇博客中,我們分別了介紹了Binlog的概念和事件總線(EventBus)的實現,在完成前面這將近好幾千字的鋪墊以后,我們終于可以進入正題,即通過EventBus發布Binlog,再通過編寫對應的EventHandler來訂閱這些Binlog,這樣就實現了我們“最初的夢想”。坦白說,這個過程實在有一點漫長,慶幸的是,它終于還是來了。
Binlog讀取與解析
首先,我們通過 Python-Mysql-Replication 這個項目來讀取Binlog,直接通過pip install mysql-replication安裝即可。接下來,我們編寫一個簡單的腳本文件:
def readBinLog():
stream = BinLogStreamReader(
# 填寫IP、賬號、密碼即可
connection_settings = {
'host': '',
'port': 3306,
'user': '',
'passwd': ''
},
# 每臺服務器唯一
server_id = 3,
# 主庫Binlog讀寫完畢時是否阻塞連接
blocking = True,
# 篩選指定的表
only_tables = ['order_info', 'log_info'],
# 篩選指定的事件
only_events = [DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])
for binlogevent in stream:
for row in binlogevent.rows:
event = {
"schema": binlogevent.schema,
"table": binlogevent.table,
"log_pos": binlogevent.packet.log_pos
}
if isinstance(binlogevent, DeleteRowsEvent):
event["action"] = "delete"
event["origin"] = dict(row["values"].items())
event["current"] = None
event = dict(event.items())
elif isinstance(binlogevent, UpdateRowsEvent):
event["action"] = "update"
event["origin"] = dict(row["before_values"].items())
event["current"] = dict(row["after_values"].items())
event = dict(event.items())
elif isinstance(binlogevent, WriteRowsEvent):
event["action"] = "insert"
event["origin"] = None
event["current"] = dict(row["values"].items())
event = dict(event.items())
stream.close()
發布Binlog
在讀取到Binlog以后,我們需要將其發布到EventBus里,為此,在.NET Core這邊提供一個Web API接口,只需要注入IEventBus然后調用Publish()即可:
// Post: //Publish[HttpPost]
[Route ("PublishBinLog")]
public Task PublishBinLog (BinLogEventModel eventModel)
{
if (eventModel.action == "insert" && eventModel.table.StartsWith ("log_"))
_eventBus.Publish (eventModel.MapTo ());
if (eventModel.action == "insert" && eventModel.table == "order_info")
_eventBus.Publish (eventModel.MapTo ());
return Task.CompletedTask;
}
相應地,我們需要在腳本中添加調用Web API的邏輯代碼,使用我們最熟悉的requests庫即可:
def sendBinLog(event):
url = "https://localhost:44348/EventBus/PublishBinLog"
headers = {
'Content-Type': "application/json",
}
try:
payload = json.dumps(event,cls=ComplexEncoder)
response = session.request("POST", url, data=payload, headers=headers, verify=False)
except Exception:
pass
在這里,在處理Binlog的序列化的問題時,我們可能會遇到默認的JSON序列化器無法對event進行序列化的問題,此時,我們可以編寫一個自定義的序列化器,下面是博主目前在使用的序列化器:
class ComplexEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return obj.strftime('%Y-%m-%d%H:%M:%S')
elif isinstance(obj, date):
return obj.strftime('%Y-%m-%d')
elif isinstance(obj, decimal.Decimal):
return str(obj)
elif isinstance(obj, bytes):
return obj.decode('utf-8')
else:
return json.JSONEncoder.default(self, obj)
訂閱Binlog
現在,為了訂閱這些Binlog,我們來編寫對應的EventHandler,這里我們定義兩個EventHandler,一個用于打印日志編號、日志內容、日志級別等信息,一個用于統計不同級別的日志的數目。代碼實現如下:
//打印日志的EventHandlerpublic class WriteLogEventHandler : IEventHandler {
private ILogger _logger;
public WriteLogEventHandler (ILogger logger) {
_logger = logger;
}
public Task Handle (WriteLogEvent @event) {
_logger.LogInformation ($"日志編號:{@event.TRANSACTION_ID},日志級別:{@event.LOG_LEVEL},主機:{@event.HOST_NAME},IP:{@event.HOST_IP},內容:{@event.CONTENT}");
return Task.CompletedTask;
}
}
//分析日志的EventHandlerpublic class AnalyseLogEventHandler : IEventHandler {
private readonly ILogger _logger;
private readonly IDistributedCache _cache;
public AnalyseLogEventHandler (ILogger logger, IDistributedCache cache) {
_logger = logger;
_cache = cache;
}
public Task Handle (WriteLogEvent @event) {
var cacheCount = _cache.GetString (@event.LOG_LEVEL);
if (string.IsNullOrEmpty (cacheCount))
cacheCount = "1";
else
cacheCount = (int.Parse (cacheCount) + 1).ToString ();
_cache.SetString (@event.LOG_LEVEL, cacheCount);;
return Task.CompletedTask;
}
}
注意,這里需要在Startup中注入EventHandler、EventBus以及各種必要的依賴項,你可以手動注冊,或者參考下面的代碼,實現掃描注冊:
services.AddSingleton ();
services.AddSingleton (sp => new EventBusSubscriptionManager ());
services.AddSingleton (sp => new ConnectionFactory () { HostName = "localhost", UserName = "guest", Password = "guest" });
services.AddSingleton ();
services.AddControllers ().AddNewtonsoftJson ();
services.AddDistributedMemoryCache (options => {
options.ExpirationScanFrequency = TimeSpan.FromMinutes (5);
options.SizeLimit = 10;
});
//自動注冊services.AddEventBus();
//手動注冊services.AddSingleton (sp => {
var eventBus = new RabbitMQEventBus (sp.GetRequiredService (), sp.GetRequiredService (), sp.GetRequiredService> (), sp, "eventbus-exchange", "eventbus-queue");
eventBus.Subscribe():
eventBus.Subscribe();
return eventBus;
});
services.AddTransient();
services.AddTransient();
一起來看看效果,簡直太完美了,我就是不想寫中間表啊,這樣多好!!!
本文小結
通過三篇博客的篇幅,我們實現了“利用MySQL的Binlog實現數據同步與訂閱”的想法。在這個過程中,我們了解了Binlog的相關概念,參考微軟的 eShopOnContainers 項目實現了一個基于RabbitMQ的EventBus,而這一切都在這篇博客中完成了最終的“拼合”,通過 Python-Mysql-Replication 實現了Binlog解析,而EventBus則作為整個事件系統的“上帝”對所有事件處理器(EventHandler)進行統一調度,最終我們不需要關心這些事件是如何被發布到EventBus中的,只需要知道它對應哪一個Event并為它編寫對應的EventHandler即可,除了這篇博客中提到的Binlog以外,實際上它還可以作為系統內的“領域事件”來實現業務上的事件驅動,譬如OrderInfoCreateEvent這個事件可以表示一個訂單被創建,而關心訂單狀態的人則可以通過EventHandler來實現訂閱,實現類似發短信、發郵件、發微信等等的功能,或者可以讓第三方的Web API來消費事件中攜帶的信息。同理,第三方的數據在流入系統時,可以先發布到消息隊列中,再通過對應的EventHandler來進行異步處理,極大地改善系統接口的吞吐性能,而如果在這中間抽象出來一個數據交換層出來,那么就能收獲更多不一樣的東西,就在寫這篇博客的時候,我在Github上的代碼被收入了微軟的"北極冰川火種計劃",雖然數字世界遠比現實世界寬廣得多,可能為這個世界減少一點“無用”的數據或者代碼,應該一樣可以算作是環保行為吧!
總結
以上是生活随笔為你收集整理的mysql同步binlog_利用MySQL的Binlog实现数据同步与订阅(下)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql高可用_mysql高可用方案
- 下一篇: mysql binary mode_my