feat(audit): 平滑升级-监听器+装饰器共存,装饰器自动检测并跳过已处理日志

This commit is contained in:
DXC
2026-04-20 13:15:25 +08:00
parent becd3cb010
commit 381d1fa675
2 changed files with 387 additions and 0 deletions

View File

@ -0,0 +1,371 @@
# inventory-backend/app/core/audit_listener.py
"""
SQLAlchemy Event Listener 审计监听器
基于 SQLAlchemy before_update / before_delete 事件,自动记录数据变更明细
用法:
from app.core.audit_listener import register_audit_listeners
from app.extensions import db
register_audit_listeners(db)
"""
from sqlalchemy import event, inspect
from flask import current_app, g, has_request_context, request
from datetime import datetime
import json
# 需要忽略的字段(系统自动管理,无业务意义)
IGNORE_FIELDS = {
'updated_at', 'update_time', ' modified_time', 'last_modified',
'created_at', 'create_time', 'created_on',
}
def _get_current_user():
"""
获取当前操作人信息
兼容两种场景:
1. HTTP 请求上下文(从 g 或 request 获取)
2. 系统定时任务(无请求上下文,返回 system
"""
if has_request_context():
# 优先从 g 获取(如果视图函数已设置)
user_id = getattr(g, 'user_id', None)
username = getattr(g, 'username', None)
display_name = getattr(g, 'display_name', None)
if not user_id:
# 尝试从 JWT 获取
from flask_jwt_extended import get_jwt_identity, get_jwt
try:
user_id = get_jwt_identity()
claims = get_jwt()
username = claims.get('username', '')
display_name = claims.get('display_name', username)
except Exception:
pass
if user_id:
return user_id, username or '', display_name or username or ''
# 系统定时任务或脚本
return None, 'system', '系统任务'
def _get_client_ip():
"""获取客户端 IP"""
if has_request_context():
return request.headers.get('X-Forwarded-For') or request.remote_addr or ''
return ''
def _serialize_value(value):
"""
序列化值确保 JSON 兼容
处理 datetime, date, bytes 等类型
"""
if value is None:
return None
if isinstance(value, datetime):
return value.strftime('%Y-%m-%d %H:%M:%S')
if isinstance(value, (bytes, bytearray)):
try:
return value.decode('utf-8')
except Exception:
return '[二进制数据]'
# 处理 SQLAlchemy 未加载实例
if hasattr(value, '__class__') and value.__class__.__name__ in ('InstanceState', 'LazyLoader'):
return str(value)
return value
def _is_audit_model(mapper):
"""
判断模型是否需要审计
检查模型是否有 audit_enabled 属性或是否在白名单中
"""
# 方案1检查模型属性
if hasattr(mapper.class_, 'audit_enabled') and mapper.class_.audit_enabled:
return True
# 方案2白名单
AUDIT_WHITELIST = {
'MaterialBase', 'MaterialWarningSetting',
'StockBuy', 'StockSemi', 'StockProduct', 'StockService',
'RepairRecord', 'TransOutbound', 'TransBorrow', 'TransReturn',
'BomTable', 'StockTake', 'StockAdjust',
'TransScrap', 'SysUser'
}
return mapper.class_.__name__ in AUDIT_WHITELIST
def _get_module_name(mapper):
"""
获取业务模块名称
"""
model_name = mapper.class_.__name__
module_map = {
'MaterialBase': '基础信息',
'MaterialWarningSetting': '基础信息',
'StockBuy': '采购入库',
'StockSemi': '半成品入库',
'StockProduct': '成品入库',
'StockService': '服务权益',
'RepairRecord': '维修管理',
'TransOutbound': '出库记录',
'TransBorrow': '借库',
'TransReturn': '归还',
'BomTable': 'BOM配方',
'StockTake': '盘点',
'StockAdjust': '盈亏调整',
'TransScrap': '报废',
'SysUser': '用户管理'
}
return module_map.get(model_name, model_name)
def _extract_unique_key(instance, inspector):
"""
提取记录的唯一标识(用于 target_id 和 target_name
优先使用id, material_id, stock_id, user_id 等
"""
# 优先字段列表
priority_fields = ['id', 'material_id', 'stock_id', 'user_id', 'borrow_no', 'order_no', 'bom_no']
for field in priority_fields:
if field in inspector.columns:
value = getattr(instance, field, None)
if value:
return str(value), f'{instance.__class__.__name__}:{value}'
# 回退:使用所有非空字符串字段
for col in inspector.columns:
if col.type.python_type == str:
value = getattr(instance, col.name, None)
if value:
return str(value), str(value)
return str(instance.id), f'Record:{instance.id}'
def _build_audit_log(instance, action, module, details, target_id=None, target_name=None):
"""
构建并保存审计日志
自动处理异常,不影响主事务
"""
try:
from app.models.audit import AuditLog
from app.extensions import db
user_id, username, display_name = _get_current_user()
# 如果没有获取到 target_id/target_name尝试从实例提取
if not target_id or not target_name:
inspector = inspect(instance.__class__)
tid, tname = _extract_unique_key(instance, inspector)
target_id = target_id or tid
target_name = target_name or tname
# ★ 关键:在 g 对象上打标记,告知装饰器已处理
if target_id and has_request_context():
if not hasattr(g, 'audit_handled_ids'):
g.audit_handled_ids = {}
# key: "module:target_id", value: action type
key = f"{module}:{target_id}"
g.audit_handled_ids[key] = action
log_entry = AuditLog(
user_id=user_id,
username=username or 'system',
display_name=display_name or '系统任务',
action=action,
module=module,
target_id=target_id,
target_name=target_name,
details=details,
ip_address=_get_client_ip(),
method='UPDATE' if action == 'update' else 'DELETE',
url=f'model:{instance.__class__.__tablename__}',
status_code=200
)
db.session.add(log_entry)
# 注意:此处不 commit由外层事务统一管理
except Exception as e:
# 审计失败不应影响主业务,但需要记录日志
current_app.logger.error(f'[审计监听] 构建日志失败: {str(e)}')
# ============================================================
# 核心监听器函数
# ============================================================
def before_update_listener(mapper, connection, target):
"""
SQLAlchemy before_update 监听器
触发时机UPDATE 语句执行前
"""
# 1. 判断是否需要审计
if not _is_audit_model(mapper):
return
# 2. 检查是否有字段变更
inspector = inspect(target.__class__)
state = inspector._instance_state(target)
changes = {}
for attr in state.attrs:
if attr.key in IGNORE_FIELDS:
continue
if attr.history.has_changes():
# ★ 修复:使用 added[0] 和 deleted[0]
old_val = attr.history.deleted[0] if attr.history.deleted else None
new_val = attr.history.added[0] if attr.history.added else None
changes[attr.key] = {
'old': _serialize_value(old_val),
'new': _serialize_value(new_val)
}
# 3. 如果有变更,记录审计日志
if changes:
module = _get_module_name(mapper)
# 提取 target_id/target_name
target_id, target_name = _extract_unique_key(target, inspector)
details = {'changes': changes}
_build_audit_log(target, 'update', module, details, target_id, target_name)
def before_delete_listener(mapper, connection, target):
"""
SQLAlchemy before_delete 监听器
触发时机DELETE 语句执行前
"""
# 1. 判断是否需要审计
if not _is_audit_model(mapper):
return
# 2. 序列化被删除的数据快照
inspector = inspect(target.__class__)
deleted_snapshot = {}
for col in inspector.columns:
try:
value = getattr(target, col.name, None)
deleted_snapshot[col.name] = _serialize_value(value)
except Exception:
deleted_snapshot[col.name] = '[无法序列化]'
# 3. 记录审计日志
module = _get_module_name(mapper)
target_id, target_name = _extract_unique_key(target, inspector)
details = {'deleted_snapshot': deleted_snapshot}
_build_audit_log(target, 'delete', module, details, target_id, target_name)
def after_insert_listener(mapper, connection, target):
"""
SQLAlchemy after_insert 监听器(可选)
记录新增操作
"""
if not _is_audit_model(mapper):
return
inspector = inspect(target.__class__)
target_id, target_name = _extract_unique_key(target, inspector)
module = _get_module_name(mapper)
# 序列化新记录
new_snapshot = {}
for col in inspector.columns:
try:
value = getattr(target, col.name, None)
new_snapshot[col.name] = _serialize_value(value)
except Exception:
new_snapshot[col.name] = '[无法序列化]'
details = {'created': new_snapshot}
_build_audit_log(target, 'create', module, details, target_id, target_name)
# ============================================================
# 注册函数
# ============================================================
def register_audit_listeners(db):
"""
注册审计监听器到所有需要的模型
"""
# 获取所有已映射的模型
from app.models import (
MaterialBase, MaterialWarningSetting,
StockBuy, StockSemi, StockProduct, StockService,
RepairRecord, TransOutbound, TransBorrow, TransReturn,
BomTable, StockTake, StockAdjust,
TransScrap, SysUser
)
# 需要审计的模型列表
audit_models = [
MaterialBase, MaterialWarningSetting,
StockBuy, StockSemi, StockProduct, StockService,
RepairRecord, TransOutbound, TransBorrow, TransReturn,
BomTable, StockTake, StockAdjust,
TransScrap, SysUser
]
# 过滤掉不存在的模型
audit_models = [m for m in audit_models if m is not None]
for model in audit_models:
try:
# 绑定 before_update
event.listen(
model, 'before_update', before_update_listener,
propagate=True
)
# 绑定 before_delete
event.listen(
model, 'before_delete', before_delete_listener,
propagate=True
)
# 绑定 after_insert记录新增
event.listen(
model, 'after_insert', after_insert_listener,
propagate=True
)
current_app.logger.info(f'[审计] 已绑定模型: {model.__name__}')
except Exception as e:
current_app.logger.warning(f'[审计] 绑定模型 {model.__name__} 失败: {e}')
return len(audit_models)
def unregister_audit_listeners(db):
"""
注销审计监听器(用于测试)
"""
from app.models import (
MaterialBase, MaterialWarningSetting,
StockBuy, StockSemi, StockProduct, StockService,
RepairRecord, TransOutbound, TransBorrow, TransReturn,
BomTable, StockTake, StockAdjust,
TransScrap, SysUser
)
audit_models = [
MaterialBase, MaterialWarningSetting,
StockBuy, StockSemi, StockProduct, StockService,
RepairRecord, TransOutbound, TransBorrow, TransReturn,
BomTable, StockTake, StockAdjust,
TransScrap, SysUser
]
audit_models = [m for m in audit_models if m is not None]
for model in audit_models:
try:
event.remove(model, 'before_update', before_update_listener)
event.remove(model, 'before_delete', before_delete_listener)
event.remove(model, 'after_insert', after_insert_listener)
except Exception:
pass

View File

@ -301,6 +301,22 @@ def audit_log(module: str, action: str = None, get_target_id_fn=None, get_target
# 默认:记录请求 Payload # 默认:记录请求 Payload
details = {'payload': filtered_payload} details = {'payload': filtered_payload}
# ★ 关键:检查是否已被底层监听器处理
# 如果底层已生成高级日志changes/deleted_snapshot则跳过简单的 payload 日志
from flask import g, has_request_context
if has_request_context() and hasattr(g, 'audit_handled_ids') and g.audit_handled_ids:
# 构建检查 key: "module:target_id"
check_key = f"{module}:{target_id}" if target_id else None
if check_key and check_key in g.audit_handled_ids:
# 底层已处理,跳过装饰器日志
current_app.logger.info(f'[审计] 底层监听器已处理 {check_key},跳过装饰器')
return response
# 也检查 target_name 是否匹配
for key in g.audit_handled_ids:
if target_name and target_name in key:
current_app.logger.info(f'[审计] 底层监听器已处理 {key},跳过装饰器')
return response
# 保存日志 # 保存日志
log_entry = AuditLog( log_entry = AuditLog(
user_id=user_id, user_id=user_id,