mysql switch binlog_如何使用 Golang 处理 MySQL 的 binlog
大家好,我是 Artem,一名 Golang 開發(fā)。我們的團(tuán)隊(duì)花費(fèi)了大量時間訓(xùn)練 MySQL binlog。這里整合一些簡單用法,不會放過任何隱藏的陷阱。示例代碼將在最后顯示。
每次從 數(shù)據(jù)庫 查詢的返回結(jié)果中拉取用戶信息時,主項(xiàng)目中會有高負(fù)載模塊。此時使用緩存是一個不錯的建議,但是什么時候重置緩存呢?這需要由數(shù)據(jù)來決定更新時間。
MySQL 的主從復(fù)制是一個很棒的設(shè)計(jì)。而我們的守護(hù)進(jìn)程可以視為一個通過 binlog 獲取數(shù)據(jù)的 slave,binlog 設(shè)置成 row 格式。這樣就能使用所有的數(shù)據(jù)庫命令,但事務(wù)下的命令只有在提交后才會記錄。在達(dá)到內(nèi)存的使用限制后(默認(rèn)為 1GB),會開啟另一個文件,每個新文件的名稱后都會有一個增量。
本文將分為以下兩部分:
如何處理 binlog 中的新數(shù)據(jù)
如何設(shè)置和擴(kuò)展
part 1. 快速運(yùn)行
連接到一個新的 channel(chanal 是一個庫的標(biāo)簽)。我們將使用 binlog 中的 row 格式https://mariadb.com/kb/en/library/binary-log-formats/
。
func binLogListener() {
c, err := getDefaultCanal()
if err == nil {
coords, err := c.GetMasterPos()
if err == nil {
c.SetEventHandler(&binlogHandler{})
c.RunFrom(coords)
}
}
}
func getDefaultCanal() (*canal.Canal, error) {
cfg := canal.NewDefaultConfig()
cfg.Addr = fmt.Sprintf("%s:%d", "127.0.0.1", 3306)
cfg.User = "root"
cfg.Password = "root"
cfg.Flavor = "mysql"
cfg.Dump.ExecutionPath = ""
return canal.NewCanal(cfg)
}
現(xiàn)在來進(jìn)行封裝
type binlogHandler struct {
canal.DummyEventHandler // Dummy handler from external lib
BinlogParser // Our custom helper
}
func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {return nil}
func (h *binlogHandler) String() string {return "binlogHandler"}
然后我們可以在 OnRow() 方法中添加一些代碼邏輯,讓他更好用
func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {
var n int //starting value
var k int // step
switch e.Action {
case canal.DeleteAction:
return nil // not covered in example
case canal.UpdateAction:
n = 1
k = 2
case canal.InsertAction:
n = 0
k = 1
}
for i := n; i < len(e.Rows); i += k {
key := e.Table.Schema + "." + e.Table.Name
switch key {
case User{}.SchemaName() + "." + User{}.TableName():
/*
Real data parsing
*/
}
}
return nil
}
這個包裝器的主要邏輯是解析接收到的數(shù)據(jù)。我們可以通過更新的兩個條件獲取數(shù)據(jù)(第一條包含初始數(shù)據(jù),第二條則是更新數(shù)據(jù)),同時也支持多行插入和多行更新。在這種情況下,執(zhí)行 UPDATE 操作時,每次都要使用第二個條件。而執(zhí)行 INSERT 時,需要操作每一行,為此我們需要使用 n 和 k 變量。
從 binlog 中獲取一個模版,逐行加載數(shù)據(jù),每個 column 都標(biāo)明注釋:
type User struct {
Id int `gorm:"column:id"`
Name string `gorm:"column:name"`
Status string `gorm:"column:status"`
Created time.Time `gorm:"column:created"`
}
func (User) TableName() string {
return "User"
}
func (User) SchemaName() string {
return "Test"
}
MySQL 的表結(jié)構(gòu)體
CREATE TABLE Test.User(
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(40) NULL ,
status ENUM("active","deleted") DEFAULT "active",
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP
)
ENGINE =InnoDB;
現(xiàn)在使用代碼的方式實(shí)現(xiàn)
user := User{}
h.GetBinLogData(&user, e, i)
最后,我們可以通過新增加的用戶變量獲取數(shù)據(jù)。打印出來讓它看起來更美觀。
if e.Action == canal.UpdateAction {
oldUser := User{}
h.GetBinLogData(&oldUser, e, i-1)
fmt.Printf("User %d is updated from name %s to name %s\n", user.Id, oldUser.Name, user.Name, )
} else {
fmt.Printf("User %d is created with name %s\n", user.Id, user.Name, )
}
太好了,代碼即將實(shí)現(xiàn), "Hello, binlog world":
func main() {
go binLogListener()
// placeholder for your handsome code
time.Sleep(2 * time.Minute)
fmt.Print("Thx for watching")
}
新增和更新用戶:
INSERT INTO Test.User (`id`,`name`) VALUE (1,"Jack");
UPDATE Test.User SET name="Jonh" WHERE id=1;
結(jié)果展示:
User 1 is created with name Jack
User 1 name changed from Jack to Jonh
這段代碼通過 binlog 來解析新增的 row,并通過數(shù)據(jù)表獲取我們需要的數(shù)據(jù),在結(jié)構(gòu)體中解析數(shù)據(jù)并輸出結(jié)果。我沒有介紹所有的數(shù)據(jù)解析器(BinlogParser),這其中還隱藏了一些 hydration 邏輯模型。
part 2. 正如 cobb 所說,我們需要更加深入了解
解析器的隱藏部分是基于反射,可以使用下面這種方式來進(jìn)行 hydration 模型。
h.GetBinLogData(&user, e, i)
使用一些簡單的數(shù)據(jù)類型來處理
bool
int
float64
string
time.Time
也可以通過 JSON 來解析結(jié)構(gòu)體。
如果你需要更多的數(shù)據(jù)類型 , 或者你只是想知道 binlog 是如何進(jìn)行解析工作的 , 最好的辦法是自己擴(kuò)展解析類型。
下面是一個int
類型的實(shí)例 :
type User struct {
Id int `gorm:"column:id"`
}
我們可以通過反射來獲取類型名稱。parseTagSetting 方法可以使注釋更便于使用:
element := User{} //In common cases we have interface, but here we will start with model
v := reflect.ValueOf(element)
s := reflect.Indirect(v)
t := s.Type()
num := t.NumField()
parsedTag := parseTagSetting(t.Field(k).Tag)
if columnName, ok = parsedTag["COLUMN"]; !ok || columnName == "COLUMN" {
continue
}
for k := 0; k < num; k++ {
name := s.Field(k).Type().Name()
switch name {
case "int":
// here we deal with an incoming row
}
}
獲取了類型名稱的同時也可以通過反射來設(shè)置它的值
func (v Value) SetInt(x int64) {//...
解析注釋幫助器(從 Gorm 庫獲取)
func parseTagSetting(tags reflect.StructTag) map[string]string {
setting := map[string]string{}
for _, str := range []string{tags.Get("sql"), tags.Get("gorm")} {
tags := strings.Split(str, ";")
for _, value := range tags {
v := strings.Split(value, ":")
k := strings.TrimSpace(strings.ToUpper(v[0]))
if len(v) >= 2 {
setting[k] = strings.Join(v[1:], ":")
} else {
setting[k] = k
}
}
}
return setting
}
解析器中有 int64 類型,我們可以創(chuàng)建一個將 int64 轉(zhuǎn)換為 row 類型的方法:
func (m *BinlogParser) intHelper(e *canal.RowsEvent, n int, columnName string) int64 {
columnId := m.getBinlogIdByName(e, columnName)
if e.Table.Columns[columnId].Type != schema.TYPE_NUMBER {
return 0
}
switch e.Rows[n][columnId].(type) {
case int8:
return int64(e.Rows[n][columnId].(int8))
case int32:
return int64(e.Rows[n][columnId].(int32))
case int64:
return e.Rows[n][columnId].(int64)
case int:
return int64(e.Rows[n][columnId].(int))
case uint8:
return int64(e.Rows[n][columnId].(uint8))
case uint16:
return int64(e.Rows[n][columnId].(uint16))
case uint32:
return int64(e.Rows[n][columnId].(uint32))
case uint64:
return int64(e.Rows[n][columnId].(uint64))
case uint:
return int64(e.Rows[n][columnId].(uint))
}
return 0
}
除了 getBinlogIdByName() 方法,所有東西看起來都是合理的。
需要使用 trivial 幫助器來處理 column 名而不是它的 id,這樣可以:
使用 Gorm 注釋來處理字段名:
在開頭和中間添加字段名時不需要額外修改:
使用字段名處理比 column3 更方便。
最后,我們加入以下處理:
s.Field(k).SetInt(m.intHelper(e, n, columnName))
還有兩個例子
ENUM: 我們將獲取的值作為索引——所以“ active ”狀態(tài)會被設(shè)置為 0。同樣的,我們也需要用 enum 字符串表示,而不是 id,這些可以從字段介紹中獲取。重要提示,值中的 1 描述的是 0 值索引字段,數(shù)組的值是從 0 開始的。
Enum 的解析如下:
func (m *BinlogParser) stringHelper(e *canal.RowsEvent, n int, columnName string) string {
columnId := m.getBinlogIdByName(e, columnName)
if e.Table.Columns[columnId].Type == schema.TYPE_ENUM {
values := e.Table.Columns[columnId].EnumValues //fields value
if len(values) == 0 || e.Rows[n][columnId] == nil {
return ""
}
return values[e.Rows[n][columnId].(int64)-1] // first id in result is zero one in values
}
}
存儲 JSON
這難道不是個好主意嗎? JSON 是 MySQL 引擎?zhèn)鹊淖址?#xff0c;我們可以將序列化的數(shù)據(jù)指向解析器。為此,可以添加一個自定義 Gorm 注釋——“ fromJson ”,以下是不同數(shù)據(jù)之間的例子:
type JsonData struct {
Int int `gorm:"column:int"`
StructData TestData `gorm:"column:struct_data;fromJson"`
MapData map[string]string `gorm:"column:map_data;fromJson"`
SliceData []int `gorm:"column:slice_data;fromJson"`
}
type TestData struct {
Test string `json:"test"`
Int int `json:"int"`
}
雖然可以創(chuàng)造很多條件來實(shí)現(xiàn),但是新增字段會損壞它。上 Stack Overflow 尋找答案的結(jié)果可能是,“如何從未知的 JSON 結(jié)構(gòu)體解析 ? ” “ 不知道你為什么需要這樣,但你可以試試 ...”
將結(jié)構(gòu)體轉(zhuǎn)換為接口可以實(shí)現(xiàn):
if _, ok := parsedTag["FROMJSON"]; ok {
newObject := reflect.New(s.Field(k).Type()).Interface()
json := m.stringHelper(e, n, columnName)
jsoniter.Unmarshal([]byte(json), &newObject)
s.Field(k).Set(reflect.ValueOf(newObject).Elem().Convert(s.Field(k).Type()))
}
總結(jié)
以上是生活随笔為你收集整理的mysql switch binlog_如何使用 Golang 处理 MySQL 的 binlog的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql临时表好处和坏处_mysql临
- 下一篇: 选择查询 mysql_具体的mysql