fix: 修复 JWT 幽灵令牌漏洞,新增 Dify 权限过滤服务

This commit is contained in:
DXC
2026-05-18 16:16:50 +08:00
parent d1e49c343c
commit 3cb31c2b67
5 changed files with 481 additions and 54 deletions

View File

@ -2,6 +2,7 @@ from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate from flask_migrate import Migrate
from flask_cors import CORS from flask_cors import CORS
from flask_jwt_extended import JWTManager # 确保引入了 JWTManager from flask_jwt_extended import JWTManager # 确保引入了 JWTManager
from flask import current_app
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
import redis import redis
@ -11,15 +12,78 @@ migrate = Migrate()
cors = CORS() cors = CORS()
jwt = JWTManager() # 必须实例化 jwt = JWTManager() # 必须实例化
# Redis 客户端 (单设备登录互踢用) # Redis 客户端 (单设备登录互踢 + JWT Token 黑名单用)
redis_client = None redis_client = None
# Redis Key 前缀
_JWT_BLOCKED_USER_PREFIX = "jwt_blocked_user:" # 存储被删除/禁用的 user_id
def beijing_time(): def beijing_time():
"""获取北京时间 (UTC+8),剥离时区信息以兼容数据库 naive DateTime 字段""" """获取北京时间 (UTC+8),剥离时区信息以兼容数据库 naive DateTime 字段"""
return datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None) return datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None)
# =============================================================================
# 全局 JWT Token 黑名单拦截器
# 原理Flask-JWT-Extended 在每次 @jwt_required() 验证时,
# 会自动触发 token_in_blocklist_loader 回调。
# 若该回调返回 True命中黑名单请求直接被 401 拒绝,后续代码不会执行。
# =============================================================================
@jwt.token_in_blocklist_loader
def check_if_token_is_revoked(jwt_header, jwt_payload):
"""
全局 JWT 黑名单检查:每次 @jwt_required() 调用时自动触发。
无论 AIDify还是人类用户调用的接口均受此拦截。
检查逻辑:
1. 通过 jwt_payload['sub']user_id查询 Redis 黑名单
2. 若 user_id 存在于黑名单 → 返回 True → 请求被 401 拒绝
3. 若 Redis 不可用fail-open→ 放行(不影响正常业务)
"""
user_id = jwt_payload.get('sub')
if user_id is None:
return False
global redis_client
if redis_client is None:
return False
try:
blocked_key = f"{_JWT_BLOCKED_USER_PREFIX}{user_id}"
is_blocked = redis_client.exists(blocked_key)
if is_blocked:
current_app.logger.warning(
f"🚫 JWT revoked for deleted/disabled user: user_id={user_id}"
)
return bool(is_blocked)
except Exception as e:
current_app.logger.error(f"JWT blocklist check error: {e}")
return False # Redis 出错时 fail-open不阻断正常业务
def revoke_all_tokens_for_user(user_id):
"""
将指定用户的 ID 加入 JWT 黑名单14 天)。
效果:该用户的所有已发放 Token无论是否过期瞬间失效。
由 delete_user() / update_user(status!='active') 时调用。
"""
global redis_client
if redis_client is None:
current_app.logger.warning(
f"Redis unavailable, cannot revoke tokens for user_id={user_id}"
)
return
try:
blocked_key = f"{_JWT_BLOCKED_USER_PREFIX}{user_id}"
ttl_seconds = 14 * 24 * 3600 # 14 天,与 Refresh Token 有效期对齐
redis_client.setex(blocked_key, ttl_seconds, "1")
current_app.logger.info(f"✅ User {user_id} added to JWT blocklist (TTL={ttl_seconds}s)")
except Exception as e:
current_app.logger.error(f"Failed to revoke tokens for user_id={user_id}: {e}")
# 2. 定义初始化函数 (供工厂函数 create_app 调用) # 2. 定义初始化函数 (供工厂函数 create_app 调用)
def init_extensions(app): def init_extensions(app):
""" """

View File

