Files
KCGL/inventory-backend/app/utils/decorators.py

131 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/utils/decorators.py
from functools import wraps
from flask_jwt_extended import get_jwt, verify_jwt_in_request, get_jwt_identity
from flask import jsonify, g, request, current_app, has_request_context
import logging
import json
def _verify_token_in_redis():
"""
验证当前 Token 是否与 Redis 中存储的 Token 一致(单设备登录互踢)
"""
from app.extensions import redis_client
from flask import current_app
if redis_client is None:
return True
try:
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return True
request_token = auth_header[7:]
claims = get_jwt()
user_id = claims.get('sub')
if user_id is None:
return True
stored_token = redis_client.get(f"user_token_{user_id}")
if stored_token is None:
return True
if request_token != stored_token:
current_app.logger.warning(f"Token mismatch for user {user_id}: request token != stored token")
return False
return True
except Exception as e:
current_app.logger.error(f"Redis token verification error: {e}")
return True
def _raise_token_mismatch_error():
"""抛出 Token 不一致的错误"""
return jsonify({
'msg': '您的账号已在其他设备登录,请重新登录',
'code': 401,
'reason': 'token_mismatch'
}), 401
def role_required(*roles):
"""自定义装饰器:检查用户角色"""
def wrapper(fn):
@wraps(fn)
def decorator(*args, **kwargs):
claims = get_jwt()
user_role = claims.get('role')
user_role_upper = user_role.upper() if user_role else None
if user_role_upper == 'SUPER_ADMIN':
return fn(*args, **kwargs)
if user_role_upper not in [r.upper() for r in roles]:
return jsonify(msg='权限不足:您没有访问此资源的权限'), 403
return fn(*args, **kwargs)
return decorator
return wrapper
def login_required(fn):
"""验证 JWT 令牌是否存在且有效"""
@wraps(fn)
def decorator(*args, **kwargs):
try:
verify_jwt_in_request()
except Exception as e:
logging.warning(f"JWT verification failed: {e}")
return jsonify(msg='登录已过期,请重新登录'), 401
if not _verify_token_in_redis():
return _raise_token_mismatch_error()
return fn(*args, **kwargs)
return decorator
def permission_required(permission_code):
"""检查当前用户是否拥有指定权限码"""
def wrapper(fn):
@wraps(fn)
def decorator(*args, **kwargs):
try:
verify_jwt_in_request()
except Exception as e:
logging.warning(f"JWT verification failed: {e}")
return jsonify(msg='登录已过期,请重新登录'), 401
if not _verify_token_in_redis():
return _raise_token_mismatch_error()
claims = get_jwt()
user_role = claims.get('role')
if user_role and user_role.upper() == 'SUPER_ADMIN':
return fn(*args, **kwargs)
try:
from app.services.auth_service import AuthService
perm_dict = AuthService.get_user_permissions(user_role)
except Exception as e:
logging.warning(f"Failed to fetch permissions for role {user_role}: {e}")
return jsonify(msg='权限查询失败'), 403
all_perms = perm_dict.get('menus', []) + perm_dict.get('elements', [])
if permission_code not in all_perms:
logging.warning(f"权限检查失败: 角色={user_role}, 所需权限={permission_code}")
return jsonify(msg='权限不足:您没有访问此资源的权限'), 403
return fn(*args, **kwargs)
return decorator
return wrapper
def audit_log(module: str = None, action: str = None, get_target_id_fn=None, get_target_name_fn=None, get_details_fn=None):
"""
已废弃!
由 SQLAlchemy 底层监听器app/core/audit_listener.py全面接管审计日志入库。
此装饰器保留空壳以防项目中其他文件 import 引用时报错。
"""
def wrapper(fn):
from functools import wraps
@wraps(fn)
def decorator(*inner_args, **inner_kwargs):
return fn(*inner_args, **inner_kwargs)
return decorator
return wrapper