首页数据库自制MYSQL旁挂审计

自制MYSQL旁挂审计

时间2023-04-16 22:18:02发布访客分类数据库浏览1153
导读:背景需求生产环境中, 难免有执行有问题的SQL, 这个时候就需要有人背锅找出原因. 找到相关时间点的二进制日志, 然后解析就可以找到出问题的SQL, 但 那是谁执行的呢? 谁都不承认. 这时候就需要审计了.审计分类嵌入式, 就是在my...

背景需求

生产环境中, 难免有执行有问题的SQL, 这个时候就需要有人背锅找出原因. 找到相关时间点的二进制日志, 然后解析就可以找到出问题的SQL, 但 那是谁执行的呢? 谁都不承认. 这时候就需要审计了.

审计分类

嵌入式, 就是在mysql端做记录, 也就是审计插件. 优点:无需更改链路. 缺点:影响mysql性能

转发型, 就是转发相关包, 并记录下来, 比如我们之前写的mysql流量镜像脚本. 优点:可以只审计固定的流量(需要转发的) 缺点:延迟会增加一丢丢

旁挂型, 就是在链路旁边监控流量并做记录. 优点:不影响链路和mysql性能. 缺点:只能做记录, 无法控制连接.

本文就是实现旁挂型审计

实现原理

获取mysql包 然后 解析mysql包

获取mysql包, 我们使用scapy的sniff抓包, 解析mysql包, 之前就已经做过了.

网卡或者路由器把客户端的流量转发一份给审计即可

我们只获取mysql客户端发给服务端的包, 所以只需要目标端口为mysql的端口即可. 如果是部署在非本机上, 还可以设置个目标主机过滤(可选)

sniff(filter=f'dst port 3308', iface='ens32', prn=save_pack)
#filter 过滤规则, 和tcpdump一样的语法
#iface 网卡名字(目标IP所要经过的网卡)
#prn 处理函数

对于mysql包处理, 由于ssl加密太复杂(要私钥,还有随机数...), 所以我们只解析非ssl的流量(应用服务器到数据库服务器很少有使用SSL的, 毕竟多数都是内网环境). 主要解析3种包. 连接包, COM_QUERY包(sql), Quit包(断开连接)
if bdata[:1] == b'\x03':
    sql = bdata[1:].decode()
    _psql = sqlparse.parse(sql)[0] #没有做更深层次的解析了, 毕竟不需要
    msg += f"SQL({
_psql.get_type()}
) {
_psql.value}
    "

elif lbdata >
 32 and len(set(bdata[9:32])) == 1:
    username = bdata[32:32+bdata[32:].find(b'\x00')].decode()
    client_flag = btoint(bdata[:4])
    msg += f"CONNECT username:{
username}
 client_flag:{
client_flag}
    "
    
elif bdata[:1] == b'\x01':
    msg += f'QUIT '

演示

没有写接口出来, 直接修改源码(改下目标端口和网卡名字), 然后执行即可

python mysql_audit.py

客户端连接测试

mysql -h192.168.101.21 -P3308 -p123456  --skip-ssl #暂不支持SSL的审计

客户端退出测试

mysql退出的时候是发送的 0x01到server

mysql>
     exit

DDL/DML压测

先清空表吧, 压测工具可以参考ddcw_tool.benchmark_mysql

DDL是能记录的

那个ping包是 workbench 在发

测试DML

虽然是模拟的数据, 但还是习惯性的打个码...

总结

不影响性能的清空建议都上个审计.

本文使用的单进程做的, 可能存在性能问题(可以去掉SQL解析, 直接打印SQL文本).

本文的脚本不支持解析SSL.(如果是SSL包就直接跳过了).

旁挂型审计对业务和数据库影响最小(可以说没得影响), 但是无法控制连接,也无法解析ssl, 所以使用哪种审计得结合你的实际需求来.

附源代码

本工具直接将结果print的, 你也可以将结果保存在文件里(修改下printf函数即可)

from scapy.all import sniff
import sqlparse
import datetime

def btoint(bdata,t='little'):
	return int.from_bytes(bdata,t)

def printf(msg):
	print(msg)

def save_pack(pack):
	if hasattr(pack,'load') and len(pack.load) >
= 5:
		bdata = pack.load
		msg = f"{
str(datetime.datetime.now())}
 {
pack['IP'].src}
:{
pack['TCP'].sport}
 "
		#msg = f"{
str(datetime.datetime.now())}
 {
pack['IP'].src}
:{
pack['TCP'].sport}
 {
pack['IP'].dst}
:{
pack['TCP'].dport}
 "
		if len(bdata) == btoint(bdata[:3])+4: #不支持SSL
			bdata = bdata[4:]
		else:
			return None
		lbdata = len(bdata)


		PPACK = True
		if bdata[:1] == b'\x03':
			sql = bdata[1:].decode()
			_psql = sqlparse.parse(sql)[0]
			msg += f"SQL({
_psql.get_type()}
) {
_psql.value}
"
		elif bdata[:1] == b'\x16':
			msg += f"STMT_PREPARE {
bdata[1:].decode()}
"
		elif bdata[:1] == b'\x17':
			msg += f'STMT_EXECUTE {
btoint(bdata[1:5])}
    ' #懒得去解析具体的值了
		elif bdata[:1] == b'\x0E':
			msg += f'PING '
		elif bdata[:1] == b'\x01':
			msg += f'QUIT '
		elif lbdata >
 32 and len(set(bdata[9:32])) == 1:
			username = bdata[32:32+bdata[32:].find(b'\x00')].decode()
			client_flag = btoint(bdata[:4])
			msg += f"CONNECT username:{
username}
 client_flag:{
client_flag}
    "
		else:
			PPACK = False
		if PPACK:
			printf(msg)
			

sniff(filter=f'dst port 3308', iface='ens32', prn=save_pack)

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!

云数据库 MySQLpythonMYSQLPYTHON审计

若转载请注明出处: 自制MYSQL旁挂审计
本文地址: https://pptw.com/jishu/3408.html
【译文】Go 中的适配器模式 俄罗斯著名商业CMS DataLife Engine v16.0

游客 回复需填写必要信息