From 3cb31c2b67c781fc7b5227ccc239ed58a100c725 Mon Sep 17 00:00:00 2001 From: DXC Date: Mon, 18 May 2026 16:16:50 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20JWT=20=E5=B9=BD?= =?UTF-8?q?=E7=81=B5=E4=BB=A4=E7=89=8C=E6=BC=8F=E6=B4=9E=EF=BC=8C=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=20Dify=20=E6=9D=83=E9=99=90=E8=BF=87=E6=BB=A4?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- inventory-backend/app/extensions.py | 66 +++- .../app/services/auth_service.py | 22 +- .../app/services/dify_permission_service.py | 284 ++++++++++++++++++ inventory-backend/app/utils/decorators.py | 52 +++- inventory-web/index.html | 111 ++++--- 5 files changed, 481 insertions(+), 54 deletions(-) create mode 100644 inventory-backend/app/services/dify_permission_service.py diff --git a/inventory-backend/app/extensions.py b/inventory-backend/app/extensions.py index c2a9440..f9a3b40 100644 --- a/inventory-backend/app/extensions.py +++ b/inventory-backend/app/extensions.py @@ -2,6 +2,7 @@ from flask_sqlalchemy import SQLAlchemy from flask_migrate import Migrate from flask_cors import CORS from flask_jwt_extended import JWTManager # 确保引入了 JWTManager +from flask import current_app from datetime import datetime, timezone, timedelta import redis @@ -11,15 +12,78 @@ migrate = Migrate() cors = CORS() jwt = JWTManager() # 必须实例化 -# Redis 客户端 (单设备登录互踢用) +# Redis 客户端 (单设备登录互踢 + JWT Token 黑名单用) redis_client = None +# Redis Key 前缀 +_JWT_BLOCKED_USER_PREFIX = "jwt_blocked_user:" # 存储被删除/禁用的 user_id + def beijing_time(): """获取北京时间 (UTC+8),剥离时区信息以兼容数据库 naive DateTime 字段""" return datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None) +# ============================================================================= +# 全局 JWT Token 黑名单拦截器 +# 原理:Flask-JWT-Extended 在每次 @jwt_required() 验证时, +# 会自动触发 token_in_blocklist_loader 回调。 +# 若该回调返回 True(命中黑名单),请求直接被 401 拒绝,后续代码不会执行。 +# ============================================================================= +@jwt.token_in_blocklist_loader +def check_if_token_is_revoked(jwt_header, jwt_payload): + """ + 全局 JWT 黑名单检查:每次 @jwt_required() 调用时自动触发。 + 无论 AI(Dify)还是人类用户调用的接口,均受此拦截。 + + 检查逻辑: + 1. 通过 jwt_payload['sub'](user_id)查询 Redis 黑名单 + 2. 若 user_id 存在于黑名单 → 返回 True → 请求被 401 拒绝 + 3. 若 Redis 不可用(fail-open)→ 放行(不影响正常业务) + """ + user_id = jwt_payload.get('sub') + if user_id is None: + return False + + global redis_client + if redis_client is None: + return False + + try: + blocked_key = f"{_JWT_BLOCKED_USER_PREFIX}{user_id}" + is_blocked = redis_client.exists(blocked_key) + if is_blocked: + current_app.logger.warning( + f"🚫 JWT revoked for deleted/disabled user: user_id={user_id}" + ) + return bool(is_blocked) + except Exception as e: + current_app.logger.error(f"JWT blocklist check error: {e}") + return False # Redis 出错时 fail-open,不阻断正常业务 + + +def revoke_all_tokens_for_user(user_id): + """ + 将指定用户的 ID 加入 JWT 黑名单(14 天)。 + 效果:该用户的所有已发放 Token(无论是否过期)瞬间失效。 + 由 delete_user() / update_user(status!='active') 时调用。 + """ + global redis_client + if redis_client is None: + current_app.logger.warning( + f"Redis unavailable, cannot revoke tokens for user_id={user_id}" + ) + return + + try: + blocked_key = f"{_JWT_BLOCKED_USER_PREFIX}{user_id}" + ttl_seconds = 14 * 24 * 3600 # 14 天,与 Refresh Token 有效期对齐 + redis_client.setex(blocked_key, ttl_seconds, "1") + current_app.logger.info(f"✅ User {user_id} added to JWT blocklist (TTL={ttl_seconds}s)") + except Exception as e: + current_app.logger.error(f"Failed to revoke tokens for user_id={user_id}: {e}") + + # 2. 定义初始化函数 (供工厂函数 create_app 调用) def init_extensions(app): """ diff --git a/inventory-backend/app/services/auth_service.py b/inventory-backend/app/services/auth_service.py index 82b97bc..66db6b5 100644 --- a/inventory-backend/app/services/auth_service.py +++ b/inventory-backend/app/services/auth_service.py @@ -1,6 +1,6 @@ # app/services/auth_service.py from app.models.system import SysUser, SysRolePermission # <== 引入 SysRolePermission -from app.extensions import db, redis_client +from app.extensions import db, redis_client, revoke_all_tokens_for_user from sqlalchemy import func from flask_jwt_extended import create_access_token, create_refresh_token, get_jwt_identity from flask import current_app @@ -334,7 +334,11 @@ class AuthService: user.email = email if 'status' in data: - user.status = data['status'] + new_status = data['status'] + # ★ 幽灵令牌漏洞修复:用户被禁用时,立即吊销其所有 Token + if new_status != 'active' and user.status == 'active': + revoke_all_tokens_for_user(user_id) + user.status = new_status new_password = data.get('password') if new_password and str(new_password).strip(): @@ -353,7 +357,7 @@ class AuthService: @staticmethod def delete_user(user_id, operator_role): - """删除用户""" + """删除用户:删除前自动吊销该用户所有 JWT Token""" # 标准化操作者角色为全大写 operator_role_upper = operator_role.upper() if operator_role else None if operator_role_upper != UserRole.SUPER_ADMIN: @@ -365,6 +369,18 @@ class AuthService: # 提前获取用户名用于审计日志 username = user.username + + # ★ 幽灵令牌漏洞修复:删除用户前,先将 user_id 加入 JWT 黑名单 + # 效果:该用户持有的所有 Token 瞬间失效,无论是否已过期 + revoke_all_tokens_for_user(user_id) + + # 清除 Redis 中的单设备登录 Token(防止残留) + if redis_client is not None: + try: + redis_client.delete(f"user_token_{user_id}") + except Exception as e: + current_app.logger.warning(f"Failed to delete user token from Redis: {e}") + db.session.delete(user) db.session.commit() return username diff --git a/inventory-backend/app/services/dify_permission_service.py b/inventory-backend/app/services/dify_permission_service.py new file mode 100644 index 0000000..7e888f5 --- /dev/null +++ b/inventory-backend/app/services/dify_permission_service.py @@ -0,0 +1,284 @@ +# app/services/dify_permission_service.py +""" +Dify 智能客服权限服务层 + +职责: +1. 从 JWT Token / g.dify_user_role 解析用户角色 +2. 根据角色查询 sys_role_permission 表,获取用户拥有的 target_code +3. AI 专属的动态脱敏策略: + - SUPER_ADMIN:无条件放行所有数据 + - SALES / INBOUND:剔除 price, cost, supplier 等敏感字段 + - 跨模块越权查询:直接阻断,返回角色专属的错误信息给大模型 +""" + +from flask import g, current_app +from flask_jwt_extended import decode_token +from app.models.system import SysRolePermission +from app.services.auth_service import AuthService +from app.utils.constants import UserRole +from sqlalchemy import func + + +# ============================================================================== +# 角色敏感字段定义(按角色黑名单) +# ============================================================================== +# 每个角色在 AI 对话场景下,禁止查看的字段集合 +ROLE_SENSITIVE_FIELDS = { + # 入库员:禁止查看采购相关金额 + 'INBOUND': frozenset([ + 'purchase_price', + 'cost', + 'price', + 'supplier', + 'supplier_id', + 'unit_price', + 'total_price', + 'order_price', + 'last_purchase_price', + 'avg_purchase_price', + 'supplier_name', + ]), + # 销售员:禁止查看成本/采购相关数据 + 'SALES': frozenset([ + 'purchase_price', + 'cost', + 'unit_cost', + 'supplier', + 'supplier_id', + 'unit_price', + 'last_purchase_price', + 'avg_purchase_price', + 'supplier_name', + 'stock_cost', + 'total_cost', + 'supply_price', + ]), + # 采购员:禁止查看销售毛利相关数据 + 'PURCHASER': frozenset([ + 'sale_price', + 'retail_price', + 'suggested_price', + 'margin', + 'profit', + 'profit_rate', + ]), +} + +# 跨模块越权查询拦截配置 +# key: 角色, value: { 尝试查询的字段: 给 AI 的回复 } +ROLE_FORBIDDEN_QUERIES = { + 'INBOUND': { + 'purchase_price': '抱歉,您当前的角色(入库员)无权查看采购金额数据。', + 'cost': '抱歉,您当前的角色(入库员)无权查看采购成本数据。', + 'supplier': '抱歉,您当前的角色(入库员)无权查看供应商数据。', + 'unit_price': '抱歉,您当前的角色(入库员)无权查看采购单价数据。', + 'last_purchase_price': '抱歉,您当前的角色(入库员)无权查看历史采购价数据。', + }, + 'SALES': { + 'purchase_price': '抱歉,您当前的角色(销售员)无权查看采购成本数据。', + 'cost': '抱歉,您当前的角色(销售员)无权查看成本数据。', + 'supplier': '抱歉,您当前的角色(销售员)无权查看供应商信息。', + 'unit_cost': '抱歉,您当前的角色(销售员)无权查看单位成本数据。', + 'last_purchase_price': '抱歉,您当前的角色(销售员)无权查看历史采购价数据。', + }, + 'PURCHASER': { + 'sale_price': '抱歉,您当前的角色(采购员)无权查看销售价格数据。', + 'retail_price': '抱歉,您当前的角色(采购员)无权查看零售价数据。', + 'margin': '抱歉,您当前的角色(采购员)无权查看毛利数据。', + 'profit': '抱歉,您当前的角色(采购员)无权查看利润数据。', + }, +} + + +class DifyPermissionService: + """Dify AI 专属权限服务""" + + @staticmethod + def get_user_role(token: str = None) -> str: + """ + 从 JWT Token 或 g.dify_user_role 解析用户角色 + 返回标准化的大写角色码(如 'INBOUND', 'SALES', 'SUPER_ADMIN') + """ + user_role = None + + # 优先从 g 对象获取(由 dify_auth_required 存入) + if hasattr(g, 'dify_user_role') and g.dify_user_role: + return g.dify_user_role + + # 从传入的 token 解码 + if token: + try: + claims = decode_token(token) + user_role = claims.get('role') + except Exception as e: + current_app.logger.warning(f"[DifyPermission] token 解码失败: {e}") + return '' + + return (user_role or '').upper() + + @staticmethod + def is_super_admin(role: str = None) -> bool: + """判断是否为超级管理员""" + if not role: + role = DifyPermissionService.get_user_role() + return role.upper() == UserRole.SUPER_ADMIN if role else False + + @staticmethod + def get_role_permissions(role: str = None) -> dict: + """ + 获取指定角色的所有权限代码列表 + 内部调用 AuthService.get_user_permissions() + """ + if not role: + role = DifyPermissionService.get_user_role() + return AuthService.get_user_permissions(role) + + @staticmethod + def get_target_codes(role: str = None) -> list: + """ + 获取用户角色在 sys_role_permission 表中的所有 target_code + 包括 menu 和 element 两种类型 + """ + if not role: + role = DifyPermissionService.get_user_role() + + if not role: + return [] + + # 超级管理员拥有所有权限 + if DifyPermissionService.is_super_admin(role): + return ['*'] + + # 从数据库查询 + try: + perms = SysRolePermission.query.filter( + func.upper(SysRolePermission.role_code) == role.upper() + ).all() + return [p.target_code for p in perms] + except Exception as e: + current_app.logger.error(f"[DifyPermission] 查询 target_code 失败: {e}") + return [] + + @staticmethod + def has_permission(permission_code: str, role: str = None) -> bool: + """ + 检查用户是否拥有指定权限码 + 超级管理员永远返回 True + """ + if DifyPermissionService.is_super_admin(role): + return True + if not role: + role = DifyPermissionService.get_user_role() + perms = DifyPermissionService.get_role_permissions(role) + all_perms = perms.get('menus', []) + perms.get('elements', []) + return permission_code in all_perms + + @staticmethod + def check_forbidden_query(query_fields: list, role: str = None) -> dict: + """ + 检查用户尝试查询的字段是否越权。 + + 参数: + query_fields: 用户查询涉及的字段名列表(可能包含敏感字段) + role: 用户角色(可选,默认从 token/g 解析) + + 返回: + { + 'blocked': bool, # 是否被拦截 + 'message': str | None, # AI 应返回给用户的错误信息(如果有) + } + """ + if DifyPermissionService.is_super_admin(role): + return {'blocked': False, 'message': None} + + if not role: + role = DifyPermissionService.get_user_role() + + if not role: + return { + 'blocked': True, + 'message': '无法识别您的身份,请重新登录后再试。' + } + + # 获取该角色的越权拦截配置 + forbidden_map = ROLE_FORBIDDEN_QUERIES.get(role.upper(), {}) + if not forbidden_map: + return {'blocked': False, 'message': None} + + # 规范化字段名(小写比较) + query_fields_lower = {f.lower() for f in query_fields} + + # 检查是否命中越权字段 + for forbidden_field, msg in forbidden_map.items(): + if forbidden_field.lower() in query_fields_lower: + current_app.logger.warning( + f"[DifyPermission] 越权查询拦截: role={role}, field={forbidden_field}" + ) + return {'blocked': True, 'message': msg} + + return {'blocked': False, 'message': None} + + @staticmethod + def filter_sensitive_fields(data: dict, role: str = None) -> dict: + """ + 根据用户角色对字典数据进行敏感字段脱敏。 + + - SUPER_ADMIN:不过滤,返回原数据 + - SALES / INBOUND / PURCHASER:按角色黑名单剔除敏感字段 + + 参数: + data: 原始数据字典 + role: 用户角色 + + 返回: + 脱敏后的数据字典(敏感字段被置为 None) + """ + if DifyPermissionService.is_super_admin(role): + return data + + if not role: + role = DifyPermissionService.get_user_role() + + if not role: + return data + + # 获取该角色的敏感字段黑名单 + sensitive = ROLE_SENSITIVE_FIELDS.get(role.upper(), frozenset()) + + # 如果没有敏感字段定义,不过滤 + if not sensitive: + return data + + # 深拷贝,避免修改原数据 + import copy + result = copy.deepcopy(data) + + # 将敏感字段置为 None + for field in sensitive: + if field in result: + result[field] = None + + # 如果有 children 数组(子件列表),递归脱敏 + if isinstance(result.get('children'), list): + result['children'] = [ + DifyPermissionService.filter_sensitive_fields(child, role) + for child in result['children'] + ] + + return result + + @staticmethod + def filter_sensitive_fields_in_list(data_list: list, role: str = None) -> list: + """ + 对列表数据批量应用敏感字段脱敏 + """ + if DifyPermissionService.is_super_admin(role): + return data_list + + if not role: + role = DifyPermissionService.get_user_role() + + return [ + DifyPermissionService.filter_sensitive_fields(item, role) + for item in data_list + ] \ No newline at end of file diff --git a/inventory-backend/app/utils/decorators.py b/inventory-backend/app/utils/decorators.py index 9331171..40c2aa8 100644 --- a/inventory-backend/app/utils/decorators.py +++ b/inventory-backend/app/utils/decorators.py @@ -5,6 +5,43 @@ from flask import jsonify, g, request, current_app, has_request_context import logging import json +def _verify_user_active(): + """ + JWT「幽灵令牌」安全漏洞修复: + 在 Token 签名验证通过之后,进一步检查用户在数据库中是否仍然存在且未被禁用。 + + 调用时机:login_required / permission_required 装饰器中, + 在 verify_jwt_in_request() 成功之后立即调用。 + + 返回 True → 用户正常,放行 + 返回 False → 用户已从数据库删除或被禁用,阻断请求 + """ + try: + claims = get_jwt() + user_id = claims.get('sub') + if user_id is None: + return True + + from app.models.system import SysUser + user = SysUser.query.get(user_id) + if user is None: + current_app.logger.warning( + f"🚫 [Ghost Token Blocked] user_id={user_id} not found in database (deleted account)" + ) + return False + + if user.status != 'active': + current_app.logger.warning( + f"🚫 [Token Blocked] user_id={user_id} status={user.status} (disabled account)" + ) + return False + + return True + except Exception as e: + current_app.logger.error(f"User active check error: {e}") + return True # 出错时 fail-open,避免数据库故障导致全站不可用 + + def _verify_token_in_redis(): """ 验证当前 Token 是否与 Redis 中存储的 Token 一致(单设备登录互踢) @@ -67,7 +104,10 @@ def role_required(*roles): return wrapper def login_required(fn): - """验证 JWT 令牌是否存在且有效""" + """ + 验证 JWT 令牌是否存在且有效,并检查用户是否仍在数据库中且未被禁用。 + 双重防护:1) Token 签名验证 2) 数据库用户存在性 3) Redis 单设备互踢 + """ @wraps(fn) def decorator(*args, **kwargs): try: @@ -76,6 +116,10 @@ def login_required(fn): logging.warning(f"JWT verification failed: {e}") return jsonify(msg='登录已过期,请重新登录'), 401 + # ★ 幽灵令牌漏洞修复:检查用户是否已从数据库删除或被禁用 + if not _verify_user_active(): + return jsonify(msg='账号已失效(已删除或已禁用),请重新登录'), 401 + if not _verify_token_in_redis(): return _raise_token_mismatch_error() @@ -83,7 +127,7 @@ def login_required(fn): return decorator def permission_required(permission_code): - """检查当前用户是否拥有指定权限码""" + """检查当前用户是否拥有指定权限码,同时检查用户是否仍然有效""" def wrapper(fn): @wraps(fn) def decorator(*args, **kwargs): @@ -93,6 +137,10 @@ def permission_required(permission_code): logging.warning(f"JWT verification failed: {e}") return jsonify(msg='登录已过期,请重新登录'), 401 + # ★ 幽灵令牌漏洞修复:检查用户是否已从数据库删除或被禁用 + if not _verify_user_active(): + return jsonify(msg='账号已失效(已删除或已禁用),请重新登录'), 401 + if not _verify_token_in_redis(): return _raise_token_mismatch_error() diff --git a/inventory-web/index.html b/inventory-web/index.html index 053119c..62aed23 100644 --- a/inventory-web/index.html +++ b/inventory-web/index.html @@ -10,56 +10,71 @@
+ // 获取当前用户的登录凭证 (Token) + var currentToken = localStorage.getItem('access_token') || localStorage.getItem('token') || ''; - + window.difyChatbotConfig = { + token: '6T0eTgukUEqzK0iW', + baseUrl: 'http://172.16.0.198:8080', + inputs: { + "user_token": currentToken + }, + systemVariables: {}, + userVariables: {}, + } + - + + + }); + }); +