@ -1,6 +1,6 @@
# app/services/auth_service.py # app/services/auth_service.py
from app.models.system import SysUser, SysRolePermission # <== 引入 SysRolePermission from app.models.system import SysUser, SysRolePermission # <== 引入 SysRolePermission
from app.extensions import db, redis_client from app.extensions import db, redis_client, revoke_all_tokens_for_user
from sqlalchemy import func from sqlalchemy import func
from flask_jwt_extended import create_access_token, create_refresh_token, get_jwt_identity from flask_jwt_extended import create_access_token, create_refresh_token, get_jwt_identity
from flask import current_app from flask import current_app
@ -334,7 +334,11 @@ class AuthService:
user.email = email user.email = email
if 'status' in data: if 'status' in data:
user.status = data['status'] new_status = data['status']
# ★ 幽灵令牌漏洞修复:用户被禁用时,立即吊销其所有 Token
if new_status != 'active' and user.status == 'active':
revoke_all_tokens_for_user(user_id)
user.status = new_status
new_password = data.get('password') new_password = data.get('password')
if new_password and str(new_password).strip(): if new_password and str(new_password).strip():
@ -353,7 +357,7 @@ class AuthService:
@staticmethod @staticmethod
def delete_user(user_id, operator_role): def delete_user(user_id, operator_role):
"""删除用户""" """删除用户:删除前自动吊销该用户所有 JWT Token"""
# 标准化操作者角色为全大写 # 标准化操作者角色为全大写
operator_role_upper = operator_role.upper() if operator_role else None operator_role_upper = operator_role.upper() if operator_role else None
if operator_role_upper != UserRole.SUPER_ADMIN: if operator_role_upper != UserRole.SUPER_ADMIN:
@ -365,6 +369,18 @@ class AuthService:
# 提前获取用户名用于审计日志 # 提前获取用户名用于审计日志
username = user.username username = user.username
# ★ 幽灵令牌漏洞修复:删除用户前,先将 user_id 加入 JWT 黑名单
# 效果:该用户持有的所有 Token 瞬间失效,无论是否已过期
revoke_all_tokens_for_user(user_id)
# 清除 Redis 中的单设备登录 Token防止残留
if redis_client is not None:
try:
redis_client.delete(f"user_token_{user_id}")
except Exception as e:
current_app.logger.warning(f"Failed to delete user token from Redis: {e}")
db.session.delete(user) db.session.delete(user)
db.session.commit() db.session.commit()
return username return username

View File

