首页数据库MYSQL主从连接协议解析, 并使用PYTHON模拟

MYSQL主从连接协议解析, 并使用PYTHON模拟

时间2023-04-03 15:06:54发布访客分类数据库浏览1570
导读:MYSQL的主从应该是使用得最多的架构, 使用也很简单, 就change master to xxx 然后start 就可以了, 但是你知道原理吗?写在前面如果你阅读了我之前写的那个 MYSQL流量镜像, 那么你就可以看到mysql连接过...

MYSQL的主从应该是使用得最多的架构, 使用也很简单, 就change master to xxx 然后start 就可以了, 但是你知道原理吗?

写在前面

如果你阅读了我之前写的那个 MYSQL流量镜像, 那么你就可以看到mysql连接过程如下:

就是连接mysql之后, 设置了一些变量, 然后发送了两个特殊的包. 是不是很简单. 下面我们就来详细介绍下

ps: 其实这个流量镜像脚本还可以用来当general log使用(仅部分连接的流量日志) -_-

连接过程

连接上mysql服务器后, 都是request_dump(sql/rpl_slave.cc)函数通知主库我是从库.

主从连接, 分两种情况, 一种是基于gtid(MASTER_AUTO_POSITION = 1)的, 另一种是指定log_filename,log_pos的, 推荐使用第一种, 但第二种更简单. 本文两种都演示.

先设置一些参数, 比如master_heartbeat_period(心跳间隔,单位纳秒),master_binlog_checksum,slave_uuid

同时也会查询一些服务器信息, 比如SERVER_ID等. 但本次环境就不整那么多了.

client ->
    >
      server : set header_beat等
server ->
    >
      client : ok
client -->
    >
      server : COM_REGISTER_SLAVE
server -->
    >
      client : ok
client ->
    >
      server : COM_BINLOG_DUMP/COM_BINLOG_DUMP_GTID
server -->
    >
 client: EVENT(含Heartbeat_event)

下图 省略连接过程,(也不考虑失败情况) 要看账号认证过程, 请看之前的文章: mysql连接协议解析

主从相关包结构

主要就是COM_BINLOG_DUMP,COM_BINLOG_DUMP_GTID,COM_REGISTER_SLAVE

Heartbeat_event 比较简单, 就是个event_header加个binlog_filename 就没了...

COM_BINLOG_DUMP

这个包用于非gtid模式, 告诉服务器, 我是dump线程, 结构如下:

源码在:(sql/rpl_slave.cc)request_dump

名字

大小(字节)

描述

COM_FLAG

1

固定为18

binlog-pos

4

要请求的binlog的pos号

flags

2

通常固定为0

server-id

4

从库(本机)的server_id

binlog-filename

要请求的binlog文件名字(空白字符结尾)

COM_BINLOG_DUMP_GTID

用于gtid模式下,结构如下:

源码在:(sql/rpl_slave.cc)request_dump

名字

大小(字节)

描述

COM_FLAG

1

固定30

binlog_flags

2

固定0

server_id

4

从库(本机)的server_id

BINLOG_NAME_SIZE

4

binlog名字大小

BINLOG_NAME

取决于BINLOG_NAME_SIZE

通常都是填充0

POS

8

要请求的binlog的pos号, 通常为4

gtid_size

4

gtid大小

gtid

取决于gtid_size

gtid

COM_REGISTER_SLAVE

注册主从的(可选). 多数字段都是0填充的... 不要也罢 -_-

源码在: sql/rpl_slave.cc register_slave_on_master

名字

大小(字节)

描述

COM_FLAG

1

固定21

server_id

4

从库(本机)的server_id

host

1

通常填充0

user

1

通常填充0

password

1

通常填充0

port

2

从库端口

pl_recovery_rank

4

固定0

master_id

4

固定0(主库填的)

python模拟

本次模拟就不写relay log了, print出来就行, event也不全解析了, 就解析下header就行

gtid

就是 MASTER_AUTO_POSITION = 1 的情况

import testpymysqlreplication
aa = testpymysqlreplication.repl()
aa.server_id = 123456
aa.auto_position = True
aa.connect()
aa.request_dump()
for x in aa.event():
	print(x)
可以使用show processlist 看到我们的连接

再来插入条数据瞧瞧

也是没得问题的

本次环境不想解析event_body了

event_type为27的就是 心跳包

非gtid

使用和gtid差不多, 只是auto_position设置为False

import testpymysqlreplication
aa = testpymysqlreplication.repl()
aa.server_id = 123456
aa.log_file = 'm3308.001037'
aa.log_pos = 4
aa.auto_position = False
aa.connect()
aa.request_dump()
for x in aa.event():
	print(x)

就不测试插入数据了.

总结

1. mysql主从分两种情况, 有gtid和无gtid情况, 分别对应MASTER_AUTO_POSITION = 1和MASTER_AUTO_POSITION = 0

2. 执行完request_dump后, 从库就只需要等主库发送就行了

3. 如果从库长时间未收到主库的心跳包, 就认为主库挂了. 如果24小时业务都频繁的话, 可以不要心跳包(其实30秒一个流量也不多.....)

注:每个command都要重置seq为0

附python源码

本次源码没有解析GTID, 使用的mysql_monitor脚本得到的gtid信息. 如果要解析gtid的话, 可以参考pymysqlreplication的gtid.py 也可以参考官方文档

testpymysql 脚本是之前解析mysql连接的时候的, 也可以使用pymysql的

