MySQL InnoDB ibd File Explained (2): Extracting DDL and DML
The previous chapter covered some of the management pages.... This chapter looks at the data pages (FIL_PAGE_INDEX).
Background
In MySQL, data and indexes are stored together: the clustered (primary key) index records the primary key value plus the remaining column values, while a secondary (ordinary) index records the indexed value plus the primary key value.
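As an illustration (a conceptual sketch only, not the physical byte layout): for a hypothetical table t(id int primary key, name varchar(50)) with a secondary index on name, the two kinds of leaf records logically contain:

# hypothetical table: create table t(id int primary key, name varchar(50), key(name))
clustered_rec = ('id', 'DB_TRX_ID', 'DB_ROLL_PTR', 'name')  # key + hidden system columns + remaining columns
secondary_rec = ('name', 'id')                              # indexed value + primary key only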
FIL_PAGE_INDEX
The structure of a FIL_PAGE_INDEX page is as follows:
Name | Size | Description |
---|---|---|
FIL_HEADER | 38 | |
PAGE_HEADER | 56 | information about the data page, e.g. how many records it holds |
PAGE_DATA | variable | user records |
PAGE_DIRECTORY | variable | directory information, effectively a partial index over the user records |
FIL_TRAILER | 8 | |
PAGE_DATA stores the actual column values, including the infimum (minimum) and supremum (maximum) pseudo-records.
Each PAGE_DIRECTORY slot is 2 bytes and points to the start of a record (records are grouped 4-8 per group; the first member of each group stores the group size in its owned field).
Note: the offset stored in a PAGE_DIRECTORY slot points at the record data itself, not at the record header that precedes it.
That looks a little odd, but that's just how it is....
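To make the layout concrete, here is a minimal sketch of reading the directory slots off the tail of a page, consistent with the page_index class in the appendix (the constants are the ones defined there):

import struct

PAGE_SIZE = 16384
FIL_PAGE_DATA_END = 8    # FIL_TRAILER size
PAGE_NEW_SUPREMUM = 112  # data offset of the supremum pseudo-record

def read_slots(page_bytes):
    """Read the PAGE_DIRECTORY slots from the tail of a 16KiB page (sketch)."""
    slots = []
    pos = PAGE_SIZE - FIL_PAGE_DATA_END - 2  # the first slot sits just before FIL_TRAILER
    while pos > 0:
        slot = struct.unpack('>H', page_bytes[pos:pos+2])[0]
        slots.append(slot)
        if slot == PAGE_NEW_SUPREMUM:  # the directory always ends at the supremum record
            break
        pos -= 2
    return slots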
FIL_PAGE_SDI
From the above we know that parsing the columns also requires the number of variable-length columns, i.e. the table's dictionary information. In 8.0 the dictionary is stored inside the ibd file itself, in the FIL_PAGE_SDI page, so we only need to parse that SDI page.
REC_N_FIELDS_ONE_BYTE_MAX = 0x7F  # above (>) this value the length takes 2 bytes (the first bit flags whether 2 bytes are used)
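In other words, when reading the length bytes backwards in front of a record header, a first byte above 0x7F announces a 2-byte length. A minimal decoding sketch (the 0x3F mask, which also strips the externally-stored flag 0x40, is my reading of the compact row format, not code from this article):

REC_N_FIELDS_ONE_BYTE_MAX = 0x7F

def read_varlen(page_bytes, pos):
    """Decode one variable-length column length, reading backwards from pos (sketch).
    Returns (length, position of the lowest length byte)."""
    b0 = page_bytes[pos - 1]
    if b0 <= REC_N_FIELDS_ONE_BYTE_MAX:
        return b0, pos - 1  # single-byte length
    # high bit set: 2-byte length, flag bits masked off the byte nearest the header
    return ((b0 & 0x3F) << 8) | page_bytes[pos - 2], pos - 2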
I couldn't find the SDI page layout documented anywhere, but no matter: the manual says SDI data is stored compressed. So let's just brute-force the decompression to recover the dictionary information....
def sdi_page_data(bdata):
    _sdi_offset = []
    isok = False
    for i in range(120, 16384):  # skip the page header, scan for zlib stream starts
        if isok:
            break
        for j in range(i, 16384):
            try:
                _test = zlib.decompress(bdata[i:j])
                _sdi_offset.append(i)  # i is the start of a compressed SDI record
                if len(_sdi_offset) == 2:
                    isok = True  # both SDI records found: break out of both loops
                break
            except zlib.error:
                pass
    # zlib.decompress ignores bytes after the end of a stream, so decompressing from
    # each start offset to the end of the page yields the complete JSON documents
    sdi_info = [json.loads(zlib.decompress(bdata[_sdi_offset[0]:]).decode()), json.loads(zlib.decompress(bdata[_sdi_offset[1]:]).decode())]
    return sdi_info
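A quick usage sketch (the path is this article's test table, and the assumption that page 3 holds the SDI matches rec_data_cluster below; the second SDI record describes the table):

import zlib, json

with open('/data/mysql_3314/mysqldata/db1/t20230424_666.ibd', 'rb') as f:
    f.seek(3 * 16384)  # page 3 is the SDI page here
    sdi = sdi_page_data(f.read(16384))

table = sdi[1]['dd_object']
print(table['schema_ref'], table['name'])
print([c['name'] for c in table['columns']])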
Now that we can get the column information, we can piece the DDL together. The code for this is at the end of the article (see def get_ddl(data)).
Extracting DDL and DML with Python
Enough talk, let's look at the result directly.
To keep things simple I only parse the leaf pages of the clustered index. Non-leaf pages are not parsed (I'm not querying the data...), and secondary indexes aren't parsed either (only the clustered index records the complete rows...).
As for data types, only int and varchar are supported for now; other types are left to the reader (for example, DATE takes 3 bytes, with year, month and day packed as roughly 14:4:5 bits after a sign bit).
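For instance, a hedged sketch of decoding that 3-byte DATE (assuming the packed value is year*16*32 + month*32 + day and that, as with int, the stored sign bit is flipped; this is my reading of the format, not code from this article):

def decode_date(b3):
    """Decode InnoDB's 3-byte DATE (sketch)."""
    v = int.from_bytes(b3, 'big') ^ 0x800000  # un-flip the stored sign bit
    day = v & 0x1F            # low 5 bits
    month = (v >> 5) & 0x0F   # next 4 bits
    year = v >> 9             # remaining bits
    return f'{year:04d}-{month:02d}-{day:02d}'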
import innodb_index
filename = '/data/mysql_3314/mysqldata/db1/t20230424_666.ibd'
page_size = 16384
aa = innodb_index.rec_data_cluster(filename)  # the DDL is printed by default
print(len(aa))
for x in aa[:10]:
    print(x)
(The data in the screenshots is all randomly generated.) Now let's verify against the database.
The DDL matches.
The row count matches too.
I forgot to parse the character set, but no matter.... The first 10 rows match as well.
The data is randomly generated, remember. Now for something bolder: drop the original table and import what we extracted to see what happens (don't try this in production).
python>> len(aa)
python>> with open('/tmp/t20230424_666.sql','w') as f:
python>>     for x in aa:
python>>         _size = f.write(x)
shell>> mysql -h192.168.101.21 -P3314 -p123456 -Ddb1 -e 'checksum table t20230424_666;'
shell>> mysql -h192.168.101.21 -P3314 -p123456 -Ddb1 -e 'drop table t20230424_666;'
shell>> mysql -h192.168.101.21 -P3314 -p123456 -Ddb1 -e "
CREATE Table db1.t20230424_666(
id int not null,
name varchar(50),
addr varchar(100),
email varchar(100)
,PRIMARY KEY(id)) engine=InnoDB comment='';
"
shell>> mysql -h192.168.101.21 -P3314 -p123456 -Ddb1 < /tmp/t20230424_666.sql
shell>> mysql -h192.168.101.21 -P3314 -p123456 -Ddb1 -e 'checksum table t20230424_666;'
For convenience I just used checksum table.
Summary
1. Records are grouped 4 to 8 per group; the first record of each group stores the group size (owned), and its own position (the group's first member) is recorded in a directory slot (all of this is for fast lookups...).
2. MySQL's int type stores the sign in the first bit; watch out for this when parsing (see the sketch after this list)...
3. The SDI can also be parsed with the official tool ibd2sdi.
4. The tool in this article only supports some data types, and parsing is not concurrent; for large amounts of data you could parallelize it.
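A minimal sketch of point 2, assuming the usual offset-binary encoding (the stored 4-byte value is the signed value with its top bit flipped):

import struct

def decode_int(b4):
    """Decode InnoDB's 4-byte signed int: the sign bit is stored flipped (sketch)."""
    v = struct.unpack('>L', b4)[0]
    return v - (1 << 31)

print(decode_int(bytes.fromhex('80000005')))  # -> 5
print(decode_int(bytes.fromhex('7ffffffb')))  # -> -5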
Appendix: Python source
innodb_page_type.py: see the previous article
innodb_file.py: also in the previous article (note: everything is big-endian; some places there may mistakenly say little-endian)
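(In struct terms, big-endian simply means the '>' prefix on every format string, e.g.:)

import struct
# InnoDB stores multi-byte integers most-significant byte first (big-endian)
struct.unpack('>H', b'\x00\x70')  # -> (112,), the supremum offset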
innodb_index.py is as follows:
# parse FIL_PAGE_INDEX pages
import struct
import innodb_page_type
import innodb_file
import zlib, json

PAGE_SIZE = 16384
FIL_PAGE_DATA = 38
FIL_PAGE_DATA_END = 8
PAGE_NEW_INFIMUM = 99
PAGE_NEW_SUPREMUM = 112
PAGE_DIR_SLOT_MAX_N_OWNED = 8
PAGE_DIR_SLOT_MIN_N_OWNED = 4
REC_N_FIELDS_ONE_BYTE_MAX = 0x7F  # above (>) this value the length takes 2 bytes (the first bit flags whether 2 bytes are used)

innodb_page_name = {}
for x in dir(innodb_page_type):
    if x[:2] != '__':
        innodb_page_name[getattr(innodb_page_type, x)] = x

def fseg_header(bdata):
    # FSEG header: space id (4 bytes), page number (4 bytes), byte offset (2 bytes)
    return struct.unpack('>LLH', bdata[:10])

def read_var_len(bdata, start):
    pass
# I couldn't find the SDI structure documented, but the data is compressed, so brute-force the decompression to get the SDI data....
# "However, SDI data is compressed to reduce the storage footprint" https://dev.mysql.com/doc/refman/8.0/en/serialized-dictionary-information.html
# TODO ibd2sdi
def sdi_page_data(bdata):
    _sdi_offset = []
    isok = False
    for i in range(120, 16384):  # skip the page header, scan for zlib stream starts
        if isok:
            break
        for j in range(i, 16384):
            try:
                _test = zlib.decompress(bdata[i:j])
                _sdi_offset.append(i)  # i is the start of a compressed SDI record
                if len(_sdi_offset) == 2:
                    isok = True  # both SDI records found: break out of both loops
                break
            except zlib.error:
                pass
    # zlib.decompress ignores bytes after the end of a stream, so decompressing from
    # each start offset to the end of the page yields the complete JSON documents
    sdi_info = [json.loads(zlib.decompress(bdata[_sdi_offset[0]:]).decode()), json.loads(zlib.decompress(bdata[_sdi_offset[1]:]).decode())]
    return sdi_info
# build the table's DDL statement from the SDI information
def get_ddl(data):
    ddl = f"CREATE {data[1]['dd_object_type']} {data[1]['dd_object']['schema_ref']}.{data[1]['dd_object']['name']}( \n"
    pk_list = []
    auto_key = None
    col_len = len(data[1]['dd_object']['columns']) - 2
    _col_len = 0
    coll = []
    for col in data[1]['dd_object']['columns']:
        if col['name'] in ['DB_TRX_ID', 'DB_ROLL_PTR']:
            continue
        if col['is_nullable']:
            ddl += f"{col['name']} {col['column_type_utf8']} "
        else:
            ddl += f"{col['name']} {col['column_type_utf8']} not null"
        _col_len += 1
        coll.append(col['name'])
        if col['is_auto_increment']:
            ddl += ' auto_increment,\n'
        elif _col_len < col_len:
            ddl += ',\n'
        else:
            ddl += ' \n'
    index_ddl = ''
    for i in data[1]['dd_object']['indexes']:
        name = "PRIMARY KEY" if i['name'] == 'PRIMARY' else f"index {i['name']}"
        index_ddl += f",{name}("
        for x in i['elements']:
            if x['length'] < 4294967295:  # 0xFFFFFFFF marks hidden index elements
                index_ddl += f"{coll[x['column_opx']]},"
        index_ddl = index_ddl[:-1]  # drop the trailing comma
        index_ddl += ")"
    ddl += index_ddl
    ddl += f") engine={data[1]['dd_object']['engine']} comment='{data[1]['dd_object']['comment']}';"
    return ddl
def get_rec_data(bdata, columns, index):
    rdata = []
    page0 = page_index(bdata)
    index_ = []
    for x in index['elements']:
        if x['length'] < 4294967295:  # 0xFFFFFFFF marks hidden index elements
            index_.append(x['column_opx'])
    len_column = len(columns) - 2  # drop DB_TRX_ID(6) and DB_ROLL_PTR(7)
    for x in page0.records:
        rec_header = rec_extra_header(bdata[x-5:x])
        if rec_header.deleted:
            continue  # skip delete-marked records
        tdata = [None for x in range(len_column)]
        null_bitmask_size = int((len_column+7)/8)
        # simplification: treat the null bitmap as one bit per column
        # (strictly, InnoDB only includes nullable columns in it)
        null_bitmask = int.from_bytes(bdata[x-null_bitmask_size-5:x-5], 'big')
        toffset = x - 5 - null_bitmask_size  # length bytes are read backwards from here
        doffset = x                          # column data is read forwards from here
        # read the key columns first:
        for k in index_:
            ktype = columns[k]['type']
            if ktype in [16,]:  # variable length (varchar...)
                ksize = bdata[toffset-1]  # length byte; > 0x7F means a 2-byte length
                if ksize > REC_N_FIELDS_ONE_BYTE_MAX:
                    # mask off the flag bits (0x40 would mean externally stored)
                    ksize = ((ksize & 0x3F) << 8) | bdata[toffset-2]
                    toffset -= 1
                toffset -= 1
                tdata[k] = bdata[doffset:doffset+ksize].decode()
                doffset += ksize
            elif ktype in [15,]:  # date
                tdata[k] = bdata[doffset:doffset+3]
                doffset += 3
            elif ktype in [4,]:  # int, 4 bytes, stored with the sign bit flipped
                _t = struct.unpack('>L', bdata[doffset:doffset+4])[0]
                tdata[k] = _t - (1 << 31)
                doffset += 4
        doffset += 6 + 7  # skip DB_TRX_ID (6 bytes) and DB_ROLL_PTR (7 bytes)
        # then the remaining columns:
        for k in range(len_column):
            if k in index_:  # key columns have already been read
                continue
            if null_bitmask & (1 << k):  # NULL column, no data stored
                continue
            ktype = columns[k]['type']
            if ktype in [16,]:  # variable length (varchar...)
                ksize = struct.unpack('>B', bdata[toffset-1:toffset])[0]
                if ksize > REC_N_FIELDS_ONE_BYTE_MAX:
                    ksize = ((ksize & 0x3F) << 8) | bdata[toffset-2]
                    toffset -= 1
                toffset -= 1
                tdata[k] = bdata[doffset:doffset+ksize].decode()
                doffset += ksize
            elif ktype in [15,]:  # date
                tdata[k] = bdata[doffset:doffset+3]
                doffset += 3
            elif ktype in [4,]:  # int, 4 bytes, stored with the sign bit flipped
                _t = struct.unpack('>L', bdata[doffset:doffset+4])[0]
                tdata[k] = _t - (1 << 31)
                doffset += 4
        rdata.append(tdata)
    return rdata
# data type codes: storage/innobase/include/data0type.h
def rec_data_cluster(filename):  # assumes an explicitly defined primary key
    """
    filename: path to the .ibd file
    """
    f = open(filename, 'rb')
    f.seek(PAGE_SIZE*3, 0)  # page 3 is the SDI page
    sdi_info = sdi_page_data(f.read(PAGE_SIZE))
    ddl = get_ddl(sdi_info)
    print(ddl)
    db_table = f'{sdi_info[1]["dd_object"]["schema_ref"]}.{sdi_info[1]["dd_object"]["name"]}'
    index = sdi_info[1]['dd_object']['indexes'][0]
    columns = sdi_info[1]['dd_object']['columns']
    root_page = int(index['se_private_data'].split('root=')[1].split(';')[0])
    #f.seek(PAGE_SIZE*root_page,0)
    # we only need the first leaf page and can then follow FIL_PAGE_NEXT
    # (the INODE page records the first leaf page, after all...)
    f.seek(2*PAGE_SIZE, 0)
    inode = innodb_file.inode(f.read(PAGE_SIZE)[38:PAGE_SIZE-8])
    leaf_page = 0
    for x in inode.index:
        if x['no_leaf'] == root_page:
            leaf_page = x['leaf']
            break
    rdata = []  # [(col1,col2),(col1,col2)]
    next_page_number = leaf_page
    while True:
        if next_page_number == 4294967295:  # 0xFFFFFFFF: no next page
            break
        f.seek(next_page_number*PAGE_SIZE, 0)
        page0 = page(f.read(PAGE_SIZE))
        next_page_number = page0.FIL_PAGE_NEXT
        rdata += get_rec_data(page0.bdata, columns, index)
    rdata_sql = []
    for x in rdata:
        _v = ''
        for j in x:
            _v += f'{j},' if isinstance(j, int) else f'"{j}",'
        _v = _v[:-1]  # drop the trailing comma
        _sql = f'insert into {db_table} values({_v});'
        rdata_sql.append(_sql)
    return rdata_sql
#storage/innobase/rem/rec.h
REC_INFO_MIN_REC_FLAG = 0x10
REC_INFO_DELETED_FLAG = 0x20
REC_N_OWNED_MASK = 0xF
REC_HEAP_NO_MASK = 0xFFF8
REC_NEXT_MASK = 0xFFFF
#REC_STATUS_ORDINARY 0
#REC_STATUS_NODE_PTR 1
#REC_STATUS_INFIMUM 2
#REC_STATUS_SUPREMUM 3
class rec_extra_header(object):
    def __init__(self, bdata):
        if len(bdata) != 5:
            raise ValueError('record extra header must be 5 bytes')
        fb = struct.unpack('>B', bdata[:1])[0]
        self.deleted = True if fb & REC_INFO_DELETED_FLAG else False  # delete-marked?
        self.min_rec = True if fb & REC_INFO_MIN_REC_FLAG else False  # set if and only if the record is the first user record on a non-leaf page
        self.owned = fb & REC_N_OWNED_MASK  # > 0 means this rec is the first of its group, i.e. its offset is stored in the page directory
        self.heap_no = (struct.unpack('>H', bdata[1:3])[0] & REC_HEAP_NO_MASK) >> 3  # heap number: 0 infimum, 1 supremum, others: user recs
        self.record_type = struct.unpack('>H', bdata[1:3])[0] & ((1 << 3) - 1)  # 0: leaf rec, 1: node pointer, 2: infimum, 3: supremum
        self.next_record = struct.unpack('>H', bdata[3:5])[0]
    def __str__(self):
        return (f'deleted:{self.deleted} min_rec:{self.min_rec} owned:{self.owned} '
                f'heap_no:{self.heap_no} record_type:{self.record_type} next_record:{self.next_record}')
class page(object):
    def __init__(self, bdata):
        if len(bdata) != PAGE_SIZE:
            raise ValueError('page must be PAGE_SIZE bytes')
        self.FIL_PAGE_SPACE_OR_CHKSUM, self.FIL_PAGE_OFFSET, self.FIL_PAGE_PREV, self.FIL_PAGE_NEXT, self.FIL_PAGE_LSN, self.FIL_PAGE_TYPE, self.FIL_PAGE_FILE_FLUSH_LSN = struct.unpack('>4LQHQ', bdata[:34])
        self.FIL_PAGE_SPACE_ID = struct.unpack('>L', bdata[34:38])[0]
        self.CHECKSUM, self.FIL_PAGE_LSN = struct.unpack('>2L', bdata[-8:])  # FIL_TRAILER: old checksum + low 32 bits of LSN
        self.bdata = bdata
    def fil_header(self):
        return f'PAGE_SPACE_ID:{self.FIL_PAGE_SPACE_ID} PAGE_TYPE:{innodb_page_name[self.FIL_PAGE_TYPE]} PREV:{self.FIL_PAGE_PREV} NEXT:{self.FIL_PAGE_NEXT}'
    def fil_trailer(self):
        return f'CHECKSUM:{self.CHECKSUM} PAGE_LSN:{self.FIL_PAGE_LSN}'
class page_index(page):
    def __init__(self, bdata):
        super().__init__(bdata)
        self.cols = []  # column type list
        #PAGE_HEADER
        bdata = self.bdata[FIL_PAGE_DATA:PAGE_SIZE-FIL_PAGE_DATA_END]
        self.PAGE_N_DIR_SLOTS, self.PAGE_HEAP_TOP, self.PAGE_N_HEAP, self.PAGE_FREE, self.PAGE_GARBAGE, self.PAGE_LAST_INSERT, self.PAGE_DIRECTION, self.PAGE_N_DIRECTION, self.PAGE_N_RECS, self.PAGE_MAX_TRX_ID, self.PAGE_LEVEL, self.PAGE_INDEX_ID = struct.unpack('>9HQHQ', bdata[:36])
        self.PAGE_BTR_SEG_LEAF = fseg_header(bdata[36:46])
        self.PAGE_BTR_SEG_TOP = fseg_header(bdata[46:56])
        #PAGE_DIRECTORY (each 2-byte slot points at record data, past the 5-byte record header, i.e. REC_N_NEW_EXTRA_BYTES)
        page_directorys = []
        for x in range(int(PAGE_SIZE/2)):
            tdata = struct.unpack('>H', self.bdata[-(2+FIL_PAGE_DATA_END+x*2):-(FIL_PAGE_DATA_END+x*2)])[0]
            page_directorys.append(tdata)
            if tdata == PAGE_NEW_SUPREMUM:
                break  # all slots visited
        self.page_directorys = page_directorys
        #RECORDS (PAGE_DATA), innodb_default_row_format
        offset = self.page_directorys[0]  # first slot: the infimum pseudo-record
        records = []
        while True:
            # low 3 bits of the byte at offset-3 hold the record status:
            # 0: leaf rec, 1: node pointer, 2: infimum, 3: supremum
            status = self.bdata[offset-3] & 0x07
            if status == 3:
                break
            records.append(offset)
            # next_record is a 16-bit relative offset, so wrap modulo 65536
            offset = (offset + struct.unpack('>H', self.bdata[offset-2:offset])[0]) & 0xFFFF
        records.remove(PAGE_NEW_INFIMUM)  # drop the infimum pseudo-record, it is not user data
        self.records = records
    def get_record(self, rec_offset):
        """
        Return the record at the given offset.
        """
        pass
    def find_data_with_index(self, index_value):
        """
        Look up a row by index value: find the page, then binary-search the records via the directory slots.
        """
        pass
    def record(self):
        return f'RECORDS:{len(self.records)}'
    def page_header(self):
        return (f'SLOTS:{self.PAGE_N_DIR_SLOTS} PAGE_LEVEL:{self.PAGE_LEVEL} INDEX_ID:{self.PAGE_INDEX_ID} '
                f'RECORDS:{self.PAGE_N_RECS} PAGE_HEAP_TOP:{self.PAGE_HEAP_TOP} '
                f'PAGE_GARBAGE(deleted):{self.PAGE_GARBAGE} PAGE_FREE:{self.PAGE_FREE}')
    def page_directory(self):
        return f'SLOTS:{len(self.page_directorys)} MAX:{self.page_directorys[-1:]} MIN:{self.page_directorys[:1]}'