@ -0,0 +1,284 @@
# app/services/dify_permission_service.py
"""
Dify 智能客服权限服务层
职责:
1. 从 JWT Token / g.dify_user_role 解析用户角色
2. 根据角色查询 sys_role_permission 表,获取用户拥有的 target_code
3. AI 专属的动态脱敏策略:
- SUPER_ADMIN无条件放行所有数据
- SALES / INBOUND剔除 price, cost, supplier 等敏感字段
- 跨模块越权查询:直接阻断,返回角色专属的错误信息给大模型
"""
from flask import g, current_app
from flask_jwt_extended import decode_token
from app.models.system import SysRolePermission
from app.services.auth_service import AuthService
from app.utils.constants import UserRole
from sqlalchemy import func
# ==============================================================================
# 角色敏感字段定义(按角色黑名单)
# ==============================================================================
# 每个角色在 AI 对话场景下,禁止查看的字段集合
ROLE_SENSITIVE_FIELDS = {
# 入库员:禁止查看采购相关金额
'INBOUND': frozenset([
'purchase_price',
'cost',
'price',
'supplier',
'supplier_id',
'unit_price',
'total_price',
'order_price',
'last_purchase_price',
'avg_purchase_price',
'supplier_name',
]),
# 销售员:禁止查看成本/采购相关数据
'SALES': frozenset([
'purchase_price',
'cost',
'unit_cost',
'supplier',
'supplier_id',
'unit_price',
'last_purchase_price',
'avg_purchase_price',
'supplier_name',
'stock_cost',
'total_cost',
'supply_price',
]),
# 采购员:禁止查看销售毛利相关数据
'PURCHASER': frozenset([
'sale_price',
'retail_price',
'suggested_price',
'margin',
'profit',
'profit_rate',
]),
}
# 跨模块越权查询拦截配置
# key: 角色, value: { 尝试查询的字段: 给 AI 的回复 }
ROLE_FORBIDDEN_QUERIES = {
'INBOUND': {
'purchase_price': '抱歉,您当前的角色(入库员)无权查看采购金额数据。',
'cost': '抱歉,您当前的角色(入库员)无权查看采购成本数据。',
'supplier': '抱歉,您当前的角色(入库员)无权查看供应商数据。',
'unit_price': '抱歉,您当前的角色(入库员)无权查看采购单价数据。',
'last_purchase_price': '抱歉,您当前的角色(入库员)无权查看历史采购价数据。',
},
'SALES': {
'purchase_price': '抱歉,您当前的角色(销售员)无权查看采购成本数据。',
'cost': '抱歉,您当前的角色(销售员)无权查看成本数据。',
'supplier': '抱歉,您当前的角色(销售员)无权查看供应商信息。',
'unit_cost': '抱歉,您当前的角色(销售员)无权查看单位成本数据。',
'last_purchase_price': '抱歉,您当前的角色(销售员)无权查看历史采购价数据。',
},
'PURCHASER': {
'sale_price': '抱歉,您当前的角色(采购员)无权查看销售价格数据。',
'retail_price': '抱歉,您当前的角色(采购员)无权查看零售价数据。',
'margin': '抱歉,您当前的角色(采购员)无权查看毛利数据。',
'profit': '抱歉,您当前的角色(采购员)无权查看利润数据。',
},
}
class DifyPermissionService:
"""Dify AI 专属权限服务"""
@staticmethod
def get_user_role(token: str = None) -> str:
"""
从 JWT Token 或 g.dify_user_role 解析用户角色
返回标准化的大写角色码(如 'INBOUND', 'SALES', 'SUPER_ADMIN'
"""
user_role = None
# 优先从 g 对象获取(由 dify_auth_required 存入)
if hasattr(g, 'dify_user_role') and g.dify_user_role:
return g.dify_user_role
# 从传入的 token 解码
if token:
try:
claims = decode_token(token)
user_role = claims.get('role')
except Exception as e:
current_app.logger.warning(f"[DifyPermission] token 解码失败: {e}")
return ''
return (user_role or '').upper()
@staticmethod
def is_super_admin(role: str = None) -> bool:
"""判断是否为超级管理员"""
if not role:
role = DifyPermissionService.get_user_role()
return role.upper() == UserRole.SUPER_ADMIN if role else False
@staticmethod
def get_role_permissions(role: str = None) -> dict:
"""
获取指定角色的所有权限代码列表
内部调用 AuthService.get_user_permissions()
"""
if not role:
role = DifyPermissionService.get_user_role()
return AuthService.get_user_permissions(role)
@staticmethod
def get_target_codes(role: str = None) -> list:
"""
获取用户角色在 sys_role_permission 表中的所有 target_code
包括 menu 和 element 两种类型
"""
if not role:
role = DifyPermissionService.get_user_role()
if not role:
return []
# 超级管理员拥有所有权限
if DifyPermissionService.is_super_admin(role):
return ['*']
# 从数据库查询
try:
perms = SysRolePermission.query.filter(
func.upper(SysRolePermission.role_code) == role.upper()
).all()
return [p.target_code for p in perms]
except Exception as e:
current_app.logger.error(f"[DifyPermission] 查询 target_code 失败: {e}")
return []
@staticmethod
def has_permission(permission_code: str, role: str = None) -> bool:
"""
检查用户是否拥有指定权限码
超级管理员永远返回 True
"""
if DifyPermissionService.is_super_admin(role):
return True
if not role:
role = DifyPermissionService.get_user_role()
perms = DifyPermissionService.get_role_permissions(role)
all_perms = perms.get('menus', []) + perms.get('elements', [])
return permission_code in all_perms
@staticmethod
def check_forbidden_query(query_fields: list, role: str = None) -> dict:
"""
检查用户尝试查询的字段是否越权。
参数:
query_fields: 用户查询涉及的字段名列表(可能包含敏感字段)
role: 用户角色(可选,默认从 token/g 解析)
返回:
{
'blocked': bool, # 是否被拦截
'message': str | None, # AI 应返回给用户的错误信息(如果有)
}
"""
if DifyPermissionService.is_super_admin(role):
return {'blocked': False, 'message': None}
if not role:
role = DifyPermissionService.get_user_role()
if not role:
return {
'blocked': True,
'message': '无法识别您的身份,请重新登录后再试。'
}
# 获取该角色的越权拦截配置
forbidden_map = ROLE_FORBIDDEN_QUERIES.get(role.upper(), {})
if not forbidden_map:
return {'blocked': False, 'message': None}
# 规范化字段名(小写比较)
query_fields_lower = {f.lower() for f in query_fields}
# 检查是否命中越权字段
for forbidden_field, msg in forbidden_map.items():
if forbidden_field.lower() in query_fields_lower:
current_app.logger.warning(
f"[DifyPermission] 越权查询拦截: role={role}, field={forbidden_field}"
)
return {'blocked': True, 'message': msg}
return {'blocked': False, 'message': None}
@staticmethod
def filter_sensitive_fields(data: dict, role: str = None) -> dict:
"""
根据用户角色对字典数据进行敏感字段脱敏。
- SUPER_ADMIN不过滤返回原数据
- SALES / INBOUND / PURCHASER按角色黑名单剔除敏感字段
参数:
data: 原始数据字典
role: 用户角色
返回:
脱敏后的数据字典(敏感字段被置为 None
"""
if DifyPermissionService.is_super_admin(role):
return data
if not role:
role = DifyPermissionService.get_user_role()
if not role:
return data
# 获取该角色的敏感字段黑名单
sensitive = ROLE_SENSITIVE_FIELDS.get(role.upper(), frozenset())
# 如果没有敏感字段定义,不过滤
if not sensitive:
return data
# 深拷贝,避免修改原数据
import copy
result = copy.deepcopy(data)
# 将敏感字段置为 None
for field in sensitive:
if field in result:
result[field] = None
# 如果有 children 数组(子件列表),递归脱敏
if isinstance(result.get('children'), list):
result['children'] = [
DifyPermissionService.filter_sensitive_fields(child, role)
for child in result['children']
]
return result
@staticmethod
def filter_sensitive_fields_in_list(data_list: list, role: str = None) -> list:
"""
对列表数据批量应用敏感字段脱敏
"""
if DifyPermissionService.is_super_admin(role):
return data_list
if not role:
role = DifyPermissionService.get_user_role()
return [
DifyPermissionService.filter_sensitive_fields(item, role)
for item in data_list
]

