自制MYSQL旁挂审计
背景需求
生产环境中, 难免有执行有问题的SQL, 这个时候就需要有人背锅找出原因. 找到相关时间点的二进制日志, 然后解析就可以找到出问题的SQL, 但 那是谁执行的呢? 谁都不承认. 这时候就需要审计了.
审计分类
嵌入式, 就是在mysql端做记录, 也就是审计插件. 优点:无需更改链路. 缺点:影响mysql性能
转发型, 就是转发相关包, 并记录下来, 比如我们之前写的mysql流量镜像脚本. 优点:可以只审计固定的流量(需要转发的) 缺点:延迟会增加一丢丢
旁挂型, 就是在链路旁边监控流量并做记录. 优点:不影响链路和mysql性能. 缺点:只能做记录, 无法控制连接.
本文就是实现旁挂型审计
实现原理
获取mysql包 然后 解析mysql包
获取mysql包, 我们使用scapy的sniff抓包, 解析mysql包, 之前就已经做过了.
网卡或者路由器把客户端的流量转发一份给审计即可我们只获取mysql客户端发给服务端的包, 所以只需要目标端口为mysql的端口即可. 如果是部署在非本机上, 还可以设置个目标主机过滤(可选)
sniff(filter=f'dst port 3308', iface='ens32', prn=save_pack)
#filter 过滤规则, 和tcpdump一样的语法
#iface 网卡名字(目标IP所要经过的网卡)
#prn 处理函数
对于mysql包处理, 由于ssl加密太复杂(要私钥,还有随机数...), 所以我们只解析非ssl的流量(应用服务器到数据库服务器很少有使用SSL的, 毕竟多数都是内网环境). 主要解析3种包. 连接包, COM_QUERY包(sql), Quit包(断开连接)if bdata[:1] == b'\x03':
sql = bdata[1:].decode()
_psql = sqlparse.parse(sql)[0] #没有做更深层次的解析了, 毕竟不需要
msg += f"SQL({
_psql.get_type()}
) {
_psql.value}
"
elif lbdata >
32 and len(set(bdata[9:32])) == 1:
username = bdata[32:32+bdata[32:].find(b'\x00')].decode()
client_flag = btoint(bdata[:4])
msg += f"CONNECT username:{
username}
client_flag:{
client_flag}
"
elif bdata[:1] == b'\x01':
msg += f'QUIT '
演示
没有写接口出来, 直接修改源码(改下目标端口和网卡名字), 然后执行即可
python mysql_audit.py
客户端连接测试
mysql -h192.168.101.21 -P3308 -p123456 --skip-ssl #暂不支持SSL的审计
客户端退出测试
mysql退出的时候是发送的 0x01到server
mysql>
exit
DDL/DML压测
先清空表吧, 压测工具可以参考ddcw_tool.benchmark_mysql
DDL是能记录的那个ping包是 workbench 在发
测试DML
虽然是模拟的数据, 但还是习惯性的打个码...总结
不影响性能的清空建议都上个审计.
本文使用的单进程做的, 可能存在性能问题(可以去掉SQL解析, 直接打印SQL文本).
本文的脚本不支持解析SSL.(如果是SSL包就直接跳过了).
旁挂型审计对业务和数据库影响最小(可以说没得影响), 但是无法控制连接,也无法解析ssl, 所以使用哪种审计得结合你的实际需求来.
附源代码
本工具直接将结果print的, 你也可以将结果保存在文件里(修改下printf函数即可)
from scapy.all import sniff
import sqlparse
import datetime
def btoint(bdata,t='little'):
return int.from_bytes(bdata,t)
def printf(msg):
print(msg)
def save_pack(pack):
if hasattr(pack,'load') and len(pack.load) >
= 5:
bdata = pack.load
msg = f"{
str(datetime.datetime.now())}
{
pack['IP'].src}
:{
pack['TCP'].sport}
"
#msg = f"{
str(datetime.datetime.now())}
{
pack['IP'].src}
:{
pack['TCP'].sport}
{
pack['IP'].dst}
:{
pack['TCP'].dport}
"
if len(bdata) == btoint(bdata[:3])+4: #不支持SSL
bdata = bdata[4:]
else:
return None
lbdata = len(bdata)
PPACK = True
if bdata[:1] == b'\x03':
sql = bdata[1:].decode()
_psql = sqlparse.parse(sql)[0]
msg += f"SQL({
_psql.get_type()}
) {
_psql.value}
"
elif bdata[:1] == b'\x16':
msg += f"STMT_PREPARE {
bdata[1:].decode()}
"
elif bdata[:1] == b'\x17':
msg += f'STMT_EXECUTE {
btoint(bdata[1:5])}
' #懒得去解析具体的值了
elif bdata[:1] == b'\x0E':
msg += f'PING '
elif bdata[:1] == b'\x01':
msg += f'QUIT '
elif lbdata >
32 and len(set(bdata[9:32])) == 1:
username = bdata[32:32+bdata[32:].find(b'\x00')].decode()
client_flag = btoint(bdata[:4])
msg += f"CONNECT username:{
username}
client_flag:{
client_flag}
"
else:
PPACK = False
if PPACK:
printf(msg)
sniff(filter=f'dst port 3308', iface='ens32', prn=save_pack)
声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!
若转载请注明出处: 自制MYSQL旁挂审计
本文地址: https://pptw.com/jishu/3408.html