import testpymysql
import struct

def _read_lenenc(bdata,i): 
	length = btoint(bdata[i:i+1])
	i += 1
	data = bdata[i:i+length]
	i += length
	return data,i

def event_header(bdata):
	timestamp, event_type, server_id, event_size, log_pos, flags = struct.unpack("LBLLLh",bdata[0:19])
	return {
"timestamp":timestamp,'event_type':event_type,'server_id':server_id,'event_size':event_size,'log_pos':log_pos,'flags':flags,}


class repl(testpymysql.mysql):
	def __init__(self,*args,**kwargs):
		super().__init__(**kwargs)
		self.server_id = 12345678 #uint32
		self.lport = 6666
		self.auto_position = False #Ture:gtid
		self.master_heartbeat_period = 30*1000000000 #设置心跳, 单位纳秒
		self.log_file = 'm3308.001037' #其实可以show master status的, 但是我懒得去查了...
		self.log_pos = 4
		#懒得去计算gtid了, 直接用我的环境的现成的,  计算方式可以参考: pymysqlreplication 的 gtid.py
		#使用: gtid.GtidSet(GTID_STR).encode()
		self.bgtid = b'\x06\x00\x00\x00\x00\x00\x00\x00N\x08\x1c\xcb\xb0\xc9\x11\xed\xbf\x10\x00\x0c)\x938\xcc\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00me\x0f\x1f\xbaN\x11\xed\x99\xab\x00\x0c)\x80\xc1\x1e\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00Eg\x02\x00\x00\x00\x00\x00z\xb0f\xef\xc1\xbe\x11\xec\x92\xdd\x00\x0c)\x80\xc1\x1e\x03\x00\x00\x00\x00\x00\x00\x00\x13\n\x00\x00\x00\x00\x00\x00\x19\n\x00\x00\x00\x00\x00\x00\x8c\n\x00\x00\x00\x00\x00\x00\x8d\n\x00\x00\x00\x00\x00\x00\xae1\x80\x01\x00\x00\x00\x00\xaf1\x80\x01\x00\x00\x00\x00\x90\xbd\xfb\xb7\xcb\xe2\x11\xec\xa8p\x00\x0c)\x80\xc1\x12\x01\x00\x00\x00\x00\x00\x00\x00\xae1\x80\x01\x00\x00\x00\x00\xaf1\x80\x01\x00\x00\x00\x00\x90\xbd\xfb\xb7\xcb\xe2\x11\xec\xa8p\x00\x0c)\x80\xc1\x1e\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\xa9\xbb\xd7\x00\x00\x00\x00\x00\xae1\x80\x01\x00\x00\x00\x00\xaf1\x80\x01\x00\x00\x00\x00\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\xf7R\x04\x00\x00\x00\x00\x00'

	#sql/rpl_slave.cc  register_slave_on_master
	def register_slave(self):
		"""
		COM_FLAG: 1 (COM_REGISTER_SLAVE:21)
		server_id: 4
		host,user,password 0
		port: 2
		rpl_recovery_rank:4 (0)
		master_id:4 (0)
		"""
		bdata = struct.pack('BLBBBHLL',21,self.server_id,0,0,0,self.lport,0,0)
		self._next_seq_id = 0 #每个com都清零
		self.write_pack(bdata)
		rpack = self.read_pack()
		return True if rpack[0:1] == b'\x00' else False

	#sql/rpl_slave.cc request_dump
	def request_dump(self):
		self.query(f'SET @master_heartbeat_period = {
self.master_heartbeat_period}
    ')
		#master_binlog_checksum, slave_uuid 算了
		bdata = b''
		if self.auto_position:
			"""
			COM_FLAG: 1 (COM_BINLOG_DUMP_GTID:30)
			binlog_flags: 2
			server_id: 4
			BINLOG_NAME_INFO_SIZE: 4
			BINLOG_NAME:
			POS: 8
			gtid_size: 4
			gtid:
			"""
			#regiest first
			bdata = struct.pack('BHLLLQL',30,0,self.server_id,4,0,self.log_pos,len(self.bgtid)) + self.bgtid
			self.register_slave()
		else:
			"""
			COM_FLAG: 1 (COM_BINLOG_DUMP:18)
			binlog-pos: 4
			flags: 2
			server_id: 4
			binlog-filename:
			"""
			self.register_slave()
			bdata = struct.pack('BLHL',18,self.log_pos,0,self.server_id) + self.log_file.encode()

		self._next_seq_id = 0
		self.write_pack(bdata)

	def event(self):
		while True:
			bdata = self.read_pack()
			if bdata[0:1] == b'\x00' and len(bdata) >
     20:
				yield event_header(bdata[1:20])

	def query(self,sql):
		self._next_seq_id = 0
		bdata = struct.pack('B',3) + sql.encode()
		self.write_pack(bdata)
		#return self.result()

	def close(self):
		#self._next_seq_id = 0
		self.write_pack(struct.pack('B',1))

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!

云数据库 MySQLpython主从MYSQLMYSQL主从连接协议

若转载请注明出处: MYSQL主从连接协议解析, 并使用PYTHON模拟
本文地址: https://pptw.com/jishu/862.html
【HTML】HTML 注册表单案例 ① ( 表格设置 | 设置表格位置和大小 | 设置表格标题 | 表单设置 | 表格中设置单选按钮 ) 【ES三周年】linux-centos7安装elasticsearch-head插件

游客 回复需填写必要信息