refactor(audit): 废弃装饰器+分离架构,改为监听器单体直写入库
This commit is contained in:
@ -1,15 +1,16 @@
|
||||
# inventory-backend/app/core/audit_listener.py
|
||||
"""
|
||||
SQLAlchemy Event Listener 审计监听器(精简版)
|
||||
仅负责计算变更数据,存入 g.audit_details,由装饰器统一入库
|
||||
SQLAlchemy Event Listener 审计监听器(单体架构版)
|
||||
监听器亲自完成入库,不依赖 g 对象,不依赖装饰器回调。
|
||||
只要模型发生 INSERT/UPDATE/DELETE,监听器直接创建 AuditLog 并挂载到当前事务 session。
|
||||
"""
|
||||
from sqlalchemy import event, inspect
|
||||
from flask import current_app, g, has_request_context
|
||||
from flask import current_app, request, has_request_context
|
||||
from datetime import datetime
|
||||
|
||||
# 需要忽略的字段
|
||||
|
||||
IGNORE_FIELDS = {
|
||||
'updated_at', 'update_time', ' modified_time', 'last_modified',
|
||||
'updated_at', 'update_time', 'modified_time', 'last_modified',
|
||||
'created_at', 'create_time', 'created_on',
|
||||
}
|
||||
|
||||
@ -32,8 +33,8 @@ def _serialize_value(value):
|
||||
|
||||
def _is_audit_model(mapper):
|
||||
"""判断模型是否需要审计"""
|
||||
if hasattr(mapper.class_, 'audit_enabled') and mapper.class_.audit_enabled:
|
||||
return True
|
||||
if hasattr(mapper.class_, 'audit_enabled') and mapper.class_.audit_enabled is False:
|
||||
return False
|
||||
|
||||
AUDIT_WHITELIST = {
|
||||
'MaterialBase', 'MaterialWarningSetting',
|
||||
@ -46,153 +47,132 @@ def _is_audit_model(mapper):
|
||||
|
||||
|
||||
def _get_module_name(mapper):
|
||||
"""获取业务模块名称"""
|
||||
model_name = mapper.class_.__name__
|
||||
module_map = {
|
||||
'MaterialBase': '基础信息',
|
||||
'MaterialWarningSetting': '基础信息',
|
||||
'StockBuy': '采购入库',
|
||||
'StockSemi': '半成品入库',
|
||||
'StockProduct': '成品入库',
|
||||
'StockService': '服务权益',
|
||||
'RepairRecord': '维修管理',
|
||||
'TransOutbound': '出库记录',
|
||||
'TransBorrow': '借库',
|
||||
'TransReturn': '归还',
|
||||
'BomTable': 'BOM配方',
|
||||
'StockTake': '盘点',
|
||||
'StockAdjust': '盈亏调整',
|
||||
'TransScrap': '报废',
|
||||
'SysUser': '用户管理'
|
||||
}
|
||||
return module_map.get(model_name, model_name)
|
||||
"""根据模型类名推断所属模块"""
|
||||
name = mapper.class_.__name__
|
||||
if 'Stock' in name or 'Buy' in name:
|
||||
return '入库管理'
|
||||
if 'Outbound' in name or 'TransOut' in name:
|
||||
return '出库管理'
|
||||
if 'Borrow' in name or 'Return' in name:
|
||||
return '借还管理'
|
||||
if 'Bom' in name:
|
||||
return 'BOM管理'
|
||||
if 'StockTake' in name or 'Adjust' in name or 'Scrap' in name:
|
||||
return '盘点管理'
|
||||
if 'Repair' in name:
|
||||
return '维修管理'
|
||||
if 'SysUser' in name or 'SysMenu' in name or 'SysRole' in name:
|
||||
return '系统管理'
|
||||
if 'Material' in name:
|
||||
return '基础数据'
|
||||
return '未知模块'
|
||||
|
||||
|
||||
def _extract_unique_key(instance):
|
||||
"""提取记录的唯一标识"""
|
||||
inspector = inspect(instance.__class__)
|
||||
priority_fields = ['id', 'material_id', 'stock_id', 'user_id', 'borrow_no', 'order_no', 'bom_no']
|
||||
|
||||
for field in priority_fields:
|
||||
if field in inspector.columns:
|
||||
value = getattr(instance, field, None)
|
||||
if value:
|
||||
return str(value), f'{instance.__class__.__name__}:{value}'
|
||||
|
||||
for col in inspector.columns:
|
||||
if col.type.python_type == str:
|
||||
value = getattr(instance, col.name, None)
|
||||
if value:
|
||||
return str(value), str(value)
|
||||
|
||||
return str(instance.id), f'Record:{instance.id}'
|
||||
|
||||
|
||||
def _store_audit_details(module, target_id, action, details):
|
||||
"""将审计详情存入 g 对象"""
|
||||
def _get_request_user_info():
|
||||
"""从当前 HTTP 请求中尽力提取用户信息,获取不到拉倒"""
|
||||
user_id, username, ip = None, 'system', ''
|
||||
if has_request_context():
|
||||
if not hasattr(g, 'audit_details'):
|
||||
g.audit_details = {}
|
||||
key = f"{module}:{target_id}"
|
||||
g.audit_details[key] = {
|
||||
'action': action,
|
||||
'details': details
|
||||
}
|
||||
print(f"💾 [X光-存储] 已存入 g.audit_details, key: {key}, details: {details}")
|
||||
try:
|
||||
from flask_jwt_extended import get_jwt_identity, get_jwt
|
||||
user_id = get_jwt_identity()
|
||||
claims = get_jwt()
|
||||
username = claims.get('username', 'system')
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
ip = request.headers.get('X-Forwarded-For', '') or request.remote_addr or ''
|
||||
if ip and ',' in ip:
|
||||
ip = ip.split(',')[0].strip()
|
||||
except Exception:
|
||||
pass
|
||||
return user_id, username, ip
|
||||
|
||||
|
||||
# ============================================================
|
||||
# 核心监听器函数
|
||||
# 核心:监听器内部直接创建并挂载日志
|
||||
# ============================================================
|
||||
|
||||
def _create_audit_log(session, mapper, target, action, details):
|
||||
"""
|
||||
监听器内部直接实例化 AuditLog 并加入当前事务 session。
|
||||
由 SQLAlchemy 生命周期保证随主事务一同提交或回滚。
|
||||
"""
|
||||
try:
|
||||
from app.models.audit import AuditLog
|
||||
|
||||
user_id, username, ip = _get_request_user_info()
|
||||
module = _get_module_name(mapper)
|
||||
|
||||
target_id = None
|
||||
if hasattr(target, 'id'):
|
||||
target_id = target.id
|
||||
elif hasattr(target, 'stock_id'):
|
||||
target_id = target.stock_id
|
||||
elif hasattr(target, 'bom_no'):
|
||||
target_id = target.bom_no
|
||||
|
||||
log = AuditLog(
|
||||
user_id=user_id,
|
||||
username=username,
|
||||
action=action,
|
||||
module=module,
|
||||
target_id=str(target_id) if target_id else '0',
|
||||
details=details,
|
||||
ip_address=ip
|
||||
)
|
||||
session.add(log)
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"Audit log auto-creation failed: {e}")
|
||||
|
||||
|
||||
def before_update_listener(mapper, connection, target):
|
||||
"""SQLAlchemy before_update 监听器"""
|
||||
print(f"🔔 [X光-监听器] before_update 被调用, target: {target.__class__.__name__}")
|
||||
|
||||
if not _is_audit_model(mapper):
|
||||
print(f"⏭️ [X光-监听器] 跳过非审计模型: {mapper.class_.__name__}")
|
||||
return
|
||||
|
||||
# ★ 使用 inspect(target) 而非 inspector._instance_state(target)
|
||||
state = inspect(target)
|
||||
changes = {}
|
||||
|
||||
for attr in state.attrs:
|
||||
if attr.key in IGNORE_FIELDS:
|
||||
continue
|
||||
if attr.history.has_changes():
|
||||
# ★ 正确使用 added[0] 和 deleted[0]
|
||||
old_val = attr.history.deleted[0] if attr.history.deleted else None
|
||||
new_val = attr.history.added[0] if attr.history.added else None
|
||||
changes[attr.key] = {
|
||||
'old': _serialize_value(old_val),
|
||||
'new': _serialize_value(new_val)
|
||||
}
|
||||
|
||||
print(f"📊 [X光-监听器] 计算完成, changes: {changes}")
|
||||
|
||||
if changes:
|
||||
module = _get_module_name(mapper)
|
||||
target_id, target_name = _extract_unique_key(target)
|
||||
|
||||
# ★ 将变更存入 g
|
||||
details = {'changes': changes}
|
||||
|
||||
# ★ X光调试:确认监听器是否成功算出 changes
|
||||
print(f"🚀 [X光-监听器] 成功触发! target: {target}, changes: {changes}")
|
||||
|
||||
_store_audit_details(module, target_id, 'update', details)
|
||||
else:
|
||||
print(f"⚠️ [X光-监听器] 无变更,跳过")
|
||||
"""UPDATE 事件:抓取字段变更明细"""
|
||||
if not _is_audit_model(mapper): return
|
||||
try:
|
||||
state = inspect(target)
|
||||
changes = {}
|
||||
for attr in state.attrs:
|
||||
if attr.key in IGNORE_FIELDS: continue
|
||||
if attr.history.has_changes():
|
||||
old_val = attr.history.deleted[0] if attr.history.deleted else None
|
||||
new_val = attr.history.added[0] if attr.history.added else None
|
||||
changes[attr.key] = {
|
||||
'old': _serialize_value(old_val),
|
||||
'new': _serialize_value(new_val)
|
||||
}
|
||||
if changes:
|
||||
_create_audit_log(connection, mapper, target, 'update', {'changes': changes})
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"Audit Update Error: {e}")
|
||||
|
||||
|
||||
def before_delete_listener(mapper, connection, target):
|
||||
"""SQLAlchemy before_delete 监听器"""
|
||||
if not _is_audit_model(mapper):
|
||||
return
|
||||
|
||||
# 序列化被删除的数据快照
|
||||
inspector = inspect(target.__class__)
|
||||
deleted_snapshot = {}
|
||||
|
||||
for col in inspector.columns:
|
||||
try:
|
||||
value = getattr(target, col.name, None)
|
||||
deleted_snapshot[col.name] = _serialize_value(value)
|
||||
except Exception:
|
||||
deleted_snapshot[col.name] = '[无法序列化]'
|
||||
|
||||
module = _get_module_name(mapper)
|
||||
target_id, target_name = _extract_unique_key(target)
|
||||
|
||||
# ★ 将删除快照存入 g
|
||||
details = {'deleted_snapshot': deleted_snapshot}
|
||||
print(f"🗑️ [X光-监听器] delete 触发, target_id: {target_id}")
|
||||
_store_audit_details(module, target_id, 'delete', details)
|
||||
"""DELETE 事件:抓取被删除对象的完整快照"""
|
||||
if not _is_audit_model(mapper): return
|
||||
try:
|
||||
state = inspect(target)
|
||||
snap = {}
|
||||
for attr in state.attrs:
|
||||
val = getattr(target, attr.key, None)
|
||||
snap[attr.key] = _serialize_value(val)
|
||||
_create_audit_log(connection, mapper, target, 'delete', {'deleted_snapshot': snap})
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"Audit Delete Error: {e}")
|
||||
|
||||
|
||||
def after_insert_listener(mapper, connection, target):
|
||||
"""SQLAlchemy after_insert 监听器"""
|
||||
if not _is_audit_model(mapper):
|
||||
return
|
||||
|
||||
inspector = inspect(target.__class__)
|
||||
target_id, target_name = _extract_unique_key(target)
|
||||
module = _get_module_name(mapper)
|
||||
|
||||
# 序列化新记录
|
||||
new_snapshot = {}
|
||||
for col in inspector.columns:
|
||||
try:
|
||||
value = getattr(target, col.name, None)
|
||||
new_snapshot[col.name] = _serialize_value(value)
|
||||
except Exception:
|
||||
new_snapshot[col.name] = '[无法序列化]'
|
||||
|
||||
# ★ 将新增数据存入 g
|
||||
details = {'created': new_snapshot}
|
||||
print(f"➕ [X光-监听器] insert 触发, target_id: {target_id}")
|
||||
_store_audit_details(module, target_id, 'create', details)
|
||||
"""INSERT 事件:抓取新增对象的完整快照"""
|
||||
if not _is_audit_model(mapper): return
|
||||
try:
|
||||
state = inspect(target)
|
||||
snap = {}
|
||||
for attr in state.attrs:
|
||||
val = getattr(target, attr.key, None)
|
||||
snap[attr.key] = _serialize_value(val)
|
||||
_create_audit_log(connection, mapper, target, 'insert', {'created': snap})
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# ============================================================
|
||||
@ -200,7 +180,7 @@ def after_insert_listener(mapper, connection, target):
|
||||
# ============================================================
|
||||
|
||||
def register_audit_listeners(db):
|
||||
"""注册审计监听器"""
|
||||
"""向所有需要审计的模型注册事件监听器"""
|
||||
from app.models import (
|
||||
MaterialBase, MaterialWarningSetting,
|
||||
StockBuy, StockSemi, StockProduct, StockService,
|
||||
@ -218,14 +198,13 @@ def register_audit_listeners(db):
|
||||
]
|
||||
|
||||
audit_models = [m for m in audit_models if m is not None]
|
||||
|
||||
count = 0
|
||||
for model in audit_models:
|
||||
try:
|
||||
event.listen(model, 'before_update', before_update_listener, propagate=True)
|
||||
event.listen(model, 'before_delete', before_delete_listener, propagate=True)
|
||||
event.listen(model, 'after_insert', after_insert_listener, propagate=True)
|
||||
current_app.logger.info(f'[审计] 已绑定模型: {model.__name__}')
|
||||
except Exception as e:
|
||||
current_app.logger.warning(f'[审计] 绑定模型 {model.__name__} 失败: {e}')
|
||||
|
||||
return len(audit_models)
|
||||
count += 1
|
||||
except Exception:
|
||||
pass
|
||||
return count
|
||||
|
||||
Reference in New Issue
Block a user