From 1c8def7e6f433bf1c821f7bd698d1475e077d2ea Mon Sep 17 00:00:00 2001 From: DXC Date: Mon, 20 Apr 2026 14:41:40 +0800 Subject: [PATCH] =?UTF-8?q?refactor(audit):=20=E5=88=86=E7=A6=BB=E6=9E=B6?= =?UTF-8?q?=E6=9E=84-=E7=9B=91=E5=90=AC=E5=99=A8=E8=AE=A1=E7=AE=97?= =?UTF-8?q?=E8=A3=85=E9=A5=B0=E5=99=A8=E5=85=A5=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- inventory-backend/app/core/audit_listener.py | 236 ++++--------------- inventory-backend/app/utils/decorators.py | 31 +-- 2 files changed, 57 insertions(+), 210 deletions(-) diff --git a/inventory-backend/app/core/audit_listener.py b/inventory-backend/app/core/audit_listener.py index 6867fec..8673f20 100644 --- a/inventory-backend/app/core/audit_listener.py +++ b/inventory-backend/app/core/audit_listener.py @@ -1,68 +1,21 @@ # inventory-backend/app/core/audit_listener.py """ -SQLAlchemy Event Listener 审计监听器 -基于 SQLAlchemy before_update / before_delete 事件,自动记录数据变更明细 - -用法: - from app.core.audit_listener import register_audit_listeners - from app.extensions import db - register_audit_listeners(db) +SQLAlchemy Event Listener 审计监听器(精简版) +仅负责计算变更数据,存入 g.audit_details,由装饰器统一入库 """ from sqlalchemy import event, inspect -from flask import current_app, g, has_request_context, request +from flask import current_app, g, has_request_context from datetime import datetime -import json -# 需要忽略的字段(系统自动管理,无业务意义) +# 需要忽略的字段 IGNORE_FIELDS = { 'updated_at', 'update_time', ' modified_time', 'last_modified', 'created_at', 'create_time', 'created_on', } -def _get_current_user(): - """ - 获取当前操作人信息 - 兼容两种场景: - 1. HTTP 请求上下文(从 g 或 request 获取) - 2. 系统定时任务(无请求上下文,返回 system) - """ - if has_request_context(): - # 优先从 g 获取(如果视图函数已设置) - user_id = getattr(g, 'user_id', None) - username = getattr(g, 'username', None) - display_name = getattr(g, 'display_name', None) - - if not user_id: - # 尝试从 JWT 获取 - from flask_jwt_extended import get_jwt_identity, get_jwt - try: - user_id = get_jwt_identity() - claims = get_jwt() - username = claims.get('username', '') - display_name = claims.get('display_name', username) - except Exception: - pass - - if user_id: - return user_id, username or '', display_name or username or '' - - # 系统定时任务或脚本 - return None, 'system', '系统任务' - - -def _get_client_ip(): - """获取客户端 IP""" - if has_request_context(): - return request.headers.get('X-Forwarded-For') or request.remote_addr or '' - return '' - - def _serialize_value(value): - """ - 序列化值确保 JSON 兼容 - 处理 datetime, date, bytes 等类型 - """ + """序列化值确保 JSON 兼容""" if value is None: return None if isinstance(value, datetime): @@ -72,22 +25,16 @@ def _serialize_value(value): return value.decode('utf-8') except Exception: return '[二进制数据]' - # 处理 SQLAlchemy 未加载实例 if hasattr(value, '__class__') and value.__class__.__name__ in ('InstanceState', 'LazyLoader'): return str(value) return value def _is_audit_model(mapper): - """ - 判断模型是否需要审计 - 检查模型是否有 audit_enabled 属性或是否在白名单中 - """ - # 方案1:检查模型属性 + """判断模型是否需要审计""" if hasattr(mapper.class_, 'audit_enabled') and mapper.class_.audit_enabled: return True - # 方案2:白名单 AUDIT_WHITELIST = { 'MaterialBase', 'MaterialWarningSetting', 'StockBuy', 'StockSemi', 'StockProduct', 'StockService', @@ -99,9 +46,7 @@ def _is_audit_model(mapper): def _get_module_name(mapper): - """ - 获取业务模块名称 - """ + """获取业务模块名称""" model_name = mapper.class_.__name__ module_map = { 'MaterialBase': '基础信息', @@ -123,12 +68,9 @@ def _get_module_name(mapper): return module_map.get(model_name, model_name) -def _extract_unique_key(instance, inspector): - """ - 提取记录的唯一标识(用于 target_id 和 target_name) - 优先使用:id, material_id, stock_id, user_id 等 - """ - # 优先字段列表 +def _extract_unique_key(instance): + """提取记录的唯一标识""" + inspector = inspect(instance.__class__) priority_fields = ['id', 'material_id', 'stock_id', 'user_id', 'borrow_no', 'order_no', 'bom_no'] for field in priority_fields: @@ -137,7 +79,6 @@ def _extract_unique_key(instance, inspector): if value: return str(value), f'{instance.__class__.__name__}:{value}' - # 回退:使用所有非空字符串字段 for col in inspector.columns: if col.type.python_type == str: value = getattr(instance, col.name, None) @@ -147,52 +88,16 @@ def _extract_unique_key(instance, inspector): return str(instance.id), f'Record:{instance.id}' -def _build_audit_log(instance, action, module, details, target_id=None, target_name=None): - """ - 构建并保存审计日志 - 自动处理异常,不影响主事务 - """ - try: - from app.models.audit import AuditLog - from app.extensions import db - - user_id, username, display_name = _get_current_user() - - # 如果没有获取到 target_id/target_name,尝试从实例提取 - if not target_id or not target_name: - inspector = inspect(instance.__class__) - tid, tname = _extract_unique_key(instance, inspector) - target_id = target_id or tid - target_name = target_name or tname - - # ★ 关键:在 g 对象上打标记,告知装饰器已处理 - if target_id and has_request_context(): - if not hasattr(g, 'audit_handled_ids'): - g.audit_handled_ids = {} - # key: "module:target_id", value: action type - key = f"{module}:{target_id}" - g.audit_handled_ids[key] = action - - log_entry = AuditLog( - user_id=user_id, - username=username or 'system', - display_name=display_name or '系统任务', - action=action, - module=module, - target_id=target_id, - target_name=target_name, - details=details, - ip_address=_get_client_ip(), - method='UPDATE' if action == 'update' else 'DELETE', - url=f'model:{instance.__class__.__tablename__}', - status_code=200 - ) - db.session.add(log_entry) - # 注意:此处不 commit,由外层事务统一管理 - - except Exception as e: - # 审计失败不应影响主业务,但需要记录日志 - current_app.logger.error(f'[审计监听] 构建日志失败: {str(e)}') +def _store_audit_details(module, target_id, action, details): + """将审计详情存入 g 对象""" + if has_request_context(): + if not hasattr(g, 'audit_details'): + g.audit_details = {} + key = f"{module}:{target_id}" + g.audit_details[key] = { + 'action': action, + 'details': details + } # ============================================================ @@ -200,24 +105,19 @@ def _build_audit_log(instance, action, module, details, target_id=None, target_n # ============================================================ def before_update_listener(mapper, connection, target): - """ - SQLAlchemy before_update 监听器 - 触发时机:UPDATE 语句执行前 - """ - # 1. 判断是否需要审计 + """SQLAlchemy before_update 监听器""" if not _is_audit_model(mapper): return - # 2. 检查是否有字段变更 - inspector = inspect(target.__class__) - state = inspector._instance_state(target) + # ★ 使用 inspect(target) 而非 inspector._instance_state(target) + state = inspect(target) changes = {} for attr in state.attrs: if attr.key in IGNORE_FIELDS: continue if attr.history.has_changes(): - # ★ 修复:使用 added[0] 和 deleted[0] + # ★ 正确使用 added[0] 和 deleted[0] old_val = attr.history.deleted[0] if attr.history.deleted else None new_val = attr.history.added[0] if attr.history.added else None changes[attr.key] = { @@ -225,26 +125,21 @@ def before_update_listener(mapper, connection, target): 'new': _serialize_value(new_val) } - # 3. 如果有变更,记录审计日志 if changes: module = _get_module_name(mapper) - # 提取 target_id/target_name - target_id, target_name = _extract_unique_key(target, inspector) - + target_id, target_name = _extract_unique_key(target) + + # ★ 将变更存入 g details = {'changes': changes} - _build_audit_log(target, 'update', module, details, target_id, target_name) + _store_audit_details(module, target_id, 'update', details) def before_delete_listener(mapper, connection, target): - """ - SQLAlchemy before_delete 监听器 - 触发时机:DELETE 语句执行前 - """ - # 1. 判断是否需要审计 + """SQLAlchemy before_delete 监听器""" if not _is_audit_model(mapper): return - # 2. 序列化被删除的数据快照 + # 序列化被删除的数据快照 inspector = inspect(target.__class__) deleted_snapshot = {} @@ -255,24 +150,21 @@ def before_delete_listener(mapper, connection, target): except Exception: deleted_snapshot[col.name] = '[无法序列化]' - # 3. 记录审计日志 module = _get_module_name(mapper) - target_id, target_name = _extract_unique_key(target, inspector) + target_id, target_name = _extract_unique_key(target) + # ★ 将删除快照存入 g details = {'deleted_snapshot': deleted_snapshot} - _build_audit_log(target, 'delete', module, details, target_id, target_name) + _store_audit_details(module, target_id, 'delete', details) def after_insert_listener(mapper, connection, target): - """ - SQLAlchemy after_insert 监听器(可选) - 记录新增操作 - """ + """SQLAlchemy after_insert 监听器""" if not _is_audit_model(mapper): return inspector = inspect(target.__class__) - target_id, target_name = _extract_unique_key(target, inspector) + target_id, target_name = _extract_unique_key(target) module = _get_module_name(mapper) # 序列化新记录 @@ -284,8 +176,9 @@ def after_insert_listener(mapper, connection, target): except Exception: new_snapshot[col.name] = '[无法序列化]' + # ★ 将新增数据存入 g details = {'created': new_snapshot} - _build_audit_log(target, 'create', module, details, target_id, target_name) + _store_audit_details(module, target_id, 'create', details) # ============================================================ @@ -293,10 +186,7 @@ def after_insert_listener(mapper, connection, target): # ============================================================ def register_audit_listeners(db): - """ - 注册审计监听器到所有需要的模型 - """ - # 获取所有已映射的模型 + """注册审计监听器""" from app.models import ( MaterialBase, MaterialWarningSetting, StockBuy, StockSemi, StockProduct, StockService, @@ -305,7 +195,6 @@ def register_audit_listeners(db): TransScrap, SysUser ) - # 需要审计的模型列表 audit_models = [ MaterialBase, MaterialWarningSetting, StockBuy, StockSemi, StockProduct, StockService, @@ -314,58 +203,15 @@ def register_audit_listeners(db): TransScrap, SysUser ] - # 过滤掉不存在的模型 audit_models = [m for m in audit_models if m is not None] for model in audit_models: try: - # 绑定 before_update - event.listen( - model, 'before_update', before_update_listener, - propagate=True - ) - # 绑定 before_delete - event.listen( - model, 'before_delete', before_delete_listener, - propagate=True - ) - # 绑定 after_insert(记录新增) - event.listen( - model, 'after_insert', after_insert_listener, - propagate=True - ) + event.listen(model, 'before_update', before_update_listener, propagate=True) + event.listen(model, 'before_delete', before_delete_listener, propagate=True) + event.listen(model, 'after_insert', after_insert_listener, propagate=True) current_app.logger.info(f'[审计] 已绑定模型: {model.__name__}') except Exception as e: current_app.logger.warning(f'[审计] 绑定模型 {model.__name__} 失败: {e}') return len(audit_models) - - -def unregister_audit_listeners(db): - """ - 注销审计监听器(用于测试) - """ - from app.models import ( - MaterialBase, MaterialWarningSetting, - StockBuy, StockSemi, StockProduct, StockService, - RepairRecord, TransOutbound, TransBorrow, TransReturn, - BomTable, StockTake, StockAdjust, - TransScrap, SysUser - ) - - audit_models = [ - MaterialBase, MaterialWarningSetting, - StockBuy, StockSemi, StockProduct, StockService, - RepairRecord, TransOutbound, TransBorrow, TransReturn, - BomTable, StockTake, StockAdjust, - TransScrap, SysUser - ] - audit_models = [m for m in audit_models if m is not None] - - for model in audit_models: - try: - event.remove(model, 'before_update', before_update_listener) - event.remove(model, 'before_delete', before_delete_listener) - event.remove(model, 'after_insert', after_insert_listener) - except Exception: - pass \ No newline at end of file diff --git a/inventory-backend/app/utils/decorators.py b/inventory-backend/app/utils/decorators.py index eff730d..fade754 100644 --- a/inventory-backend/app/utils/decorators.py +++ b/inventory-backend/app/utils/decorators.py @@ -301,23 +301,24 @@ def audit_log(module: str, action: str = None, get_target_id_fn=None, get_target # 默认:记录请求 Payload details = {'payload': filtered_payload} - # ★ 关键:检查是否已被底层监听器处理 - # 如果底层已生成高级日志(changes/deleted_snapshot),则跳过简单的 payload 日志 - from flask import g, has_request_context - if has_request_context() and hasattr(g, 'audit_handled_ids') and g.audit_handled_ids: - # 构建检查 key: "module:target_id" + # ★ 核心重构:监听器计算,装饰器入库 + # 检查 g.audit_details 是否有底层监听器传递的高级日志 + if has_request_context() and hasattr(g, 'audit_details') and g.audit_details: check_key = f"{module}:{target_id}" if target_id else None - if check_key and check_key in g.audit_handled_ids: - # 底层已处理,跳过装饰器日志 - current_app.logger.info(f'[审计] 底层监听器已处理 {check_key},跳过装饰器') - return response - # 也检查 target_name 是否匹配 - for key in g.audit_handled_ids: - if target_name and target_name in key: - current_app.logger.info(f'[审计] 底层监听器已处理 {key},跳过装饰器') - return response + if check_key and check_key in g.audit_details: + # 用底层的高级日志(changes/deleted_snapshot/created)替换 payload + audit_info = g.audit_details[check_key] + details = audit_info.get('details', details) + current_app.logger.info(f'[审计] 底层监听器已计算 {check_key},使用高级日志') + else: + # 兼容 fallback:模糊匹配 target_name + for key, audit_info in g.audit_details.items(): + if target_name and target_name in key: + details = audit_info.get('details', details) + current_app.logger.info(f'[审计] 底层监听器已计算 {key},使用高级日志') + break - # 保存日志 + # 保存日志(统一入库) log_entry = AuditLog( user_id=user_id, username=username,