netty实现mysql协议_基于Netty模拟解析Binlog
前言
最近一段時間一直再看mysql binlog相關的內容,也整理了幾篇相關的文章,對mysql的事件以及通訊協議在理論上有了一個大概的了解,但是缺少實戰;本文的目的就是從實戰出發,了解binlog解析的整個過程。
解析思路
把binlog的解析過程大致分為以下幾個步驟:
1.服務器啟動首先獲取上一次解析成功的位置(實例中存儲在本地文件中);
2.和mysql服務器建立連接;
3.接受mysql發送來的binlog事件;
4.對不同的binlog事件進行解析;
5.將數據進行存儲(實例中僅在日志中打印);
6.存儲成功后,定時記錄Binaly Log位置。
關于binlog相關的配置可以參考系列文章,里面有詳解的介紹,下面對步驟進行詳細的介紹;
1.服務器啟動首先獲取上一次解析成功的位置(實例中存儲在本地文件中)
binlog的位置信息存儲在文件namePosition,有更新也同樣更新到namePosition中,部分代碼如下:
public class NamePositionStore {
private static Logger log = LoggerFactory.getLogger(NamePositionStore.class);
public static final String BINLOG_NAME = "binlogName";
public static final String BINLOG_POSITIION = "binlogPosition";
private static Map binlogMap = new HashMap();
private static String lineSeparator = (String) System.getProperties().get("line.separator");
private static String localStoreUrl = "namePosition";
static {
loadNamePosition();
}
public static synchronized Map loadNamePosition() {
binlogMap = load();
return binlogMap;
}
public static synchronized Map getNamePosition() {
return binlogMap;
}
public static synchronized void putNamePosition(String binlogName, long binlogPosition) {
binlogMap.put(BINLOG_NAME, binlogName);
binlogMap.put(BINLOG_POSITIION, binlogPosition + "");
store(binlogMap);
}
public static synchronized void putNamePosition(long binlogPosition) {
binlogMap.put(BINLOG_POSITIION, binlogPosition + "");
store(binlogMap);
}
...以下代碼省略,可參考碼云完整代碼...
}
namePosition中存儲了兩個字段分別是:binlogName和binlogPosition,這兩個字段會在客戶端請求mysql binlog的時候需要的參數;
2.和mysql服務器建立連接
在文章Mysql通訊協議分析中可以看到和mysql服務器建立連接的步驟:mysql發送握手包,客戶端發送認證包,mysql發送認證的結果;
public class HandshakeHandler extends SimpleChannelInboundHandler {
private Logger logger = LoggerFactory.getLogger(HandshakeHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, DataPackage pk) throws Exception {
logger.info("Handshake start");
if (null == pk) {
return;
}
ByteBuf msg = (ByteBuf) pk.getContent();
int protocolVersion = msg.readByte();
String serverVersion = ByteUtil.NullTerminatedString(msg);
int threadId = ByteUtil.readInt(msg, 4);
logger.info("protocolVersion = " + protocolVersion + ",serverVersion = " + serverVersion + ",threadId = "
+ threadId);
String randomNumber1 = ByteUtil.NullTerminatedString(msg);
msg.readBytes(2);
byte encode = msg.readByte();
msg.readBytes(2);
msg.readBytes(13);
String randomNumber2 = ByteUtil.NullTerminatedString(msg);
logger.info("Handshake end");
AuthenticateDataBean dataBean = new AuthenticateDataBean(encode, randomNumber1 + randomNumber2,
Constants.userName, Constants.password);
ctx.channel().writeAndFlush(new DataPackage(1, dataBean));
ctx.pipeline().remove(this);
}
}
接受mysql發送的握手包,進行相關的解析工作,其中比較重要的是兩個挑戰隨機數,客戶端在認證的時候需要使用隨機數對密碼加密;解析完之后客戶端發送認證數據包(封裝在AuthenticateDataBean),具體類信息如下:
public class AuthenticateDataBean implements IDataBean {
/** 認證需要的用戶名密碼 **/
private String userName;
private String password;
/** 編碼和挑戰隨機數 **/
private byte encode;
private String randomNumber;
...以下代碼省略,可參考碼云完整代碼...
@Override
public byte[] toByteArray() throws Exception {
int clientPower = PowerType.CLIENT_LONG_FLAG | PowerType.CLIENT_PROTOCOL_41
| PowerType.CLIENT_SECURE_CONNECTION;
byte clientPowerBytes[] = ByteUtil.writeInt(clientPower, 4);
int maxLen = 0;
byte maxLenBytes[] = ByteUtil.writeInt(maxLen, 4);
byte encodeBytes[] = ByteUtil.writeInt(encode, 1);
byte zeroBytes[] = ByteUtil.writeInt(0, 23);
byte[] userNameBytes = (userName + "\0").getBytes();
byte[] passwordBytes = "".equals(password) ? new byte[0]
: ByteUtil.passwordCompatibleWithMySQL411(password, randomNumber);
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeBytes(clientPowerBytes);
byteBuf.writeBytes(maxLenBytes);
byteBuf.writeBytes(encodeBytes);
byteBuf.writeBytes(zeroBytes);
byteBuf.writeBytes(userNameBytes);
byteBuf.writeByte((byte) passwordBytes.length);
byteBuf.writeBytes(passwordBytes);
return byteBuf.array();
}
}
發送的認證包到服務器之后,客戶端會收到認證的結果,具體處理在AuthenticateResultHandler中:
public class AuthenticateResultHandler extends SimpleChannelInboundHandler {
private Logger logger = LoggerFactory.getLogger(AuthenticateResultHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, DataPackage dataPackage) throws Exception {
ByteBuf msg = (ByteBuf) dataPackage.getContent();
int mark = msg.readByte();
if (mark == 0) {
Map binlongMap = NamePositionStore.getNamePosition();
RequestBinlogDumpDataBean dataBean = new RequestBinlogDumpDataBean(Constants.serverId,
binlongMap.get(NamePositionStore.BINLOG_NAME),
Long.valueOf(binlongMap.get(NamePositionStore.BINLOG_POSITIION)));
ctx.channel().writeAndFlush(new DataPackage(0, dataBean));
logger.info("Authenticate success:" + ByteUtil.bytesToHexString(msg.array()));
} else {
logger.info("Authenticate fail:" + ByteUtil.bytesToHexString(msg.array()));
}
ctx.pipeline().remove(this);
}
}
如果認證成功,這時候客戶端需要發送請求接受binlog的請求,這里面包含兩個重要的參數就是binlogName和binlogPosition,具體信息在RequestBinlogDumpDataBean類中,結構類似AuthenticateDataBean,此處省略。
3.接受mysql發送來的binlog事件
服務器收到客戶端的binlog請求,這時服務器如果產生了binlog日志,會發送給客戶端,客戶端需要一個接受binlog事件的類:
public class BinlogEventParseHandler extends SimpleChannelInboundHandler {
private Logger logger = LoggerFactory.getLogger(BinlogEventParseHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, DataPackage datePackage) throws Exception {
ByteBuf contentBuf = (ByteBuf) datePackage.getContent();
contentBuf.skipBytes(1);
EventHeader header = new EventHeader();
header.setTimestamp(ByteUtil.readInt(contentBuf, 4));
header.setTypeCode((byte) ByteUtil.readInt(contentBuf, 1));
header.setServerId(ByteUtil.readInt(contentBuf, 4));
header.setEventLen(ByteUtil.readInt(contentBuf, 4));
header.setNextPosition(ByteUtil.readInt(contentBuf, 4));
header.setFlags(ByteUtil.readInt(contentBuf, 2));
logger.info(header.toString());
IEventParser parser = EventParserFactory.getEventParser(header.getTypeCode());
if (parser == null) {
logger.error("不支持的binlog事件類型解析;typeCode = " + header.getTypeCode());
}
parser.parse(contentBuf, header);
if (header.getTypeCode() != EventType.ROTATE_EVENT
&& header.getTypeCode() != EventType.FORMAT_DESCRIPTION_EVENT) {
NamePositionStore.putNamePosition(header.getNextPosition());
}
}
}
首先解析事件頭包括:eventType,eventLen,nextPosition等信息,然后根據事件類型,調用不同的解析器進行解析;
4.對不同的binlog事件進行解析
步驟3中通過不同的事件類型,獲取對應的解析器,這些解析器都在EventParserFactory中,下面以FormatDescriptionEventParser為例
public class FormatDescriptionEventParser implements IEventParser {
private Logger logger = LoggerFactory.getLogger(FormatDescriptionEventParser.class);
@Override
public void parse(ByteBuf msg, EventHeader eventHeader) {
long binlogVersion = ByteUtil.readInt(msg, 2);
String serverVersion = ByteUtil.readFixedLenString(msg, 50);
long timestamp = ByteUtil.readInt(msg, 4);
byte headerLength = msg.readByte();
StringBuffer eventTypeFixDataLen = new StringBuffer();
for (int i = 0; i < 27; i++) {
eventTypeFixDataLen.append(msg.readByte() + ",");
}
logger.info("binlogVersion = " + binlogVersion + ",serverVersion = " + serverVersion + ",timestamp = "
+ timestamp + ",headerLength = " + headerLength + ",eventTypeStr = " + eventTypeFixDataLen);
}
}
根據FormatDescriptionEvent的格式讀取ByteBuf里面的數據包括:binlog版本,服務器版本,時間戳,事件頭長度以及每個Event的fixed part lengths,本次實戰中僅僅將解析后的數據打印到日志中,沒有做其他處理。
5.將數據進行存儲(實例中僅在日志中打印)
本次使用的binlog模式是:STATEMENT,所有所有的sql語句都會發送給客戶端,對應的事件是QueryEvent,包括創建表,增刪改等操作:
public class QueryEventParser implements IEventParser {
private Logger logger = LoggerFactory.getLogger(QueryEventParser.class);
private static final int QUERY_EVENT_FIX_LEN = 13;
@Override
@SuppressWarnings("unused")
public void parse(ByteBuf msg, EventHeader eventHeader) {
long threadId = ByteUtil.readInt(msg, 4);
long time = ByteUtil.readInt(msg, 4);
int dbNameLen = msg.readByte();
int errorCode = ByteUtil.readInt(msg, 2);
int variableLen = ByteUtil.readInt(msg, 2);
msg.skipBytes(variableLen);
String dbName = ByteUtil.NullTerminatedString(msg);
String sql = ByteUtil.readFixedLenString(msg, (int) (eventHeader.getEventLen() - variableLen
- EventHeader.EVENT_HEADER_LEN - QUERY_EVENT_FIX_LEN - dbName.getBytes().length - 1));
logger.info("dbName = " + dbName + ",sql = " + sql);
}
}
以上的QueryEventParser解析執行的更新語句,記錄了數據庫名稱和相關的更新sql語句。
6.存儲成功后,定時記錄Binaly Log位置
在步驟三中的BinlogEventParseHandler類中,我們在解析玩之后,存儲了nextPosition信息到文件中,方便下次啟動讀取,同時binlog還有一個切換binlog文件的事件,同樣也需要記錄;
public class RotateEventParser implements IEventParser {
private Logger logger = LoggerFactory.getLogger(RotateEventParser.class);
@Override
public void parse(ByteBuf msg, EventHeader eventHeader) {
long binlogPosition = ByteUtil.readLong(msg, 8);
int variablePartLen = (int) (eventHeader.getEventLen() - EventHeader.EVENT_HEADER_LEN - 8);
byte variablePart[] = new byte[variablePartLen];
msg.readBytes(variablePart);
String binlogName = new String(variablePart);
logger.info("binlogPosition = " + binlogPosition + ",binlogName = " + binlogName);
NamePositionStore.putNamePosition(binlogName, binlogPosition);
}
}
對應的事件是RotateEvent,因為切換成新的binlongName,所有需要同時記錄binlongName和binlogPosition。
總結
本文旨在讓大家更加了解binlog同步的大致過程,所以本文提供的項目沒有經過大量的測試,僅供大家學習使用;本項目中參考了一些優秀的開源軟件:mysql-binlog-connector-java和MySQL-Binlog
總結
以上是生活随笔為你收集整理的netty实现mysql协议_基于Netty模拟解析Binlog的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java中的方法 net.中的函数_.N
- 下一篇: 不动产登记他项权证(不动产登记证明和不动