90 lines
3.1 KiB
Python
90 lines
3.1 KiB
Python
# app/utils/decorators.py
|
|
from functools import wraps
|
|
from flask_jwt_extended import get_jwt, verify_jwt_in_request
|
|
from flask import jsonify, g
|
|
import logging
|
|
|
|
|
|
def role_required(*roles):
|
|
"""
|
|
自定义装饰器:检查用户角色
|
|
使用方法: @role_required('super_admin', 'finance')
|
|
"""
|
|
|
|
def wrapper(fn):
|
|
@wraps(fn)
|
|
def decorator(*args, **kwargs):
|
|
claims = get_jwt()
|
|
user_role = claims.get('role')
|
|
user_role_upper = user_role.upper() if user_role else None
|
|
|
|
# 如果是超级管理员,拥有上帝视角,直接放行 (可选)
|
|
if user_role_upper == 'SUPER_ADMIN':
|
|
return fn(*args, **kwargs)
|
|
|
|
if user_role_upper not in [r.upper() for r in roles]:
|
|
return jsonify(msg='权限不足:您没有访问此资源的权限'), 403
|
|
|
|
return fn(*args, **kwargs)
|
|
|
|
return decorator
|
|
|
|
return wrapper
|
|
|
|
|
|
def login_required(fn):
|
|
"""
|
|
验证 JWT 令牌是否存在且有效
|
|
"""
|
|
@wraps(fn)
|
|
def decorator(*args, **kwargs):
|
|
try:
|
|
verify_jwt_in_request()
|
|
except Exception as e:
|
|
logging.warning(f"JWT verification failed: {e}")
|
|
return jsonify(msg='登录已过期,请重新登录'), 401
|
|
return fn(*args, **kwargs)
|
|
return decorator
|
|
|
|
|
|
def permission_required(permission_code):
|
|
"""
|
|
检查当前用户是否拥有指定权限码
|
|
使用方法: @permission_required('material:base:read')
|
|
"""
|
|
def wrapper(fn):
|
|
@wraps(fn)
|
|
def decorator(*args, **kwargs):
|
|
# 首先验证 JWT
|
|
try:
|
|
verify_jwt_in_request()
|
|
except Exception as e:
|
|
logging.warning(f"JWT verification failed: {e}")
|
|
return jsonify(msg='登录已过期,请重新登录'), 401
|
|
|
|
claims = get_jwt()
|
|
user_role = claims.get('role')
|
|
# 超级管理员放行 (忽略大小写)
|
|
if user_role and user_role.upper() == 'SUPER_ADMIN':
|
|
return fn(*args, **kwargs)
|
|
|
|
# 根据角色查询数据库中的权限
|
|
try:
|
|
from app.services.auth_service import AuthService
|
|
perm_dict = AuthService.get_user_permissions(user_role)
|
|
except Exception as e:
|
|
logging.warning(f"Failed to fetch permissions for role {user_role}: {e}")
|
|
return jsonify(msg='权限查询失败'), 403
|
|
|
|
# 合并菜单和元素权限
|
|
all_perms = perm_dict.get('menus', []) + perm_dict.get('elements', [])
|
|
if permission_code not in all_perms:
|
|
# 详细的调试日志
|
|
print(f"🔴 [权限拦截] 角色 '{user_role}' 访问被拒!需要权限码: '{permission_code}', 但该角色实际拥有: {all_perms}")
|
|
logging.warning(
|
|
f"权限检查失败: 角色={user_role}, 所需权限={permission_code}, 实际权限列表={all_perms}")
|
|
return jsonify(msg='权限不足:您没有访问此资源的权限'), 403
|
|
return fn(*args, **kwargs)
|
|
return decorator
|
|
return wrapper
|