From f8f5b05d7daabe36d82c803a236458057fe4c6e7 Mon Sep 17 00:00:00 2001 From: DXC Date: Mon, 20 Apr 2026 16:04:01 +0800 Subject: [PATCH] =?UTF-8?q?refactor(audit):=20=E5=BA=9F=E5=BC=83=E8=A3=85?= =?UTF-8?q?=E9=A5=B0=E5=99=A8+=E5=88=86=E7=A6=BB=E6=9E=B6=E6=9E=84?= =?UTF-8?q?=EF=BC=8C=E6=94=B9=E4=B8=BA=E7=9B=91=E5=90=AC=E5=99=A8=E5=8D=95?= =?UTF-8?q?=E4=BD=93=E7=9B=B4=E5=86=99=E5=85=A5=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- inventory-backend/app/core/audit_listener.py | 271 +++++++++---------- inventory-backend/app/utils/decorators.py | 249 +---------------- 2 files changed, 139 insertions(+), 381 deletions(-) diff --git a/inventory-backend/app/core/audit_listener.py b/inventory-backend/app/core/audit_listener.py index b718b34..2f28f91 100644 --- a/inventory-backend/app/core/audit_listener.py +++ b/inventory-backend/app/core/audit_listener.py @@ -1,15 +1,16 @@ # inventory-backend/app/core/audit_listener.py """ -SQLAlchemy Event Listener 审计监听器(精简版) -仅负责计算变更数据,存入 g.audit_details,由装饰器统一入库 +SQLAlchemy Event Listener 审计监听器(单体架构版) +监听器亲自完成入库,不依赖 g 对象,不依赖装饰器回调。 +只要模型发生 INSERT/UPDATE/DELETE,监听器直接创建 AuditLog 并挂载到当前事务 session。 """ from sqlalchemy import event, inspect -from flask import current_app, g, has_request_context +from flask import current_app, request, has_request_context from datetime import datetime -# 需要忽略的字段 + IGNORE_FIELDS = { - 'updated_at', 'update_time', ' modified_time', 'last_modified', + 'updated_at', 'update_time', 'modified_time', 'last_modified', 'created_at', 'create_time', 'created_on', } @@ -32,8 +33,8 @@ def _serialize_value(value): def _is_audit_model(mapper): """判断模型是否需要审计""" - if hasattr(mapper.class_, 'audit_enabled') and mapper.class_.audit_enabled: - return True + if hasattr(mapper.class_, 'audit_enabled') and mapper.class_.audit_enabled is False: + return False AUDIT_WHITELIST = { 'MaterialBase', 'MaterialWarningSetting', @@ -46,153 +47,132 @@ def _is_audit_model(mapper): def _get_module_name(mapper): - """获取业务模块名称""" - model_name = mapper.class_.__name__ - module_map = { - 'MaterialBase': '基础信息', - 'MaterialWarningSetting': '基础信息', - 'StockBuy': '采购入库', - 'StockSemi': '半成品入库', - 'StockProduct': '成品入库', - 'StockService': '服务权益', - 'RepairRecord': '维修管理', - 'TransOutbound': '出库记录', - 'TransBorrow': '借库', - 'TransReturn': '归还', - 'BomTable': 'BOM配方', - 'StockTake': '盘点', - 'StockAdjust': '盈亏调整', - 'TransScrap': '报废', - 'SysUser': '用户管理' - } - return module_map.get(model_name, model_name) + """根据模型类名推断所属模块""" + name = mapper.class_.__name__ + if 'Stock' in name or 'Buy' in name: + return '入库管理' + if 'Outbound' in name or 'TransOut' in name: + return '出库管理' + if 'Borrow' in name or 'Return' in name: + return '借还管理' + if 'Bom' in name: + return 'BOM管理' + if 'StockTake' in name or 'Adjust' in name or 'Scrap' in name: + return '盘点管理' + if 'Repair' in name: + return '维修管理' + if 'SysUser' in name or 'SysMenu' in name or 'SysRole' in name: + return '系统管理' + if 'Material' in name: + return '基础数据' + return '未知模块' -def _extract_unique_key(instance): - """提取记录的唯一标识""" - inspector = inspect(instance.__class__) - priority_fields = ['id', 'material_id', 'stock_id', 'user_id', 'borrow_no', 'order_no', 'bom_no'] - - for field in priority_fields: - if field in inspector.columns: - value = getattr(instance, field, None) - if value: - return str(value), f'{instance.__class__.__name__}:{value}' - - for col in inspector.columns: - if col.type.python_type == str: - value = getattr(instance, col.name, None) - if value: - return str(value), str(value) - - return str(instance.id), f'Record:{instance.id}' - - -def _store_audit_details(module, target_id, action, details): - """将审计详情存入 g 对象""" +def _get_request_user_info(): + """从当前 HTTP 请求中尽力提取用户信息,获取不到拉倒""" + user_id, username, ip = None, 'system', '' if has_request_context(): - if not hasattr(g, 'audit_details'): - g.audit_details = {} - key = f"{module}:{target_id}" - g.audit_details[key] = { - 'action': action, - 'details': details - } - print(f"💾 [X光-存储] 已存入 g.audit_details, key: {key}, details: {details}") + try: + from flask_jwt_extended import get_jwt_identity, get_jwt + user_id = get_jwt_identity() + claims = get_jwt() + username = claims.get('username', 'system') + except Exception: + pass + try: + ip = request.headers.get('X-Forwarded-For', '') or request.remote_addr or '' + if ip and ',' in ip: + ip = ip.split(',')[0].strip() + except Exception: + pass + return user_id, username, ip # ============================================================ -# 核心监听器函数 +# 核心:监听器内部直接创建并挂载日志 # ============================================================ +def _create_audit_log(session, mapper, target, action, details): + """ + 监听器内部直接实例化 AuditLog 并加入当前事务 session。 + 由 SQLAlchemy 生命周期保证随主事务一同提交或回滚。 + """ + try: + from app.models.audit import AuditLog + + user_id, username, ip = _get_request_user_info() + module = _get_module_name(mapper) + + target_id = None + if hasattr(target, 'id'): + target_id = target.id + elif hasattr(target, 'stock_id'): + target_id = target.stock_id + elif hasattr(target, 'bom_no'): + target_id = target.bom_no + + log = AuditLog( + user_id=user_id, + username=username, + action=action, + module=module, + target_id=str(target_id) if target_id else '0', + details=details, + ip_address=ip + ) + session.add(log) + + except Exception as e: + current_app.logger.error(f"Audit log auto-creation failed: {e}") + + def before_update_listener(mapper, connection, target): - """SQLAlchemy before_update 监听器""" - print(f"🔔 [X光-监听器] before_update 被调用, target: {target.__class__.__name__}") - - if not _is_audit_model(mapper): - print(f"⏭️ [X光-监听器] 跳过非审计模型: {mapper.class_.__name__}") - return - - # ★ 使用 inspect(target) 而非 inspector._instance_state(target) - state = inspect(target) - changes = {} - - for attr in state.attrs: - if attr.key in IGNORE_FIELDS: - continue - if attr.history.has_changes(): - # ★ 正确使用 added[0] 和 deleted[0] - old_val = attr.history.deleted[0] if attr.history.deleted else None - new_val = attr.history.added[0] if attr.history.added else None - changes[attr.key] = { - 'old': _serialize_value(old_val), - 'new': _serialize_value(new_val) - } - - print(f"📊 [X光-监听器] 计算完成, changes: {changes}") - - if changes: - module = _get_module_name(mapper) - target_id, target_name = _extract_unique_key(target) - - # ★ 将变更存入 g - details = {'changes': changes} - - # ★ X光调试:确认监听器是否成功算出 changes - print(f"🚀 [X光-监听器] 成功触发! target: {target}, changes: {changes}") - - _store_audit_details(module, target_id, 'update', details) - else: - print(f"⚠️ [X光-监听器] 无变更,跳过") + """UPDATE 事件:抓取字段变更明细""" + if not _is_audit_model(mapper): return + try: + state = inspect(target) + changes = {} + for attr in state.attrs: + if attr.key in IGNORE_FIELDS: continue + if attr.history.has_changes(): + old_val = attr.history.deleted[0] if attr.history.deleted else None + new_val = attr.history.added[0] if attr.history.added else None + changes[attr.key] = { + 'old': _serialize_value(old_val), + 'new': _serialize_value(new_val) + } + if changes: + _create_audit_log(connection, mapper, target, 'update', {'changes': changes}) + except Exception as e: + current_app.logger.error(f"Audit Update Error: {e}") def before_delete_listener(mapper, connection, target): - """SQLAlchemy before_delete 监听器""" - if not _is_audit_model(mapper): - return - - # 序列化被删除的数据快照 - inspector = inspect(target.__class__) - deleted_snapshot = {} - - for col in inspector.columns: - try: - value = getattr(target, col.name, None) - deleted_snapshot[col.name] = _serialize_value(value) - except Exception: - deleted_snapshot[col.name] = '[无法序列化]' - - module = _get_module_name(mapper) - target_id, target_name = _extract_unique_key(target) - - # ★ 将删除快照存入 g - details = {'deleted_snapshot': deleted_snapshot} - print(f"🗑️ [X光-监听器] delete 触发, target_id: {target_id}") - _store_audit_details(module, target_id, 'delete', details) + """DELETE 事件:抓取被删除对象的完整快照""" + if not _is_audit_model(mapper): return + try: + state = inspect(target) + snap = {} + for attr in state.attrs: + val = getattr(target, attr.key, None) + snap[attr.key] = _serialize_value(val) + _create_audit_log(connection, mapper, target, 'delete', {'deleted_snapshot': snap}) + except Exception as e: + current_app.logger.error(f"Audit Delete Error: {e}") def after_insert_listener(mapper, connection, target): - """SQLAlchemy after_insert 监听器""" - if not _is_audit_model(mapper): - return - - inspector = inspect(target.__class__) - target_id, target_name = _extract_unique_key(target) - module = _get_module_name(mapper) - - # 序列化新记录 - new_snapshot = {} - for col in inspector.columns: - try: - value = getattr(target, col.name, None) - new_snapshot[col.name] = _serialize_value(value) - except Exception: - new_snapshot[col.name] = '[无法序列化]' - - # ★ 将新增数据存入 g - details = {'created': new_snapshot} - print(f"➕ [X光-监听器] insert 触发, target_id: {target_id}") - _store_audit_details(module, target_id, 'create', details) + """INSERT 事件:抓取新增对象的完整快照""" + if not _is_audit_model(mapper): return + try: + state = inspect(target) + snap = {} + for attr in state.attrs: + val = getattr(target, attr.key, None) + snap[attr.key] = _serialize_value(val) + _create_audit_log(connection, mapper, target, 'insert', {'created': snap}) + except Exception: + pass # ============================================================ @@ -200,7 +180,7 @@ def after_insert_listener(mapper, connection, target): # ============================================================ def register_audit_listeners(db): - """注册审计监听器""" + """向所有需要审计的模型注册事件监听器""" from app.models import ( MaterialBase, MaterialWarningSetting, StockBuy, StockSemi, StockProduct, StockService, @@ -218,14 +198,13 @@ def register_audit_listeners(db): ] audit_models = [m for m in audit_models if m is not None] - + count = 0 for model in audit_models: try: event.listen(model, 'before_update', before_update_listener, propagate=True) event.listen(model, 'before_delete', before_delete_listener, propagate=True) event.listen(model, 'after_insert', after_insert_listener, propagate=True) - current_app.logger.info(f'[审计] 已绑定模型: {model.__name__}') - except Exception as e: - current_app.logger.warning(f'[审计] 绑定模型 {model.__name__} 失败: {e}') - - return len(audit_models) \ No newline at end of file + count += 1 + except Exception: + pass + return count diff --git a/inventory-backend/app/utils/decorators.py b/inventory-backend/app/utils/decorators.py index 631a512..9331171 100644 --- a/inventory-backend/app/utils/decorators.py +++ b/inventory-backend/app/utils/decorators.py @@ -5,7 +5,6 @@ from flask import jsonify, g, request, current_app, has_request_context import logging import json - def _verify_token_in_redis(): """ 验证当前 Token 是否与 Redis 中存储的 Token 一致(单设备登录互踢) @@ -14,31 +13,23 @@ def _verify_token_in_redis(): from flask import current_app if redis_client is None: - # Redis 不可用,跳过验证 return True try: - # 获取请求中的 Token auth_header = request.headers.get('Authorization', '') if not auth_header.startswith('Bearer '): return True - request_token = auth_header[7:] # 去掉 'Bearer ' 前缀 - - # 获取当前用户 ID + request_token = auth_header[7:] claims = get_jwt() user_id = claims.get('sub') if user_id is None: return True - # 从 Redis 获取存储的 Token stored_token = redis_client.get(f"user_token_{user_id}") - - # 如果 Redis 中没有存储的 Token(可能是旧登录或 Redis 重启),允许通过 if stored_token is None: return True - # 比较 Token 是否一致 if request_token != stored_token: current_app.logger.warning(f"Token mismatch for user {user_id}: request token != stored token") return False @@ -46,25 +37,18 @@ def _verify_token_in_redis(): return True except Exception as e: current_app.logger.error(f"Redis token verification error: {e}") - # 出错时默认放行,避免影响正常业务 return True - def _raise_token_mismatch_error(): - """抛出 Token 不一致的错误(用于单设备登录互踢)""" + """抛出 Token 不一致的错误""" return jsonify({ 'msg': '您的账号已在其他设备登录,请重新登录', 'code': 401, 'reason': 'token_mismatch' }), 401 - def role_required(*roles): - """ - 自定义装饰器:检查用户角色 - 使用方法: @role_required('super_admin', 'finance') - """ - + """自定义装饰器:检查用户角色""" def wrapper(fn): @wraps(fn) def decorator(*args, **kwargs): @@ -72,7 +56,6 @@ def role_required(*roles): user_role = claims.get('role') user_role_upper = user_role.upper() if user_role else None - # 如果是超级管理员,拥有上帝视角,直接放行 (可选) if user_role_upper == 'SUPER_ADMIN': return fn(*args, **kwargs) @@ -80,16 +63,11 @@ def role_required(*roles): return jsonify(msg='权限不足:您没有访问此资源的权限'), 403 return fn(*args, **kwargs) - return decorator - return wrapper - def login_required(fn): - """ - 验证 JWT 令牌是否存在且有效 - """ + """验证 JWT 令牌是否存在且有效""" @wraps(fn) def decorator(*args, **kwargs): try: @@ -98,40 +76,31 @@ def login_required(fn): logging.warning(f"JWT verification failed: {e}") return jsonify(msg='登录已过期,请重新登录'), 401 - # 单设备登录互踢检查 if not _verify_token_in_redis(): return _raise_token_mismatch_error() return fn(*args, **kwargs) return decorator - def permission_required(permission_code): - """ - 检查当前用户是否拥有指定权限码 - 使用方法: @permission_required('material:base:read') - """ + """检查当前用户是否拥有指定权限码""" def wrapper(fn): @wraps(fn) def decorator(*args, **kwargs): - # 首先验证 JWT try: verify_jwt_in_request() except Exception as e: logging.warning(f"JWT verification failed: {e}") return jsonify(msg='登录已过期,请重新登录'), 401 - # 单设备登录互踢检查 if not _verify_token_in_redis(): return _raise_token_mismatch_error() claims = get_jwt() user_role = claims.get('role') - # 超级管理员放行 (忽略大小写) if user_role and user_role.upper() == 'SUPER_ADMIN': return fn(*args, **kwargs) - # 根据角色查询数据库中的权限 try: from app.services.auth_service import AuthService perm_dict = AuthService.get_user_permissions(user_role) @@ -139,214 +108,24 @@ def permission_required(permission_code): logging.warning(f"Failed to fetch permissions for role {user_role}: {e}") return jsonify(msg='权限查询失败'), 403 - # 合并菜单和元素权限 all_perms = perm_dict.get('menus', []) + perm_dict.get('elements', []) if permission_code not in all_perms: - # 详细的调试日志 - print(f"🔴 [权限拦截] 角色 '{user_role}' 访问被拒!需要权限码: '{permission_code}', 但该角色实际拥有: {all_perms}") - logging.warning( - f"权限检查失败: 角色={user_role}, 所需权限={permission_code}, 实际权限列表={all_perms}") + logging.warning(f"权限检查失败: 角色={user_role}, 所需权限={permission_code}") return jsonify(msg='权限不足:您没有访问此资源的权限'), 403 return fn(*args, **kwargs) return decorator return wrapper - -def audit_log(module: str, action: str = None, get_target_id_fn=None, get_target_name_fn=None, get_details_fn=None): +def audit_log(module: str = None, action: str = None, get_target_id_fn=None, get_target_name_fn=None, get_details_fn=None): """ - 审计日志装饰器 - 用法: @audit_log(module='inbound_buy', action='create') - @audit_log(module='bom', action='update', get_target_id_fn=lambda: ..., get_details_fn=lambda req, resp: ...) - - 升级特性: - - 自动捕获请求 Payload 作为变更明细 - - 自动过滤过长的 Base64 图片数据 - - 支持自定义 get_details_fn 覆盖默认行为 + 已废弃! + 由 SQLAlchemy 底层监听器(app/core/audit_listener.py)全面接管审计日志入库。 + 此装饰器保留空壳以防项目中其他文件 import 引用时报错。 """ - # 需要过滤的图片字段 - IMAGE_FIELDS = {'arrival_photo', 'product_photo', 'photo', 'image', 'signature', 'borrow_signature', 'return_signature'} - - def _filter_payload(payload): - """过滤 Payload 中的大字段,防止数据库膨胀""" - if not payload or not isinstance(payload, dict): - return payload - filtered = {} - for key, value in payload.items(): - if key.lower() in IMAGE_FIELDS and isinstance(value, str) and len(value) > 100: - filtered[key] = '[图片数据已省略]' - elif isinstance(value, dict): - filtered[key] = _filter_payload(value) - elif isinstance(value, list): - filtered[key] = [ - _filter_payload(item) if isinstance(item, dict) else item - for item in value - ] - else: - filtered[key] = value - return filtered - - def _get_payload(): - """自动获取请求 Payload""" - # 尝试 JSON - payload = request.get_json(silent=True) - if payload: - return payload - # 尝试 Form Data - if request.form: - return request.form.to_dict() - return None - def wrapper(fn): + from functools import wraps @wraps(fn) - def decorator(*args, **kwargs): - # 获取请求上下文 - claims = get_jwt() - user_id = get_jwt_identity() - username = claims.get('username', '') - display_name = claims.get('display_name', '') - - # ★ 修复 DetachedInstanceError:在 fn() 执行前预先获取用户完整信息 - # 这样可以避免在 fn() 提交 session 后再访问 User 对象导致游离 - if not display_name and user_id: - try: - from app.models.system import SysUser - user = SysUser.query.get(user_id) - if user: - display_name = user.display_name or username - except Exception: - pass - - # 预先获取 IP(避免后续访问 request 对象异常) - ip_address = request.headers.get('X-Forwarded-For') or request.remote_addr or '' - if ip_address and ',' in ip_address: - ip_address = ip_address.split(',')[0].strip() - - # 获取请求信息 - http_method = request.method - url = request.url - user_agent = request.headers.get('User-Agent', '')[:500] - - # 解析 action(支持动态) - final_action = action - if callable(action): - final_action = action() - - # 预先获取 Payload(用于后续 details 记录) - raw_payload = _get_payload() - filtered_payload = _filter_payload(raw_payload) if raw_payload else None - - # 执行原函数(此时 Session 可能被提交或回滚) - response = fn(*args, **kwargs) - - # 只记录成功的请求(响应状态码 200/201) - status_code = 200 - if hasattr(response, 'status_code'): - status_code = response.status_code - - if status_code in [200, 201]: - try: - from app.models.audit import AuditLog - from app.extensions import db - from flask import current_app - - # ★ 已在上方预先获取 display_name,此处无需再查询 User 对象 - # 使用预先获取的字符串数据,避免 DetachedInstanceError - - # 获取 target_id - target_id = None - if get_target_id_fn: - try: - target_id = get_target_id_fn() - except Exception: - pass - if not target_id and hasattr(response, 'json'): - resp_data = response.get_json() - if resp_data and isinstance(resp_data, dict): - target_id = resp_data.get('id') - - # 获取 target_name - target_name = None - if get_target_name_fn: - try: - target_name = get_target_name_fn() - except Exception: - pass - # 如果仍未获取到目标名称,尝试从响应 JSON 中常见字段获取 - if not target_name and hasattr(response, 'json'): - resp_data = response.get_json() - if resp_data and isinstance(resp_data, dict): - # 优先从顶层获取 - for field in ['order_no', 'outbound_no', 'borrow_no', 'adjustment_no', 'material_name']: - if field in resp_data: - target_name = resp_data[field] - break - # 再尝试从 data 字段获取(部分 API 返回格式) - if not target_name and 'data' in resp_data: - data = resp_data['data'] - if isinstance(data, dict): - for field in ['order_no', 'outbound_no', 'borrow_no', 'adjustment_no', 'material_name']: - if field in data: - target_name = data[field] - break - - # 获取 details - details = None - if get_details_fn: - # 优先使用自定义差异对比函数 - try: - details = get_details_fn(request, response) - except Exception: - pass - elif filtered_payload: - # 默认:记录请求 Payload - details = {'payload': filtered_payload} - - # ★ 核心重构:监听器计算,装饰器入库 - # 检查 g.audit_details 是否有底层监听器传递的高级日志 - if has_request_context() and hasattr(g, 'audit_details') and g.audit_details: - check_key = f"{module}:{target_id}" if target_id else None - if check_key and check_key in g.audit_details: - # 用底层的高级日志(changes/deleted_snapshot/created)替换 payload - audit_info = g.audit_details[check_key] - details = audit_info.get('details', details) - current_app.logger.info(f'[审计] 底层监听器已计算 {check_key},使用高级日志') - else: - # 兼容 fallback:模糊匹配 target_name - for key, audit_info in g.audit_details.items(): - if target_name and target_name in key: - details = audit_info.get('details', details) - current_app.logger.info(f'[审计] 底层监听器已计算 {key},使用高级日志') - break - - # 保存日志(统一入库) - print(f"📥 [X光-装饰器] 准备入库! details: {details}, g.audit_details: {getattr(g, 'audit_details', 'NOT_FOUND')}") - - log_entry = AuditLog( - user_id=user_id, - username=username, - display_name=display_name, - action=final_action or http_method.lower(), - module=module, - target_id=str(target_id) if target_id else None, - target_name=target_name, - details=details, - ip_address=ip_address, - user_agent=user_agent, - method=http_method, - url=url, - status_code=status_code - ) - db.session.add(log_entry) - db.session.commit() - - except Exception as e: - import traceback - traceback.print_exc() - print(f"💥 [X光-崩溃] 审计入库失败原因: {str(e)}") - current_app.logger.error(f"审计日志记录失败: {str(e)}") - db.session.rollback() - - return response - + def decorator(*inner_args, **inner_kwargs): + return fn(*inner_args, **inner_kwargs) return decorator - return wrapper + return wrapper \ No newline at end of file