View File

@ -5,6 +5,43 @@ from flask import jsonify, g, request, current_app, has_request_context
import logging import logging
import json import json
def _verify_user_active():
"""
JWT「幽灵令牌」安全漏洞修复
在 Token 签名验证通过之后,进一步检查用户在数据库中是否仍然存在且未被禁用。
调用时机login_required / permission_required 装饰器中,
在 verify_jwt_in_request() 成功之后立即调用。
返回 True → 用户正常,放行
返回 False → 用户已从数据库删除或被禁用,阻断请求
"""
try:
claims = get_jwt()
user_id = claims.get('sub')
if user_id is None:
return True
from app.models.system import SysUser
user = SysUser.query.get(user_id)
if user is None:
current_app.logger.warning(
f"🚫 [Ghost Token Blocked] user_id={user_id} not found in database (deleted account)"
)
return False
if user.status != 'active':
current_app.logger.warning(
f"🚫 [Token Blocked] user_id={user_id} status={user.status} (disabled account)"
)
return False
return True
except Exception as e:
current_app.logger.error(f"User active check error: {e}")
return True # 出错时 fail-open避免数据库故障导致全站不可用
def _verify_token_in_redis(): def _verify_token_in_redis():
""" """
验证当前 Token 是否与 Redis 中存储的 Token 一致(单设备登录互踢) 验证当前 Token 是否与 Redis 中存储的 Token 一致(单设备登录互踢)
@ -67,7 +104,10 @@ def role_required(*roles):
return wrapper return wrapper
def login_required(fn): def login_required(fn):
"""验证 JWT 令牌是否存在且有效""" """
验证 JWT 令牌是否存在且有效,并检查用户是否仍在数据库中且未被禁用。
双重防护1) Token 签名验证 2) 数据库用户存在性 3) Redis 单设备互踢
"""
@wraps(fn) @wraps(fn)
def decorator(*args, **kwargs): def decorator(*args, **kwargs):
try: try:
@ -76,6 +116,10 @@ def login_required(fn):
logging.warning(f"JWT verification failed: {e}") logging.warning(f"JWT verification failed: {e}")
return jsonify(msg='登录已过期,请重新登录'), 401 return jsonify(msg='登录已过期,请重新登录'), 401
# ★ 幽灵令牌漏洞修复:检查用户是否已从数据库删除或被禁用
if not _verify_user_active():
return jsonify(msg='账号已失效(已删除或已禁用),请重新登录'), 401
if not _verify_token_in_redis(): if not _verify_token_in_redis():
return _raise_token_mismatch_error() return _raise_token_mismatch_error()
@ -83,7 +127,7 @@ def login_required(fn):
return decorator return decorator
def permission_required(permission_code): def permission_required(permission_code):
"""检查当前用户是否拥有指定权限码""" """检查当前用户是否拥有指定权限码,同时检查用户是否仍然有效"""
def wrapper(fn): def wrapper(fn):
@wraps(fn) @wraps(fn)
def decorator(*args, **kwargs): def decorator(*args, **kwargs):
@ -93,6 +137,10 @@ def permission_required(permission_code):
logging.warning(f"JWT verification failed: {e}") logging.warning(f"JWT verification failed: {e}")
return jsonify(msg='登录已过期,请重新登录'), 401 return jsonify(msg='登录已过期,请重新登录'), 401
# ★ 幽灵令牌漏洞修复:检查用户是否已从数据库删除或被禁用
if not _verify_user_active():
return jsonify(msg='账号已失效(已删除或已禁用),请重新登录'), 401
if not _verify_token_in_redis(): if not _verify_token_in_redis():
return _raise_token_mismatch_error() return _raise_token_mismatch_error()

