Featured image of post 为MySQL社区版实现审计功能:从插件配置到日志监控全解析

为MySQL社区版实现审计功能:从插件配置到日志监控全解析

本文探讨了MySQL社区版缺乏原生审计功能的问题,并提出使用MariaDB的server_audit插件作为解决方案。介绍了版本兼容性关键点(MySQL 5.7.34及以下版本适用),详细说明了插件安装步骤和配置参数优化建议,包括日志路径设置、文件轮转策略等。文章还展示了审计日志的格式样例,为需要合规审计的企业提供了可行方案。最后强调在生产环境部署前必须进行完整测试,确保插件兼容性和稳定性。

背景

公司要把重要的数据库的操作记录留存,方便查询操作可视化和告警监控等待。更能和其余相同的数据库保持一致。

本文将详细介绍如何通过MariaDB的server_audit插件为MySQL社区版实现完整的审计功能,包括版本兼容性处理、插件配置、日志解析和存储方案。

为什么MySQL社区版需要审计功能?

在公司数据库管理中,审计日志是安全合规和故障排查的重要工具。然而,许多使用MySQL社区版的企业面临一个尴尬的现实:原生的审计功能仅在企业版中提供。根据MySQL官方文档,社区版从5.7.34版本后不再支持第三方审计插件,这给需要合规审计的企业带来了挑战。

技术选型与版本兼容性分析

方案 优点 缺点 适用场景
MySQL企业版审计插件 官方支持,功能完善 需要付费许可 预算充足的企业
MariaDB server_audit插件 免费开源,功能完整 版本兼容性复杂 MySQL 5.7.34及以下版本
Percona审计插件 完全兼容MySQL 需要迁移到Percona Server 新建项目或可迁移环境
触发器+通用日志 无需额外组件 性能影响大,信息不完整 简单审计需求

在企业数据管理中,数据库操作审计是安全合规的核心需求。但MySQL社区版存在明显短板:

  1. 缺乏原生审计功能(仅企业版支持audit_log插件)
  2. 版本兼容陷阱
    • MySQL 5.7.34:最后一个可原生使用MariaDB插件的版本
    • MySQL ≥5.7.34 无法使用MariaDB的server_audit.so插件
    • MySQL 8.0 完全移除插件兼容性
  3. 替代方案选择
    • 方案1:MySQL 5.7低版本 + MariaDB插件(需版本匹配)
    • 方案2:迁移至Percona Server(免费开源,兼容MySQL 5.6/5.7/8.0且支持审计)
    • 方案3:全面迁移到MariaDB

实践建议:在测试环境中验证插件兼容性,生产环境部署前进行完整的回归测试。

环境

环境如下:

MySQL

1
2
[root@test_db01 ~]# mysql -V
mysql  Ver 14.14 Distrib 5.7.27, for Linux (x86_64) using  EditLine wrapper

Cnetos7

1
2
3
4
[root@test_db01 ~]# cat /etc/redhat-release 
CentOS Linux release 7.3.1611 (Core)

Linux test_db01 3.10.0-514.el7.x86_64 #1 SMP Tue Nov 22 16:42:41 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux

安装插件

查看插件安装目录

1
2
3
4
5
6
7
8

mysql>  show global variables like 'plugin_dir';
+---------------+--------------------------+
| Variable_name | Value                    |
+---------------+--------------------------+
| plugin_dir    | /usr/lib64/mysql/plugin/ |
+---------------+--------------------------+
1 row in set (0.00 sec)

提取mariadb审计插件并放置插件目录

此处需要注意版本,直接适配核对测试

1
2
3
4
wget  https://downloads.mariadb.com/MariaDB/mariadb-10.5.16/bintar-linux-x86_64/mariadb-10.5.16-linux-x86_64.tar.gz
tar -zxvf   mariadb-10.5.3-linux-x86_64.tar.gz
cp ./mariadb-10.5.3-linux-x86_64/lib/plugin/server_audit.so /usr/lib64/mysql/plugin/
chmod +x /usr/lib64/mysql/plugin/

