MYSQL主从连接协议解析, 并使用PYTHON模拟
MYSQL的主从应该是使用得最多的架构, 使用也很简单, 就change master to xxx 然后start 就可以了, 但是你知道原理吗?
写在前面
如果你阅读了我之前写的那个 MYSQL流量镜像, 那么你就可以看到mysql连接过程如下:
就是连接mysql之后, 设置了一些变量, 然后发送了两个特殊的包. 是不是很简单. 下面我们就来详细介绍下
ps: 其实这个流量镜像脚本还可以用来当general log使用(仅部分连接的流量日志) -_-
连接过程
连接上mysql服务器后, 都是request_dump(sql/rpl_slave.cc)函数通知主库我是从库.
主从连接, 分两种情况, 一种是基于gtid(MASTER_AUTO_POSITION = 1)的, 另一种是指定log_filename,log_pos的, 推荐使用第一种, 但第二种更简单. 本文两种都演示.
先设置一些参数, 比如master_heartbeat_period(心跳间隔,单位纳秒),master_binlog_checksum,slave_uuid
同时也会查询一些服务器信息, 比如SERVER_ID等. 但本次环境就不整那么多了.
client ->
>
server : set header_beat等
server ->
>
client : ok
client -->
>
server : COM_REGISTER_SLAVE
server -->
>
client : ok
client ->
>
server : COM_BINLOG_DUMP/COM_BINLOG_DUMP_GTID
server -->
>
client: EVENT(含Heartbeat_event)
下图 省略连接过程,(也不考虑失败情况) 要看账号认证过程, 请看之前的文章: mysql连接协议解析
主从相关包结构
主要就是COM_BINLOG_DUMP,COM_BINLOG_DUMP_GTID,COM_REGISTER_SLAVE
Heartbeat_event 比较简单, 就是个event_header加个binlog_filename 就没了...
COM_BINLOG_DUMP
这个包用于非gtid模式, 告诉服务器, 我是dump线程, 结构如下:
源码在:(sql/rpl_slave.cc)request_dump
名字 | 大小(字节) | 描述 |
---|---|---|
COM_FLAG | 1 | 固定为18 |
binlog-pos | 4 | 要请求的binlog的pos号 |
flags | 2 | 通常固定为0 |
server-id | 4 | 从库(本机)的server_id |
binlog-filename | 要请求的binlog文件名字(无空白字符结尾) |
COM_BINLOG_DUMP_GTID
用于gtid模式下,结构如下:
源码在:(sql/rpl_slave.cc)request_dump
名字 | 大小(字节) | 描述 |
---|---|---|
COM_FLAG | 1 | 固定30 |
binlog_flags | 2 | 固定0 |
server_id | 4 | 从库(本机)的server_id |
BINLOG_NAME_SIZE | 4 | binlog名字大小 |
BINLOG_NAME | 取决于BINLOG_NAME_SIZE | 通常都是填充0 |
POS | 8 | 要请求的binlog的pos号, 通常为4 |
gtid_size | 4 | gtid大小 |
gtid | 取决于gtid_size | gtid |
COM_REGISTER_SLAVE
注册主从的(可选). 多数字段都是0填充的... 不要也罢 -_-
源码在: sql/rpl_slave.cc register_slave_on_master
名字 | 大小(字节) | 描述 |
---|---|---|
COM_FLAG | 1 | 固定21 |
server_id | 4 | 从库(本机)的server_id |
host | 1 | 通常填充0 |
user | 1 | 通常填充0 |
password | 1 | 通常填充0 |
port | 2 | 从库端口 |
pl_recovery_rank | 4 | 固定0 |
master_id | 4 | 固定0(主库填的) |
python模拟
本次模拟就不写relay log了, print出来就行, event也不全解析了, 就解析下header就行
gtid
就是 MASTER_AUTO_POSITION = 1 的情况
import testpymysqlreplication
aa = testpymysqlreplication.repl()
aa.server_id = 123456
aa.auto_position = True
aa.connect()
aa.request_dump()
for x in aa.event():
print(x)
可以使用show processlist 看到我们的连接再来插入条数据瞧瞧
也是没得问题的
本次环境不想解析event_body了event_type为27的就是 心跳包
非gtid
使用和gtid差不多, 只是auto_position设置为False
import testpymysqlreplication
aa = testpymysqlreplication.repl()
aa.server_id = 123456
aa.log_file = 'm3308.001037'
aa.log_pos = 4
aa.auto_position = False
aa.connect()
aa.request_dump()
for x in aa.event():
print(x)
就不测试插入数据了.
总结
1. mysql主从分两种情况, 有gtid和无gtid情况, 分别对应MASTER_AUTO_POSITION = 1和MASTER_AUTO_POSITION = 0
2. 执行完request_dump后, 从库就只需要等主库发送就行了
3. 如果从库长时间未收到主库的心跳包, 就认为主库挂了. 如果24小时业务都频繁的话, 可以不要心跳包(其实30秒一个流量也不多.....)
注:每个command都要重置seq为0
附python源码
本次源码没有解析GTID, 使用的mysql_monitor脚本得到的gtid信息. 如果要解析gtid的话, 可以参考pymysqlreplication的gtid.py 也可以参考官方文档
testpymysql 脚本是之前解析mysql连接的时候的, 也可以使用pymysql的
import testpymysql
import struct
def _read_lenenc(bdata,i):
length = btoint(bdata[i:i+1])
i += 1
data = bdata[i:i+length]
i += length
return data,i
def event_header(bdata):
timestamp, event_type, server_id, event_size, log_pos, flags = struct.unpack("LBLLLh",bdata[0:19])
return {
"timestamp":timestamp,'event_type':event_type,'server_id':server_id,'event_size':event_size,'log_pos':log_pos,'flags':flags,}
class repl(testpymysql.mysql):
def __init__(self,*args,**kwargs):
super().__init__(**kwargs)
self.server_id = 12345678 #uint32
self.lport = 6666
self.auto_position = False #Ture:gtid
self.master_heartbeat_period = 30*1000000000 #设置心跳, 单位纳秒
self.log_file = 'm3308.001037' #其实可以show master status的, 但是我懒得去查了...
self.log_pos = 4
#懒得去计算gtid了, 直接用我的环境的现成的, 计算方式可以参考: pymysqlreplication 的 gtid.py
#使用: gtid.GtidSet(GTID_STR).encode()
self.bgtid = b'\x06\x00\x00\x00\x00\x00\x00\x00N\x08\x1c\xcb\xb0\xc9\x11\xed\xbf\x10\x00\x0c)\x938\xcc\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00me\x0f\x1f\xbaN\x11\xed\x99\xab\x00\x0c)\x80\xc1\x1e\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00Eg\x02\x00\x00\x00\x00\x00z\xb0f\xef\xc1\xbe\x11\xec\x92\xdd\x00\x0c)\x80\xc1\x1e\x03\x00\x00\x00\x00\x00\x00\x00\x13\n\x00\x00\x00\x00\x00\x00\x19\n\x00\x00\x00\x00\x00\x00\x8c\n\x00\x00\x00\x00\x00\x00\x8d\n\x00\x00\x00\x00\x00\x00\xae1\x80\x01\x00\x00\x00\x00\xaf1\x80\x01\x00\x00\x00\x00\x90\xbd\xfb\xb7\xcb\xe2\x11\xec\xa8p\x00\x0c)\x80\xc1\x12\x01\x00\x00\x00\x00\x00\x00\x00\xae1\x80\x01\x00\x00\x00\x00\xaf1\x80\x01\x00\x00\x00\x00\x90\xbd\xfb\xb7\xcb\xe2\x11\xec\xa8p\x00\x0c)\x80\xc1\x1e\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\xa9\xbb\xd7\x00\x00\x00\x00\x00\xae1\x80\x01\x00\x00\x00\x00\xaf1\x80\x01\x00\x00\x00\x00\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\xf7R\x04\x00\x00\x00\x00\x00'
#sql/rpl_slave.cc register_slave_on_master
def register_slave(self):
"""
COM_FLAG: 1 (COM_REGISTER_SLAVE:21)
server_id: 4
host,user,password 0
port: 2
rpl_recovery_rank:4 (0)
master_id:4 (0)
"""
bdata = struct.pack('BLBBBHLL',21,self.server_id,0,0,0,self.lport,0,0)
self._next_seq_id = 0 #每个com都清零
self.write_pack(bdata)
rpack = self.read_pack()
return True if rpack[0:1] == b'\x00' else False
#sql/rpl_slave.cc request_dump
def request_dump(self):
self.query(f'SET @master_heartbeat_period = {
self.master_heartbeat_period}
')
#master_binlog_checksum, slave_uuid 算了
bdata = b''
if self.auto_position:
"""
COM_FLAG: 1 (COM_BINLOG_DUMP_GTID:30)
binlog_flags: 2
server_id: 4
BINLOG_NAME_INFO_SIZE: 4
BINLOG_NAME:
POS: 8
gtid_size: 4
gtid:
"""
#regiest first
bdata = struct.pack('BHLLLQL',30,0,self.server_id,4,0,self.log_pos,len(self.bgtid)) + self.bgtid
self.register_slave()
else:
"""
COM_FLAG: 1 (COM_BINLOG_DUMP:18)
binlog-pos: 4
flags: 2
server_id: 4
binlog-filename:
"""
self.register_slave()
bdata = struct.pack('BLHL',18,self.log_pos,0,self.server_id) + self.log_file.encode()
self._next_seq_id = 0
self.write_pack(bdata)
def event(self):
while True:
bdata = self.read_pack()
if bdata[0:1] == b'\x00' and len(bdata) >
20:
yield event_header(bdata[1:20])
def query(self,sql):
self._next_seq_id = 0
bdata = struct.pack('B',3) + sql.encode()
self.write_pack(bdata)
#return self.result()
def close(self):
#self._next_seq_id = 0
self.write_pack(struct.pack('B',1))
声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!
若转载请注明出处: MYSQL主从连接协议解析, 并使用PYTHON模拟
本文地址: https://pptw.com/jishu/862.html