View File

@ -10,56 +10,71 @@
<div id="app"></div> <div id="app"></div>
<script type="module" src="/src/main.ts"></script> <script type="module" src="/src/main.ts"></script>
<script> <script>
window.difyChatbotConfig = { // 获取当前用户的登录凭证 (Token)
token: 'Zp6B44AgCUPKprFG', var currentToken = localStorage.getItem('access_token') || localStorage.getItem('token') || '';
baseUrl: 'http://172.16.0.198:8080',
inputs: {},
systemVariables: {},
userVariables: {},
};
</script>
<script window.difyChatbotConfig = {
src="http://172.16.0.198:8080/embed.min.js" token: '6T0eTgukUEqzK0iW',
id="Zp6B44AgCUPKprFG" baseUrl: 'http://172.16.0.198:8080',
defer> inputs: {
</script> "user_token": currentToken
},
systemVariables: {},
userVariables: {},
}
</script>
<style> <script
#dify-chatbot-bubble-button { src="http://172.16.0.198:8080/embed.min.js"
background-color: #409EFF !important; id="6T0eTgukUEqzK0iW"
box-shadow: 0 4px 12px rgba(64, 158, 255, 0.4) !important; defer>
</script>
<style>
#dify-chatbot-bubble-button {
background-color: #409EFF !important;
box-shadow: 0 4px 12px rgba(64, 158, 255, 0.4) !important;
}
#dify-chatbot-bubble-window {
width: 28rem !important;
height: 42rem !important;
border-radius: 12px !important;
box-shadow: 0 8px 24px rgba(0, 0, 0, 0.12) !important;
/* 👇 核心:开启拖拽改变大小功能 👇 */
resize: both !important;
overflow: hidden !important; /* 必须配合 overflow: hidden 才能生效 */
/* 防止缩得太小或拉得太大导致页面崩坏 */
min-width: 24rem !important;
min-height: 35rem !important;
max-width: 90vw !important; /* 最大不超过屏幕宽度的 90% */
max-height: 90vh !important; /* 最大不超过屏幕高度的 90% */
}
/* 确保内部的网页容器 100% 填满外壳,丝滑缩放 */
#dify-chatbot-bubble-window iframe {
width: 100% !important;
height: 100% !important;
}
</style>
<script>
document.addEventListener('DOMContentLoaded', function() {
document.addEventListener('click', function(event) {
var bubbleWindow = document.getElementById('dify-chatbot-bubble-window');
var bubbleButton = document.getElementById('dify-chatbot-bubble-button');
if (bubbleWindow && bubbleButton) {
var isWindowOpen = window.getComputedStyle(bubbleWindow).display !== 'none';
if (isWindowOpen && !bubbleWindow.contains(event.target) && !bubbleButton.contains(event.target)) {
bubbleButton.click();
}
} }
});
#dify-chatbot-bubble-window { });
width: 28rem !important; </script>
height: 42rem !important;
border-radius: 12px !important;
box-shadow: 0 8px 24px rgba(0, 0, 0, 0.12) !important;
}
</style>
<script>
// 等待页面加载完毕
document.addEventListener('DOMContentLoaded', function() {
// 给整个网页添加点击监听器
document.addEventListener('click', function(event) {
// 获取 Dify 的聊天窗口和按钮元素
var bubbleWindow = document.getElementById('dify-chatbot-bubble-window');
var bubbleButton = document.getElementById('dify-chatbot-bubble-button');
if (bubbleWindow && bubbleButton) {
// 判断窗口当前是否处于打开状态 (不为 none 说明是打开的)
var isWindowOpen = window.getComputedStyle(bubbleWindow).display !== 'none';
// 如果窗口是打开的,并且点击的位置既不在窗口内,也不在按钮上
if (isWindowOpen && !bubbleWindow.contains(event.target) && !bubbleButton.contains(event.target)) {
// 模拟点击按钮,关闭窗口
bubbleButton.click();
}
}
});
});
</script>
</body> </body>
</html> </html>