mysql安装server_audit.so插件

1
2
3
4
mysql>  install plugin server_audit soname 'server_audit.so';
Query OK, 0 rows affected (0.02 sec)           
##也可以在my.cnf 加载插件方式安装
在my.cnf 设置 plugin_load = server_audit=server_audit.so  

查看当前MySQL插件情况

1
2
mysql>  show plugins;
| SERVER_AUDIT                             | ACTIVE   | AUDIT              | server_audit.so       | GPL     |

增加审计目录授权

1
2
mkdir /opt/mysqldata/auditlogs
chown -R mysql:mysql /opt/mysqldata/auditlogs

开启审计,写入配置文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#防止server_audit 插件被卸载 进行配置文件配置
server_audit=FORCE_PLUS_PERMANENT
#指定哪些操作被记录到日志文件中
server_audit_events='CONNECT,QUERY,TABLE,QUERY_DDL'
#开启审计功能
server_audit_logging=on
#默认存放路径,可以不写,默认到data文件下
server_audit_file_path=/opt/mysqldata/auditlogs
#设置文件大小 默认1000000,1073741824=1GB
server_audit_file_rotate_size=1073741824
#指定日志文件的数量,如果为0日志将从不轮转
server_audit_file_rotations=0

下面是我的配置文件
版本太低,不支持JSON,高版本支持JSON格式,方便Python处理,后续只能re来处理
```bash
#######server audit#####
server_audit_logging=ON
server_audit_file_path=/www/server/data/server_audit.log
server_audit=FORCE_PLUS_PERMANENT
server_audit_file_rotate_size=500M
server_audit_file_rotations=15
max_allowed_packet=32M
#audit_log_format=JSON
#audit_log_file=server_audit.json
#server_audit_logging=ON
#server_audit_file_path=/data/mysql/server_audit.json
#server_audit=FORCE_PLUS_PERMANENT
#server_audit_file_rotate_size=1G
#server_audit_file_rotations=10
#max_allowed_packet=32M
  • FORCE_PLUS_PERMANENT:防止插件被意外卸载
  • 日志轮转设置:避免日志文件无限增长
  • max_allowed_packet:确保大SQL语句能被完整记录

重启mysql生效!

1
2
3
/etc/init.d/mysql restart
Shutting down MySQL............... SUCCESS! 
Starting MySQL.. SUCCESS! 

检查日志状态

1
2
3
4
5
-- 检查插件状态 
SHOW PLUGINS WHERE NAME = 'server_audit';
 
-- 查看审计设置 
SHOW GLOBAL VARIABLES LIKE 'server_audit%';

日志格式

1
2
3
4
5
6
7
20250610 10:42:49,test_db01,root,192.168.102.207,7972452,438352983,QUERY,xxxx,'SELECT  id,ext_id,ext_table_name,target_type,value1,value2,spread_config  FROM t_target \n \n WHERE (ext_id = \'12606411819336451111124\' AND ext_table_name = \'t_discount_coupon\')',0
20250610 10:42:50,test_db01,root,192.168.83.18,7972073,438352984,QUERY,gazelle_model_assemble,'SELECT  id,code,name,handler,status,retry_times,create_time,update_time  FROM batch      WHERE  (status = \'running\')',0
20250610 10:42:50,test_db01,root,192.168.102.207,7969972,438352986,QUERY,xxxx,'select * from t_event_send where sent = 0 order by created_time asc',0
20250610 10:42:50,test_db01,root,192.168.83.36,7853786,438352985,QUERY,hxxxdb,'SELECT\n            MAX(IF(status = 1, create_time, null)) nearBankTime\n        FROM user_bank_cards\n        WHERE create_time >= NOW() - INTERVAL 12 HOUR',0
20250610 10:42:50,test_db01,root,192.168.83.27,7971636,438352987,QUERY,hixxxx_core,'SELECT id, user_id, trade_no, out_trade_no, order_sn, lease_id, period_no, amount, trade_status, alipay_close_status, trade_type, remark, trade_way, business_type, entry_param, coupon_id, coupon_amount, deleted, gmt_create, gmt_modified FROM hire_trade_flows WHERE deleted = 0 AND (trade_status = 3 AND trade_type = 1 AND trade_way IN (1, 3, 4) AND business_type IN (3, 5, 2, 1, 14, 4, 6, 10, 13))',0
20250610 10:42:50,test_db01,root,192.168.83.25,7972144,438352988,QUERY,historical_customer_data,'SELECT 1',0
20250610 10:42:50,test_db01,root,192.168.83.25,7972144,438352989,QUERY,historical_customer_data,'SELECT 1',0

解释这条日志格式

1
20250610 10:42:50,test_db01,root,192.168.83.36,7853786,438352985,QUERY,hxxxdb,'SELECT\n            MAX(IF(status = 1, create_time, null)) nearBankTime\n        FROM user_bank_cards\n        WHERE create_time >= NOW() - INTERVAL 12 HOUR',0
字段编号 字段值 含义
1 20250610 10:42:50 时间戳,表示审计事件发生的时间
2 test_db01 服务器主机名(或 MariaDB 实例名称)
3 root 数据库用户名,执行该语句的用户
4 192.168.83.36 客户端 IP 地址
5 7853786 线程 ID(MariaDB 内部线程号)
6 438352985 查询 ID,用于区分不同的 SQL 查询
7 QUERY 事件类型QUERY 表示 SQL 查询)
8 hxxxdb 当前数据库名USE 的数据库)
9 'SELECT\n MAX(IF(... SQL 语句内容,使用单引号包裹,\n 为换行符
10 0 返回值 / 错误码0 表示成功(无错误)

审计日志处理系统实现

系统架构设计

1
2
3
4
5
[MySQL Server] 
(生成审计日志)
[server_audit.log]  
(Python监控进程)
[日志解析模块][MySQL审计数据库]

Python处理程序核心逻辑

1
20250610 10:42:50,test_db01,root,192.168.83.36,7853786,438352985,QUERY,hxxxdb,'SELECT\n            MAX(IF(status = 1, create_time, null)) nearBankTime\n        FROM user_bank_cards\n        WHERE create_time >= NOW() - INTERVAL 12 HOUR',0

我们有上面的日志格式,可以进行如下操作处理

日志监控关键特性:

  1. 断点续传:通过state文件记录读取位置
  2. 文件轮转检测:通过inode和dev识别日志切换
  3. 异常处理:完善的错误捕获和日志记录

在这里插入图片描述

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def process_log_entry(new_content):
    """处理单个审计日志条目"""
    # 定义正则表达式模式,用于解析审计日志格式
    # 模式匹配字段:时间,数据库名(1),用户名,源IP,连接ID,查询ID,事件类型,数据库名(2),SQL语句,执行结果
    pattern = r"^(\d{8} \d{2}:\d{2}:\d{2}),([\w]+),([\w]+),(\d+\.\d+\.\d+\.\d+),(\d+),(\d+),([\w]+),([\w]+),'(.*)',(\d+)"
    
    # 尝试匹配日志行
    match = re.match(pattern, new_content)
    if match:
        # 提取匹配的各个字段
        sql_time = match.group(1)        # 时间 (格式: 年月日 时分秒)
        database_name = match.group(8)   # 数据库名称 (从第8组获取)
        sql_content = match.group(9)      # SQL语句内容 (包含原始转义字符)
        s_sql_content = match.group(9)    # 保留原始SQL语句用于日志记录
        sql_result = match.group(10)      # SQL执行结果 (0表示成功)
        
        # 清洗SQL语句内容:
        # 1. 去除SQL注释(--、#和/* */类型的注释)
        COMMENT_PATTERN = re.compile(r"(--.*?\\r\\n|#.*?\\r\\n|/\*.*?\*/)", re.DOTALL | re.MULTILINE)
        sql_content = COMMENT_PATTERN.sub("", sql_content)
        
        # 2. 替换特殊转义字符
        #    - 将\n换行符替换为空格
        #    - 将\'转义单引号替换为普通单引号
        #    - 将\t制表符替换为空格
        #    - 将\r回车符替换为空格
        sql_content = sql_content.replace('\\n', ' ').replace('\\\'', '\'').replace('\\t', ' ').replace('\\r', ' ')
        
        # 3. 去除SQL语句前后的多余空格
        sql_content = sql_content.strip()
        
        # 检查是否为需要记录的DDL语句:
        # 1. SQL执行成功 (sql_result == "0")
        # 2. 是DDL操作 (CREATE/ALTER/DROP/RENAME开头)
        DDL_PATTERN = re.compile(r"^(CREATE|ALTER|DROP|RENAME)", re.IGNORECASE)
        if sql_result == "0" and DDL_PATTERN.match(sql_content):
            # 记录处理日志
            logging.info("-" * 50)
            logging.info(f"原始SQL内容: {s_sql_content}")
            logging.info(f"执行时间: {sql_time}")
            logging.info(f"数据库名称: {database_name}")
            logging.info(f"清洗后SQL: {sql_content}")
            logging.info("-" * 50)
            
            # 将审计记录保存到MySQL数据库
            save_to_mysql(sql_time, database_name, sql_content)
1
2
原始: 'SELECT\n            MAX(...) nearBankTime\n        FROM ...'
清洗后: 'SELECT MAX(...) nearBankTime FROM ...'

完整代码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# -*- coding: utf-8 -*-
import os, sys
import re
import time
import json, logging
from datetime import datetime
import pymysql

# 日志配置
logging.basicConfig(
    filename='mysql_monitor.log',
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# 读取缓存文件,定位日志读取位置
def load_state(state_file):
    """Load saved state."""
    try:
        with open(state_file, 'r', encoding='utf-8') as f:
            state = json.load(f)
            return state.get('position', 0), state.get('inode', None), state.get('dev', None)
    except (IOError, ValueError):
        return 0, None, None
    except Exception as e:
        logging.error(f"Failed to load state: {str(e)}")
        return 0, None, None

# 保存位置数据到文件中
def save_state(state_file, position, inode, dev):
    """Save current state."""
    try:
        with open(state_file, 'w', encoding='utf-8') as f:
            json.dump({
                'position': position,
                'inode': inode,
                'dev': dev
            }, f)
    except Exception as e:
        logging.error(f"Failed to save state: {str(e)}")

# 保存sql数据到数据中,方便可视化 或者  告警
def save_to_mysql(sql_time, database_name, sql_content):
    """将日志信息写入到 MySQL 数据库的 audit_log 表中"""
    connection = pymysql.connect(
        host='192.168.102.201',
        user='root',
        password='xxxx.88',
        database='audit_db',
        charset='utf8mb4'
    )
    try:
        with connection.cursor() as cursor:
            sql = """
                  INSERT INTO audit_log (sql_time, database_name, sql_content)
                  VALUES (%s, %s, %s)
                  """
            sql_time = datetime.strptime(sql_time, "%Y%m%d %H:%M:%S").strftime("%Y-%m-%d %H:%M:%S")
            cursor.execute(sql, (sql_time, database_name, sql_content))
        connection.commit()
    except Exception as e:
        logging.error(f"Failed to insert data into MySQL: {str(e)}")
    finally:
        connection.close()

# 读取文件 ,数据 匹配 和 清洗
def process_log_entry(new_content):
    """Process a single log entry."""
    pattern = r"^(\d{8} \d{2}:\d{2}:\d{2}),([\w]+),([\w]+),(\d+\.\d+\.\d+\.\d+),(\d+),(\d+),([\w]+),([\w]+),'(.*)',(\d+)"
    match = re.match(pattern, new_content)
    if match:
        sql_time = match.group(1)
        database_name = match.group(8)
        sql_content = match.group(9)
        s_sql_content = match.group(9)
        sql_result = match.group(10)
        # 先匹配存在DDL语句的sql内容
        # 去除-- /* # 等注释符
        #logging.info(f"1 {sql_content}")
        COMMENT_PATTERN = re.compile(r"(--.*?\\r\\n|#.*?\\r\\n|/\*.*?\*/)", re.DOTALL | re.MULTILINE)
        sql_content = COMMENT_PATTERN.sub("", sql_content)
        # 去除多余的换行符/'等
        #logging.info(f"2 {sql_content}")
        sql_content = sql_content.replace('\\n', ' ').replace('\\\'', '\'').replace('\\t', ' ').replace('\\r', ' ')
        #logging.info(f"3 {sql_content}")
        # 去除前后多余的空格
        sql_content = sql_content.strip()
        #logging.info(f"4 {sql_content}")
        # 检查是否为 DDL 语句
        DDL_PATTERN = re.compile(r"^(CREATE|ALTER|DROP|RENAME)", re.IGNORECASE)
        if sql_result == "0" and DDL_PATTERN.match(sql_content):
            logging.info("-" * 50)
            logging.info(f"SQL Content: {s_sql_content}")
            logging.info(f"Time: {sql_time}")
            logging.info(f"Database Name: {database_name}")
            logging.info(f"SQL Content: {sql_content}")
            logging.info("-" * 50)
            save_to_mysql(sql_time, database_name, sql_content)

# 启动程序,传入 缓存 文件  和 审计日志记录 文件 位置
def monitor_log(state_file, log_file):
    """Monitor MySQL audit logs."""
    last_position, last_inode, last_dev = load_state(state_file)
    while True:
        try:
            current_st = os.stat(log_file)
        except OSError:
            logging.warning("Log file not found, retrying...")
            time.sleep(1)
            continue
        if (current_st.st_ino != last_inode) or (current_st.st_dev != last_dev):
            logging.info("New log file detected")
            last_position = 0
            last_inode = current_st.st_ino
            last_dev = current_st.st_dev
            save_state(state_file, last_position, last_inode, last_dev)
        if current_st.st_size < last_position:
            logging.info("File truncated")
            last_position = 0
            save_state(state_file, last_position, last_inode, last_dev)
        if current_st.st_size > last_position:
            try:
                with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                    f.seek(last_position)
                    for line in f:
                        process_log_entry(line.strip())
                    last_position = f.tell()
                    save_state(state_file, last_position, last_inode, last_dev)
            except Exception as e:
                logging.error(f"Error reading log file: {str(e)}")
        time.sleep(0.5)


if __name__ == "__main__":
    log_file = '/data/mysql/server_audit.log'
    state_file = "/opt/mysql_audit_monitor/mysqlMonitor.state"
    logging.info(f"Starting MySQL audit log monitoring: {log_file}")
    try:
        monitor_log(state_file, log_file)
    except KeyboardInterrupt:
        logging.info("Monitoring stopped")
        sys.exit(0)

在这里插入图片描述

在这里插入图片描述

后记

生产环境建议

场景 推荐方案
MySQL 5.7.34以下版本 MariaDB插件方案
MySQL 8.0+ Percona Server或迁移至MariaDB
审计日志存储 独立MySQL实例,与业务库物理隔离
高频操作环境 写入Elasticsearch替代MySQL

通过server_audit插件+Python监控的组合,可在MySQL社区版低成本实现企业级审计需求。但需注意:

📌 版本兼容性是成功前提,MySQL 8.0用户务必选择Percona/MariaDB 📌 日志清洗环节直接影响分析准确性(如转义符处理) 📌 审计日志存储分离是安全最佳实践

最终效果

  • 所有数据库操作可视化展示
  • 实时捕获高危DDL语句
  • 满足等保2.0/ISO 27001审计要求