From 381d1fa675609e158d457d2d916976839f21009e Mon Sep 17 00:00:00 2001 From: DXC Date: Mon, 20 Apr 2026 13:15:25 +0800 Subject: [PATCH] =?UTF-8?q?feat(audit):=20=E5=B9=B3=E6=BB=91=E5=8D=87?= =?UTF-8?q?=E7=BA=A7-=E7=9B=91=E5=90=AC=E5=99=A8+=E8=A3=85=E9=A5=B0?= =?UTF-8?q?=E5=99=A8=E5=85=B1=E5=AD=98=EF=BC=8C=E8=A3=85=E9=A5=B0=E5=99=A8?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E6=A3=80=E6=B5=8B=E5=B9=B6=E8=B7=B3=E8=BF=87?= =?UTF-8?q?=E5=B7=B2=E5=A4=84=E7=90=86=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- inventory-backend/app/core/audit_listener.py | 371 +++++++++++++++++++ inventory-backend/app/utils/decorators.py | 16 + 2 files changed, 387 insertions(+) create mode 100644 inventory-backend/app/core/audit_listener.py diff --git a/inventory-backend/app/core/audit_listener.py b/inventory-backend/app/core/audit_listener.py new file mode 100644 index 0000000..6867fec --- /dev/null +++ b/inventory-backend/app/core/audit_listener.py @@ -0,0 +1,371 @@ +# inventory-backend/app/core/audit_listener.py +""" +SQLAlchemy Event Listener 审计监听器 +基于 SQLAlchemy before_update / before_delete 事件,自动记录数据变更明细 + +用法: + from app.core.audit_listener import register_audit_listeners + from app.extensions import db + register_audit_listeners(db) +""" +from sqlalchemy import event, inspect +from flask import current_app, g, has_request_context, request +from datetime import datetime +import json + +# 需要忽略的字段(系统自动管理,无业务意义) +IGNORE_FIELDS = { + 'updated_at', 'update_time', ' modified_time', 'last_modified', + 'created_at', 'create_time', 'created_on', +} + + +def _get_current_user(): + """ + 获取当前操作人信息 + 兼容两种场景: + 1. HTTP 请求上下文(从 g 或 request 获取) + 2. 系统定时任务(无请求上下文,返回 system) + """ + if has_request_context(): + # 优先从 g 获取(如果视图函数已设置) + user_id = getattr(g, 'user_id', None) + username = getattr(g, 'username', None) + display_name = getattr(g, 'display_name', None) + + if not user_id: + # 尝试从 JWT 获取 + from flask_jwt_extended import get_jwt_identity, get_jwt + try: + user_id = get_jwt_identity() + claims = get_jwt() + username = claims.get('username', '') + display_name = claims.get('display_name', username) + except Exception: + pass + + if user_id: + return user_id, username or '', display_name or username or '' + + # 系统定时任务或脚本 + return None, 'system', '系统任务' + + +def _get_client_ip(): + """获取客户端 IP""" + if has_request_context(): + return request.headers.get('X-Forwarded-For') or request.remote_addr or '' + return '' + + +def _serialize_value(value): + """ + 序列化值确保 JSON 兼容 + 处理 datetime, date, bytes 等类型 + """ + if value is None: + return None + if isinstance(value, datetime): + return value.strftime('%Y-%m-%d %H:%M:%S') + if isinstance(value, (bytes, bytearray)): + try: + return value.decode('utf-8') + except Exception: + return '[二进制数据]' + # 处理 SQLAlchemy 未加载实例 + if hasattr(value, '__class__') and value.__class__.__name__ in ('InstanceState', 'LazyLoader'): + return str(value) + return value + + +def _is_audit_model(mapper): + """ + 判断模型是否需要审计 + 检查模型是否有 audit_enabled 属性或是否在白名单中 + """ + # 方案1:检查模型属性 + if hasattr(mapper.class_, 'audit_enabled') and mapper.class_.audit_enabled: + return True + + # 方案2:白名单 + AUDIT_WHITELIST = { + 'MaterialBase', 'MaterialWarningSetting', + 'StockBuy', 'StockSemi', 'StockProduct', 'StockService', + 'RepairRecord', 'TransOutbound', 'TransBorrow', 'TransReturn', + 'BomTable', 'StockTake', 'StockAdjust', + 'TransScrap', 'SysUser' + } + return mapper.class_.__name__ in AUDIT_WHITELIST + + +def _get_module_name(mapper): + """ + 获取业务模块名称 + """ + model_name = mapper.class_.__name__ + module_map = { + 'MaterialBase': '基础信息', + 'MaterialWarningSetting': '基础信息', + 'StockBuy': '采购入库', + 'StockSemi': '半成品入库', + 'StockProduct': '成品入库', + 'StockService': '服务权益', + 'RepairRecord': '维修管理', + 'TransOutbound': '出库记录', + 'TransBorrow': '借库', + 'TransReturn': '归还', + 'BomTable': 'BOM配方', + 'StockTake': '盘点', + 'StockAdjust': '盈亏调整', + 'TransScrap': '报废', + 'SysUser': '用户管理' + } + return module_map.get(model_name, model_name) + + +def _extract_unique_key(instance, inspector): + """ + 提取记录的唯一标识(用于 target_id 和 target_name) + 优先使用:id, material_id, stock_id, user_id 等 + """ + # 优先字段列表 + priority_fields = ['id', 'material_id', 'stock_id', 'user_id', 'borrow_no', 'order_no', 'bom_no'] + + for field in priority_fields: + if field in inspector.columns: + value = getattr(instance, field, None) + if value: + return str(value), f'{instance.__class__.__name__}:{value}' + + # 回退:使用所有非空字符串字段 + for col in inspector.columns: + if col.type.python_type == str: + value = getattr(instance, col.name, None) + if value: + return str(value), str(value) + + return str(instance.id), f'Record:{instance.id}' + + +def _build_audit_log(instance, action, module, details, target_id=None, target_name=None): + """ + 构建并保存审计日志 + 自动处理异常,不影响主事务 + """ + try: + from app.models.audit import AuditLog + from app.extensions import db + + user_id, username, display_name = _get_current_user() + + # 如果没有获取到 target_id/target_name,尝试从实例提取 + if not target_id or not target_name: + inspector = inspect(instance.__class__) + tid, tname = _extract_unique_key(instance, inspector) + target_id = target_id or tid + target_name = target_name or tname + + # ★ 关键:在 g 对象上打标记,告知装饰器已处理 + if target_id and has_request_context(): + if not hasattr(g, 'audit_handled_ids'): + g.audit_handled_ids = {} + # key: "module:target_id", value: action type + key = f"{module}:{target_id}" + g.audit_handled_ids[key] = action + + log_entry = AuditLog( + user_id=user_id, + username=username or 'system', + display_name=display_name or '系统任务', + action=action, + module=module, + target_id=target_id, + target_name=target_name, + details=details, + ip_address=_get_client_ip(), + method='UPDATE' if action == 'update' else 'DELETE', + url=f'model:{instance.__class__.__tablename__}', + status_code=200 + ) + db.session.add(log_entry) + # 注意:此处不 commit,由外层事务统一管理 + + except Exception as e: + # 审计失败不应影响主业务,但需要记录日志 + current_app.logger.error(f'[审计监听] 构建日志失败: {str(e)}') + + +# ============================================================ +# 核心监听器函数 +# ============================================================ + +def before_update_listener(mapper, connection, target): + """ + SQLAlchemy before_update 监听器 + 触发时机:UPDATE 语句执行前 + """ + # 1. 判断是否需要审计 + if not _is_audit_model(mapper): + return + + # 2. 检查是否有字段变更 + inspector = inspect(target.__class__) + state = inspector._instance_state(target) + changes = {} + + for attr in state.attrs: + if attr.key in IGNORE_FIELDS: + continue + if attr.history.has_changes(): + # ★ 修复:使用 added[0] 和 deleted[0] + old_val = attr.history.deleted[0] if attr.history.deleted else None + new_val = attr.history.added[0] if attr.history.added else None + changes[attr.key] = { + 'old': _serialize_value(old_val), + 'new': _serialize_value(new_val) + } + + # 3. 如果有变更,记录审计日志 + if changes: + module = _get_module_name(mapper) + # 提取 target_id/target_name + target_id, target_name = _extract_unique_key(target, inspector) + + details = {'changes': changes} + _build_audit_log(target, 'update', module, details, target_id, target_name) + + +def before_delete_listener(mapper, connection, target): + """ + SQLAlchemy before_delete 监听器 + 触发时机:DELETE 语句执行前 + """ + # 1. 判断是否需要审计 + if not _is_audit_model(mapper): + return + + # 2. 序列化被删除的数据快照 + inspector = inspect(target.__class__) + deleted_snapshot = {} + + for col in inspector.columns: + try: + value = getattr(target, col.name, None) + deleted_snapshot[col.name] = _serialize_value(value) + except Exception: + deleted_snapshot[col.name] = '[无法序列化]' + + # 3. 记录审计日志 + module = _get_module_name(mapper) + target_id, target_name = _extract_unique_key(target, inspector) + + details = {'deleted_snapshot': deleted_snapshot} + _build_audit_log(target, 'delete', module, details, target_id, target_name) + + +def after_insert_listener(mapper, connection, target): + """ + SQLAlchemy after_insert 监听器(可选) + 记录新增操作 + """ + if not _is_audit_model(mapper): + return + + inspector = inspect(target.__class__) + target_id, target_name = _extract_unique_key(target, inspector) + module = _get_module_name(mapper) + + # 序列化新记录 + new_snapshot = {} + for col in inspector.columns: + try: + value = getattr(target, col.name, None) + new_snapshot[col.name] = _serialize_value(value) + except Exception: + new_snapshot[col.name] = '[无法序列化]' + + details = {'created': new_snapshot} + _build_audit_log(target, 'create', module, details, target_id, target_name) + + +# ============================================================ +# 注册函数 +# ============================================================ + +def register_audit_listeners(db): + """ + 注册审计监听器到所有需要的模型 + """ + # 获取所有已映射的模型 + from app.models import ( + MaterialBase, MaterialWarningSetting, + StockBuy, StockSemi, StockProduct, StockService, + RepairRecord, TransOutbound, TransBorrow, TransReturn, + BomTable, StockTake, StockAdjust, + TransScrap, SysUser + ) + + # 需要审计的模型列表 + audit_models = [ + MaterialBase, MaterialWarningSetting, + StockBuy, StockSemi, StockProduct, StockService, + RepairRecord, TransOutbound, TransBorrow, TransReturn, + BomTable, StockTake, StockAdjust, + TransScrap, SysUser + ] + + # 过滤掉不存在的模型 + audit_models = [m for m in audit_models if m is not None] + + for model in audit_models: + try: + # 绑定 before_update + event.listen( + model, 'before_update', before_update_listener, + propagate=True + ) + # 绑定 before_delete + event.listen( + model, 'before_delete', before_delete_listener, + propagate=True + ) + # 绑定 after_insert(记录新增) + event.listen( + model, 'after_insert', after_insert_listener, + propagate=True + ) + current_app.logger.info(f'[审计] 已绑定模型: {model.__name__}') + except Exception as e: + current_app.logger.warning(f'[审计] 绑定模型 {model.__name__} 失败: {e}') + + return len(audit_models) + + +def unregister_audit_listeners(db): + """ + 注销审计监听器(用于测试) + """ + from app.models import ( + MaterialBase, MaterialWarningSetting, + StockBuy, StockSemi, StockProduct, StockService, + RepairRecord, TransOutbound, TransBorrow, TransReturn, + BomTable, StockTake, StockAdjust, + TransScrap, SysUser + ) + + audit_models = [ + MaterialBase, MaterialWarningSetting, + StockBuy, StockSemi, StockProduct, StockService, + RepairRecord, TransOutbound, TransBorrow, TransReturn, + BomTable, StockTake, StockAdjust, + TransScrap, SysUser + ] + audit_models = [m for m in audit_models if m is not None] + + for model in audit_models: + try: + event.remove(model, 'before_update', before_update_listener) + event.remove(model, 'before_delete', before_delete_listener) + event.remove(model, 'after_insert', after_insert_listener) + except Exception: + pass \ No newline at end of file diff --git a/inventory-backend/app/utils/decorators.py b/inventory-backend/app/utils/decorators.py index 995b3fd..eff730d 100644 --- a/inventory-backend/app/utils/decorators.py +++ b/inventory-backend/app/utils/decorators.py @@ -301,6 +301,22 @@ def audit_log(module: str, action: str = None, get_target_id_fn=None, get_target # 默认:记录请求 Payload details = {'payload': filtered_payload} + # ★ 关键:检查是否已被底层监听器处理 + # 如果底层已生成高级日志(changes/deleted_snapshot),则跳过简单的 payload 日志 + from flask import g, has_request_context + if has_request_context() and hasattr(g, 'audit_handled_ids') and g.audit_handled_ids: + # 构建检查 key: "module:target_id" + check_key = f"{module}:{target_id}" if target_id else None + if check_key and check_key in g.audit_handled_ids: + # 底层已处理,跳过装饰器日志 + current_app.logger.info(f'[审计] 底层监听器已处理 {check_key},跳过装饰器') + return response + # 也检查 target_name 是否匹配 + for key in g.audit_handled_ids: + if target_name and target_name in key: + current_app.logger.info(f'[审计] 底层监听器已处理 {key},跳过装饰器') + return response + # 保存日志 log_entry = AuditLog( user_id=user_id,