feat: 重构鉴权系统为双Token无感刷新,并增加前端Token过期安全预判机制
This commit is contained in:
@ -2,7 +2,7 @@
|
||||
from app.models.system import SysUser, SysRolePermission # <== 引入 SysRolePermission
|
||||
from app.extensions import db
|
||||
from sqlalchemy import func
|
||||
from flask_jwt_extended import create_access_token
|
||||
from flask_jwt_extended import create_access_token, create_refresh_token
|
||||
from app.utils.constants import UserRole
|
||||
from datetime import timedelta
|
||||
|
||||
@ -61,21 +61,67 @@ class AuthService:
|
||||
# Token 中 identity 存数据库ID,claims 存登录账号ID
|
||||
account_id = user_info.get('account_id', login_input)
|
||||
|
||||
# Access Token: 2小时有效期
|
||||
access_token = create_access_token(
|
||||
identity=user_id,
|
||||
additional_claims={
|
||||
'role': user_role,
|
||||
'username': account_id, # 存纯账号ID
|
||||
'display_name': user_info.get('username') # 存显示名
|
||||
},
|
||||
expires_delta=timedelta(days=7)
|
||||
}
|
||||
)
|
||||
|
||||
# Refresh Token: 7天有效期
|
||||
refresh_token = create_refresh_token(
|
||||
identity=user_id,
|
||||
additional_claims={
|
||||
'role': user_role,
|
||||
'username': account_id
|
||||
}
|
||||
)
|
||||
|
||||
return {
|
||||
'access_token': access_token,
|
||||
'refresh_token': refresh_token,
|
||||
'user': user_info
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def refresh_access_token(refresh_token_str):
|
||||
"""
|
||||
使用 refresh_token 换发新的 access_token
|
||||
"""
|
||||
from flask_jwt_extended import decode_token
|
||||
from flask import current_app
|
||||
|
||||
try:
|
||||
# 解码 refresh_token(不验证过期,仅获取 claims)
|
||||
decoded = decode_token(refresh_token_str)
|
||||
user_id = decoded.get('sub')
|
||||
role = decoded.get('role')
|
||||
username = decoded.get('username')
|
||||
display_name = decoded.get('display_name')
|
||||
|
||||
if not user_id:
|
||||
raise ValueError("无效的 refresh_token")
|
||||
|
||||
# 生成新的 access_token
|
||||
new_access_token = create_access_token(
|
||||
identity=user_id,
|
||||
additional_claims={
|
||||
'role': role,
|
||||
'username': username,
|
||||
'display_name': display_name
|
||||
}
|
||||
)
|
||||
|
||||
return {
|
||||
'access_token': new_access_token
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise ValueError(f"Token 刷新失败: {str(e)}")
|
||||
|
||||
@staticmethod
|
||||
def create_user(data, operator_role):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user