refactor(audit): 分离架构-监听器计算装饰器入库
This commit is contained in:
@ -1,68 +1,21 @@
|
|||||||
# inventory-backend/app/core/audit_listener.py
|
# inventory-backend/app/core/audit_listener.py
|
||||||
"""
|
"""
|
||||||
SQLAlchemy Event Listener 审计监听器
|
SQLAlchemy Event Listener 审计监听器(精简版)
|
||||||
基于 SQLAlchemy before_update / before_delete 事件,自动记录数据变更明细
|
仅负责计算变更数据,存入 g.audit_details,由装饰器统一入库
|
||||||
|
|
||||||
用法:
|
|
||||||
from app.core.audit_listener import register_audit_listeners
|
|
||||||
from app.extensions import db
|
|
||||||
register_audit_listeners(db)
|
|
||||||
"""
|
"""
|
||||||
from sqlalchemy import event, inspect
|
from sqlalchemy import event, inspect
|
||||||
from flask import current_app, g, has_request_context, request
|
from flask import current_app, g, has_request_context
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import json
|
|
||||||
|
|
||||||
# 需要忽略的字段(系统自动管理,无业务意义)
|
# 需要忽略的字段
|
||||||
IGNORE_FIELDS = {
|
IGNORE_FIELDS = {
|
||||||
'updated_at', 'update_time', ' modified_time', 'last_modified',
|
'updated_at', 'update_time', ' modified_time', 'last_modified',
|
||||||
'created_at', 'create_time', 'created_on',
|
'created_at', 'create_time', 'created_on',
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def _get_current_user():
|
|
||||||
"""
|
|
||||||
获取当前操作人信息
|
|
||||||
兼容两种场景:
|
|
||||||
1. HTTP 请求上下文(从 g 或 request 获取)
|
|
||||||
2. 系统定时任务(无请求上下文,返回 system)
|
|
||||||
"""
|
|
||||||
if has_request_context():
|
|
||||||
# 优先从 g 获取(如果视图函数已设置)
|
|
||||||
user_id = getattr(g, 'user_id', None)
|
|
||||||
username = getattr(g, 'username', None)
|
|
||||||
display_name = getattr(g, 'display_name', None)
|
|
||||||
|
|
||||||
if not user_id:
|
|
||||||
# 尝试从 JWT 获取
|
|
||||||
from flask_jwt_extended import get_jwt_identity, get_jwt
|
|
||||||
try:
|
|
||||||
user_id = get_jwt_identity()
|
|
||||||
claims = get_jwt()
|
|
||||||
username = claims.get('username', '')
|
|
||||||
display_name = claims.get('display_name', username)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
if user_id:
|
|
||||||
return user_id, username or '', display_name or username or ''
|
|
||||||
|
|
||||||
# 系统定时任务或脚本
|
|
||||||
return None, 'system', '系统任务'
|
|
||||||
|
|
||||||
|
|
||||||
def _get_client_ip():
|
|
||||||
"""获取客户端 IP"""
|
|
||||||
if has_request_context():
|
|
||||||
return request.headers.get('X-Forwarded-For') or request.remote_addr or ''
|
|
||||||
return ''
|
|
||||||
|
|
||||||
|
|
||||||
def _serialize_value(value):
|
def _serialize_value(value):
|
||||||
"""
|
"""序列化值确保 JSON 兼容"""
|
||||||
序列化值确保 JSON 兼容
|
|
||||||
处理 datetime, date, bytes 等类型
|
|
||||||
"""
|
|
||||||
if value is None:
|
if value is None:
|
||||||
return None
|
return None
|
||||||
if isinstance(value, datetime):
|
if isinstance(value, datetime):
|
||||||
@ -72,22 +25,16 @@ def _serialize_value(value):
|
|||||||
return value.decode('utf-8')
|
return value.decode('utf-8')
|
||||||
except Exception:
|
except Exception:
|
||||||
return '[二进制数据]'
|
return '[二进制数据]'
|
||||||
# 处理 SQLAlchemy 未加载实例
|
|
||||||
if hasattr(value, '__class__') and value.__class__.__name__ in ('InstanceState', 'LazyLoader'):
|
if hasattr(value, '__class__') and value.__class__.__name__ in ('InstanceState', 'LazyLoader'):
|
||||||
return str(value)
|
return str(value)
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
|
||||||
def _is_audit_model(mapper):
|
def _is_audit_model(mapper):
|
||||||
"""
|
"""判断模型是否需要审计"""
|
||||||
判断模型是否需要审计
|
|
||||||
检查模型是否有 audit_enabled 属性或是否在白名单中
|
|
||||||
"""
|
|
||||||
# 方案1:检查模型属性
|
|
||||||
if hasattr(mapper.class_, 'audit_enabled') and mapper.class_.audit_enabled:
|
if hasattr(mapper.class_, 'audit_enabled') and mapper.class_.audit_enabled:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# 方案2:白名单
|
|
||||||
AUDIT_WHITELIST = {
|
AUDIT_WHITELIST = {
|
||||||
'MaterialBase', 'MaterialWarningSetting',
|
'MaterialBase', 'MaterialWarningSetting',
|
||||||
'StockBuy', 'StockSemi', 'StockProduct', 'StockService',
|
'StockBuy', 'StockSemi', 'StockProduct', 'StockService',
|
||||||
@ -99,9 +46,7 @@ def _is_audit_model(mapper):
|
|||||||
|
|
||||||
|
|
||||||
def _get_module_name(mapper):
|
def _get_module_name(mapper):
|
||||||
"""
|
"""获取业务模块名称"""
|
||||||
获取业务模块名称
|
|
||||||
"""
|
|
||||||
model_name = mapper.class_.__name__
|
model_name = mapper.class_.__name__
|
||||||
module_map = {
|
module_map = {
|
||||||
'MaterialBase': '基础信息',
|
'MaterialBase': '基础信息',
|
||||||
@ -123,12 +68,9 @@ def _get_module_name(mapper):
|
|||||||
return module_map.get(model_name, model_name)
|
return module_map.get(model_name, model_name)
|
||||||
|
|
||||||
|
|
||||||
def _extract_unique_key(instance, inspector):
|
def _extract_unique_key(instance):
|
||||||
"""
|
"""提取记录的唯一标识"""
|
||||||
提取记录的唯一标识(用于 target_id 和 target_name)
|
inspector = inspect(instance.__class__)
|
||||||
优先使用:id, material_id, stock_id, user_id 等
|
|
||||||
"""
|
|
||||||
# 优先字段列表
|
|
||||||
priority_fields = ['id', 'material_id', 'stock_id', 'user_id', 'borrow_no', 'order_no', 'bom_no']
|
priority_fields = ['id', 'material_id', 'stock_id', 'user_id', 'borrow_no', 'order_no', 'bom_no']
|
||||||
|
|
||||||
for field in priority_fields:
|
for field in priority_fields:
|
||||||
@ -137,7 +79,6 @@ def _extract_unique_key(instance, inspector):
|
|||||||
if value:
|
if value:
|
||||||
return str(value), f'{instance.__class__.__name__}:{value}'
|
return str(value), f'{instance.__class__.__name__}:{value}'
|
||||||
|
|
||||||
# 回退:使用所有非空字符串字段
|
|
||||||
for col in inspector.columns:
|
for col in inspector.columns:
|
||||||
if col.type.python_type == str:
|
if col.type.python_type == str:
|
||||||
value = getattr(instance, col.name, None)
|
value = getattr(instance, col.name, None)
|
||||||
@ -147,52 +88,16 @@ def _extract_unique_key(instance, inspector):
|
|||||||
return str(instance.id), f'Record:{instance.id}'
|
return str(instance.id), f'Record:{instance.id}'
|
||||||
|
|
||||||
|
|
||||||
def _build_audit_log(instance, action, module, details, target_id=None, target_name=None):
|
def _store_audit_details(module, target_id, action, details):
|
||||||
"""
|
"""将审计详情存入 g 对象"""
|
||||||
构建并保存审计日志
|
if has_request_context():
|
||||||
自动处理异常,不影响主事务
|
if not hasattr(g, 'audit_details'):
|
||||||
"""
|
g.audit_details = {}
|
||||||
try:
|
key = f"{module}:{target_id}"
|
||||||
from app.models.audit import AuditLog
|
g.audit_details[key] = {
|
||||||
from app.extensions import db
|
'action': action,
|
||||||
|
'details': details
|
||||||
user_id, username, display_name = _get_current_user()
|
}
|
||||||
|
|
||||||
# 如果没有获取到 target_id/target_name,尝试从实例提取
|
|
||||||
if not target_id or not target_name:
|
|
||||||
inspector = inspect(instance.__class__)
|
|
||||||
tid, tname = _extract_unique_key(instance, inspector)
|
|
||||||
target_id = target_id or tid
|
|
||||||
target_name = target_name or tname
|
|
||||||
|
|
||||||
# ★ 关键:在 g 对象上打标记,告知装饰器已处理
|
|
||||||
if target_id and has_request_context():
|
|
||||||
if not hasattr(g, 'audit_handled_ids'):
|
|
||||||
g.audit_handled_ids = {}
|
|
||||||
# key: "module:target_id", value: action type
|
|
||||||
key = f"{module}:{target_id}"
|
|
||||||
g.audit_handled_ids[key] = action
|
|
||||||
|
|
||||||
log_entry = AuditLog(
|
|
||||||
user_id=user_id,
|
|
||||||
username=username or 'system',
|
|
||||||
display_name=display_name or '系统任务',
|
|
||||||
action=action,
|
|
||||||
module=module,
|
|
||||||
target_id=target_id,
|
|
||||||
target_name=target_name,
|
|
||||||
details=details,
|
|
||||||
ip_address=_get_client_ip(),
|
|
||||||
method='UPDATE' if action == 'update' else 'DELETE',
|
|
||||||
url=f'model:{instance.__class__.__tablename__}',
|
|
||||||
status_code=200
|
|
||||||
)
|
|
||||||
db.session.add(log_entry)
|
|
||||||
# 注意:此处不 commit,由外层事务统一管理
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
# 审计失败不应影响主业务,但需要记录日志
|
|
||||||
current_app.logger.error(f'[审计监听] 构建日志失败: {str(e)}')
|
|
||||||
|
|
||||||
|
|
||||||
# ============================================================
|
# ============================================================
|
||||||
@ -200,24 +105,19 @@ def _build_audit_log(instance, action, module, details, target_id=None, target_n
|
|||||||
# ============================================================
|
# ============================================================
|
||||||
|
|
||||||
def before_update_listener(mapper, connection, target):
|
def before_update_listener(mapper, connection, target):
|
||||||
"""
|
"""SQLAlchemy before_update 监听器"""
|
||||||
SQLAlchemy before_update 监听器
|
|
||||||
触发时机:UPDATE 语句执行前
|
|
||||||
"""
|
|
||||||
# 1. 判断是否需要审计
|
|
||||||
if not _is_audit_model(mapper):
|
if not _is_audit_model(mapper):
|
||||||
return
|
return
|
||||||
|
|
||||||
# 2. 检查是否有字段变更
|
# ★ 使用 inspect(target) 而非 inspector._instance_state(target)
|
||||||
inspector = inspect(target.__class__)
|
state = inspect(target)
|
||||||
state = inspector._instance_state(target)
|
|
||||||
changes = {}
|
changes = {}
|
||||||
|
|
||||||
for attr in state.attrs:
|
for attr in state.attrs:
|
||||||
if attr.key in IGNORE_FIELDS:
|
if attr.key in IGNORE_FIELDS:
|
||||||
continue
|
continue
|
||||||
if attr.history.has_changes():
|
if attr.history.has_changes():
|
||||||
# ★ 修复:使用 added[0] 和 deleted[0]
|
# ★ 正确使用 added[0] 和 deleted[0]
|
||||||
old_val = attr.history.deleted[0] if attr.history.deleted else None
|
old_val = attr.history.deleted[0] if attr.history.deleted else None
|
||||||
new_val = attr.history.added[0] if attr.history.added else None
|
new_val = attr.history.added[0] if attr.history.added else None
|
||||||
changes[attr.key] = {
|
changes[attr.key] = {
|
||||||
@ -225,26 +125,21 @@ def before_update_listener(mapper, connection, target):
|
|||||||
'new': _serialize_value(new_val)
|
'new': _serialize_value(new_val)
|
||||||
}
|
}
|
||||||
|
|
||||||
# 3. 如果有变更,记录审计日志
|
|
||||||
if changes:
|
if changes:
|
||||||
module = _get_module_name(mapper)
|
module = _get_module_name(mapper)
|
||||||
# 提取 target_id/target_name
|
target_id, target_name = _extract_unique_key(target)
|
||||||
target_id, target_name = _extract_unique_key(target, inspector)
|
|
||||||
|
# ★ 将变更存入 g
|
||||||
details = {'changes': changes}
|
details = {'changes': changes}
|
||||||
_build_audit_log(target, 'update', module, details, target_id, target_name)
|
_store_audit_details(module, target_id, 'update', details)
|
||||||
|
|
||||||
|
|
||||||
def before_delete_listener(mapper, connection, target):
|
def before_delete_listener(mapper, connection, target):
|
||||||
"""
|
"""SQLAlchemy before_delete 监听器"""
|
||||||
SQLAlchemy before_delete 监听器
|
|
||||||
触发时机:DELETE 语句执行前
|
|
||||||
"""
|
|
||||||
# 1. 判断是否需要审计
|
|
||||||
if not _is_audit_model(mapper):
|
if not _is_audit_model(mapper):
|
||||||
return
|
return
|
||||||
|
|
||||||
# 2. 序列化被删除的数据快照
|
# 序列化被删除的数据快照
|
||||||
inspector = inspect(target.__class__)
|
inspector = inspect(target.__class__)
|
||||||
deleted_snapshot = {}
|
deleted_snapshot = {}
|
||||||
|
|
||||||
@ -255,24 +150,21 @@ def before_delete_listener(mapper, connection, target):
|
|||||||
except Exception:
|
except Exception:
|
||||||
deleted_snapshot[col.name] = '[无法序列化]'
|
deleted_snapshot[col.name] = '[无法序列化]'
|
||||||
|
|
||||||
# 3. 记录审计日志
|
|
||||||
module = _get_module_name(mapper)
|
module = _get_module_name(mapper)
|
||||||
target_id, target_name = _extract_unique_key(target, inspector)
|
target_id, target_name = _extract_unique_key(target)
|
||||||
|
|
||||||
|
# ★ 将删除快照存入 g
|
||||||
details = {'deleted_snapshot': deleted_snapshot}
|
details = {'deleted_snapshot': deleted_snapshot}
|
||||||
_build_audit_log(target, 'delete', module, details, target_id, target_name)
|
_store_audit_details(module, target_id, 'delete', details)
|
||||||
|
|
||||||
|
|
||||||
def after_insert_listener(mapper, connection, target):
|
def after_insert_listener(mapper, connection, target):
|
||||||
"""
|
"""SQLAlchemy after_insert 监听器"""
|
||||||
SQLAlchemy after_insert 监听器(可选)
|
|
||||||
记录新增操作
|
|
||||||
"""
|
|
||||||
if not _is_audit_model(mapper):
|
if not _is_audit_model(mapper):
|
||||||
return
|
return
|
||||||
|
|
||||||
inspector = inspect(target.__class__)
|
inspector = inspect(target.__class__)
|
||||||
target_id, target_name = _extract_unique_key(target, inspector)
|
target_id, target_name = _extract_unique_key(target)
|
||||||
module = _get_module_name(mapper)
|
module = _get_module_name(mapper)
|
||||||
|
|
||||||
# 序列化新记录
|
# 序列化新记录
|
||||||
@ -284,8 +176,9 @@ def after_insert_listener(mapper, connection, target):
|
|||||||
except Exception:
|
except Exception:
|
||||||
new_snapshot[col.name] = '[无法序列化]'
|
new_snapshot[col.name] = '[无法序列化]'
|
||||||
|
|
||||||
|
# ★ 将新增数据存入 g
|
||||||
details = {'created': new_snapshot}
|
details = {'created': new_snapshot}
|
||||||
_build_audit_log(target, 'create', module, details, target_id, target_name)
|
_store_audit_details(module, target_id, 'create', details)
|
||||||
|
|
||||||
|
|
||||||
# ============================================================
|
# ============================================================
|
||||||
@ -293,10 +186,7 @@ def after_insert_listener(mapper, connection, target):
|
|||||||
# ============================================================
|
# ============================================================
|
||||||
|
|
||||||
def register_audit_listeners(db):
|
def register_audit_listeners(db):
|
||||||
"""
|
"""注册审计监听器"""
|
||||||
注册审计监听器到所有需要的模型
|
|
||||||
"""
|
|
||||||
# 获取所有已映射的模型
|
|
||||||
from app.models import (
|
from app.models import (
|
||||||
MaterialBase, MaterialWarningSetting,
|
MaterialBase, MaterialWarningSetting,
|
||||||
StockBuy, StockSemi, StockProduct, StockService,
|
StockBuy, StockSemi, StockProduct, StockService,
|
||||||
@ -305,7 +195,6 @@ def register_audit_listeners(db):
|
|||||||
TransScrap, SysUser
|
TransScrap, SysUser
|
||||||
)
|
)
|
||||||
|
|
||||||
# 需要审计的模型列表
|
|
||||||
audit_models = [
|
audit_models = [
|
||||||
MaterialBase, MaterialWarningSetting,
|
MaterialBase, MaterialWarningSetting,
|
||||||
StockBuy, StockSemi, StockProduct, StockService,
|
StockBuy, StockSemi, StockProduct, StockService,
|
||||||
@ -314,58 +203,15 @@ def register_audit_listeners(db):
|
|||||||
TransScrap, SysUser
|
TransScrap, SysUser
|
||||||
]
|
]
|
||||||
|
|
||||||
# 过滤掉不存在的模型
|
|
||||||
audit_models = [m for m in audit_models if m is not None]
|
audit_models = [m for m in audit_models if m is not None]
|
||||||
|
|
||||||
for model in audit_models:
|
for model in audit_models:
|
||||||
try:
|
try:
|
||||||
# 绑定 before_update
|
event.listen(model, 'before_update', before_update_listener, propagate=True)
|
||||||
event.listen(
|
event.listen(model, 'before_delete', before_delete_listener, propagate=True)
|
||||||
model, 'before_update', before_update_listener,
|
event.listen(model, 'after_insert', after_insert_listener, propagate=True)
|
||||||
propagate=True
|
|
||||||
)
|
|
||||||
# 绑定 before_delete
|
|
||||||
event.listen(
|
|
||||||
model, 'before_delete', before_delete_listener,
|
|
||||||
propagate=True
|
|
||||||
)
|
|
||||||
# 绑定 after_insert(记录新增)
|
|
||||||
event.listen(
|
|
||||||
model, 'after_insert', after_insert_listener,
|
|
||||||
propagate=True
|
|
||||||
)
|
|
||||||
current_app.logger.info(f'[审计] 已绑定模型: {model.__name__}')
|
current_app.logger.info(f'[审计] 已绑定模型: {model.__name__}')
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
current_app.logger.warning(f'[审计] 绑定模型 {model.__name__} 失败: {e}')
|
current_app.logger.warning(f'[审计] 绑定模型 {model.__name__} 失败: {e}')
|
||||||
|
|
||||||
return len(audit_models)
|
return len(audit_models)
|
||||||
|
|
||||||
|
|
||||||
def unregister_audit_listeners(db):
|
|
||||||
"""
|
|
||||||
注销审计监听器(用于测试)
|
|
||||||
"""
|
|
||||||
from app.models import (
|
|
||||||
MaterialBase, MaterialWarningSetting,
|
|
||||||
StockBuy, StockSemi, StockProduct, StockService,
|
|
||||||
RepairRecord, TransOutbound, TransBorrow, TransReturn,
|
|
||||||
BomTable, StockTake, StockAdjust,
|
|
||||||
TransScrap, SysUser
|
|
||||||
)
|
|
||||||
|
|
||||||
audit_models = [
|
|
||||||
MaterialBase, MaterialWarningSetting,
|
|
||||||
StockBuy, StockSemi, StockProduct, StockService,
|
|
||||||
RepairRecord, TransOutbound, TransBorrow, TransReturn,
|
|
||||||
BomTable, StockTake, StockAdjust,
|
|
||||||
TransScrap, SysUser
|
|
||||||
]
|
|
||||||
audit_models = [m for m in audit_models if m is not None]
|
|
||||||
|
|
||||||
for model in audit_models:
|
|
||||||
try:
|
|
||||||
event.remove(model, 'before_update', before_update_listener)
|
|
||||||
event.remove(model, 'before_delete', before_delete_listener)
|
|
||||||
event.remove(model, 'after_insert', after_insert_listener)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
@ -301,23 +301,24 @@ def audit_log(module: str, action: str = None, get_target_id_fn=None, get_target
|
|||||||
# 默认:记录请求 Payload
|
# 默认:记录请求 Payload
|
||||||
details = {'payload': filtered_payload}
|
details = {'payload': filtered_payload}
|
||||||
|
|
||||||
# ★ 关键:检查是否已被底层监听器处理
|
# ★ 核心重构:监听器计算,装饰器入库
|
||||||
# 如果底层已生成高级日志(changes/deleted_snapshot),则跳过简单的 payload 日志
|
# 检查 g.audit_details 是否有底层监听器传递的高级日志
|
||||||
from flask import g, has_request_context
|
if has_request_context() and hasattr(g, 'audit_details') and g.audit_details:
|
||||||
if has_request_context() and hasattr(g, 'audit_handled_ids') and g.audit_handled_ids:
|
|
||||||
# 构建检查 key: "module:target_id"
|
|
||||||
check_key = f"{module}:{target_id}" if target_id else None
|
check_key = f"{module}:{target_id}" if target_id else None
|
||||||
if check_key and check_key in g.audit_handled_ids:
|
if check_key and check_key in g.audit_details:
|
||||||
# 底层已处理,跳过装饰器日志
|
# 用底层的高级日志(changes/deleted_snapshot/created)替换 payload
|
||||||
current_app.logger.info(f'[审计] 底层监听器已处理 {check_key},跳过装饰器')
|
audit_info = g.audit_details[check_key]
|
||||||
return response
|
details = audit_info.get('details', details)
|
||||||
# 也检查 target_name 是否匹配
|
current_app.logger.info(f'[审计] 底层监听器已计算 {check_key},使用高级日志')
|
||||||
for key in g.audit_handled_ids:
|
else:
|
||||||
if target_name and target_name in key:
|
# 兼容 fallback:模糊匹配 target_name
|
||||||
current_app.logger.info(f'[审计] 底层监听器已处理 {key},跳过装饰器')
|
for key, audit_info in g.audit_details.items():
|
||||||
return response
|
if target_name and target_name in key:
|
||||||
|
details = audit_info.get('details', details)
|
||||||
|
current_app.logger.info(f'[审计] 底层监听器已计算 {key},使用高级日志')
|
||||||
|
break
|
||||||
|
|
||||||
# 保存日志
|
# 保存日志(统一入库)
|
||||||
log_entry = AuditLog(
|
log_entry = AuditLog(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
username=username,
|
username=username,
|
||||||
|
|||||||
Reference in New Issue
Block a user