refactor: global import cleanup and stability fix for auth module
This commit is contained in:
@ -1,7 +1,24 @@
|
||||
# app/api/v1/auth.py
|
||||
# ==============================================================================
|
||||
# Flask & Extensions
|
||||
# ==============================================================================
|
||||
from flask import Blueprint, request, jsonify, current_app
|
||||
from flask_jwt_extended import jwt_required, get_jwt, get_jwt_identity
|
||||
from flask_jwt_extended import (
|
||||
jwt_required,
|
||||
get_jwt,
|
||||
get_jwt_identity,
|
||||
create_access_token,
|
||||
)
|
||||
from werkzeug.exceptions import BadRequest, Unauthorized, Forbidden, NotFound, InternalServerError
|
||||
|
||||
# ==============================================================================
|
||||
# Database & Extensions
|
||||
# ==============================================================================
|
||||
from app.extensions import db
|
||||
|
||||
# ==============================================================================
|
||||
# Service & Decorators
|
||||
# ==============================================================================
|
||||
from app.services.auth_service import AuthService
|
||||
from app.utils.decorators import permission_required, audit_log
|
||||
|
||||
@ -53,6 +70,9 @@ def filter_item_by_permissions(item_dict, user_permissions):
|
||||
return item_dict
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# 登录接口
|
||||
# ==============================================================================
|
||||
@auth_bp.route('/login', methods=['POST'])
|
||||
def login():
|
||||
try:
|
||||
@ -74,17 +94,19 @@ def login():
|
||||
return jsonify(response_data), 200
|
||||
|
||||
except ValueError as ve:
|
||||
current_app.logger.warning(f"Login validation failed: {str(ve)}")
|
||||
return jsonify({'msg': str(ve)}), 401
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"Login Failed Error: {str(e)}")
|
||||
return jsonify({'msg': f'服务器内部错误: {str(e)}'}), 500
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# Token 刷新接口
|
||||
# ==============================================================================
|
||||
@auth_bp.route('/refresh', methods=['POST'])
|
||||
def refresh():
|
||||
"""
|
||||
使用 refresh_token 换发新的 access_token
|
||||
"""
|
||||
"""使用 refresh_token 换发新的 access_token"""
|
||||
try:
|
||||
data = request.get_json()
|
||||
if not data or not data.get('refresh_token'):
|
||||
@ -99,12 +121,16 @@ def refresh():
|
||||
}), 200
|
||||
|
||||
except ValueError as ve:
|
||||
current_app.logger.warning(f"Token refresh validation failed: {str(ve)}")
|
||||
return jsonify({'msg': str(ve)}), 401
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"Token Refresh Error: {str(e)}")
|
||||
return jsonify({'msg': f'Token 刷新失败: {str(e)}'}), 500
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# 创建用户(管理员)
|
||||
# ==============================================================================
|
||||
@auth_bp.route('/user/create', methods=['POST'])
|
||||
@jwt_required()
|
||||
@permission_required('system_user:operation')
|
||||
@ -120,7 +146,6 @@ def create_user():
|
||||
user_permissions = get_current_user_permissions()
|
||||
# 超级管理员不过滤
|
||||
if 'system_user:*' not in user_permissions:
|
||||
# 字段名到权限码的映射
|
||||
field_to_perm = {
|
||||
'cn_name': 'system_user:username',
|
||||
'username': 'system_user:username',
|
||||
@ -129,13 +154,9 @@ def create_user():
|
||||
'role': 'system_user:role',
|
||||
'email': 'system_user:email',
|
||||
}
|
||||
# 对于 password 字段,如果没有对应权限但用户有操作权限,可以保留(由装饰器保证)
|
||||
# 但如果连操作权限都没有,则不会进入此接口。
|
||||
for field in list(data.keys()):
|
||||
perm_code = field_to_perm.get(field)
|
||||
# 密码字段特殊处理:如果没有 password 权限但用户有操作权限,仍允许(不删除)
|
||||
if field == 'password':
|
||||
# 检查用户是否有操作权限,如果有则保留
|
||||
if 'system_user:operation' not in user_permissions:
|
||||
data.pop(field, None)
|
||||
continue
|
||||
@ -153,7 +174,9 @@ def create_user():
|
||||
return jsonify({'msg': str(e)}), 400
|
||||
|
||||
|
||||
# [新增] 更新用户
|
||||
# ==============================================================================
|
||||
# 更新用户(管理员)
|
||||
# ==============================================================================
|
||||
@auth_bp.route('/user/<int:user_id>', methods=['PUT'])
|
||||
@jwt_required()
|
||||
@permission_required('system_user:operation')
|
||||
@ -165,11 +188,8 @@ def create_user():
|
||||
def update_user(user_id):
|
||||
try:
|
||||
data = request.get_json()
|
||||
# 数据清洗:移除用户没有权限的字段
|
||||
user_permissions = get_current_user_permissions()
|
||||
# 超级管理员不过滤
|
||||
if 'system_user:*' not in user_permissions:
|
||||
# 字段名到权限码的映射
|
||||
field_to_perm = {
|
||||
'cn_name': 'system_user:username',
|
||||
'username': 'system_user:username',
|
||||
@ -180,9 +200,7 @@ def update_user(user_id):
|
||||
}
|
||||
for field in list(data.keys()):
|
||||
perm_code = field_to_perm.get(field)
|
||||
# 密码字段特殊处理:如果没有 password 权限但用户有操作权限,仍允许(不删除)
|
||||
if field == 'password':
|
||||
# 检查用户是否有操作权限,如果有则保留
|
||||
if 'system_user:operation' not in user_permissions:
|
||||
data.pop(field, None)
|
||||
continue
|
||||
@ -200,21 +218,26 @@ def update_user(user_id):
|
||||
return jsonify({'msg': str(e)}), 400
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# 获取所有用户列表(管理员)
|
||||
# ==============================================================================
|
||||
@auth_bp.route('/users', methods=['GET'])
|
||||
@jwt_required()
|
||||
@permission_required('system_user')
|
||||
def get_users():
|
||||
try:
|
||||
users = AuthService.get_all_users()
|
||||
# 字段级脱敏
|
||||
user_permissions = get_current_user_permissions()
|
||||
filtered_users = [filter_item_by_permissions(user, user_permissions) for user in users]
|
||||
return jsonify({'msg': '获取成功', 'data': filtered_users}), 200
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"Get Users Failed: {str(e)}")
|
||||
return jsonify({'msg': '获取用户列表失败'}), 500
|
||||
return jsonify({'msg': f'获取用户列表失败: {str(e)}'}), 500
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# 删除用户(管理员)
|
||||
# ==============================================================================
|
||||
@auth_bp.route('/user/<int:user_id>', methods=['DELETE'])
|
||||
@jwt_required()
|
||||
@permission_required('system_user:operation')
|
||||
@ -235,6 +258,9 @@ def delete_user(user_id):
|
||||
return jsonify({'msg': str(e)}), 400
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# 获取当前用户权限列表(登录后调用)
|
||||
# ==============================================================================
|
||||
@auth_bp.route('/my-permissions', methods=['GET'])
|
||||
@jwt_required()
|
||||
def get_my_permissions():
|
||||
@ -242,16 +268,63 @@ def get_my_permissions():
|
||||
try:
|
||||
claims = get_jwt()
|
||||
role = claims.get('role')
|
||||
|
||||
# 调用 Service 获取权限
|
||||
permissions = AuthService.get_user_permissions(role)
|
||||
|
||||
return jsonify({'msg': '获取成功', 'data': permissions}), 200
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"Get Permissions Failed: {str(e)}")
|
||||
return jsonify({'msg': '获取权限失败'}), 500
|
||||
return jsonify({'msg': f'获取权限失败: {str(e)}'}), 500
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# 获取当前用户个人资料(自我查看)
|
||||
# ==============================================================================
|
||||
@auth_bp.route('/me', methods=['GET'])
|
||||
@jwt_required()
|
||||
def get_my_profile():
|
||||
"""
|
||||
【重构】获取当前登录用户的个人资料(自我查看)
|
||||
- 只返回姓名/账号和所属部门
|
||||
- 严格脱敏:不暴露系统角色字段
|
||||
"""
|
||||
try:
|
||||
from app.models.system import SysUser
|
||||
|
||||
user_id = get_jwt_identity()
|
||||
|
||||
# 超级管理员(user_id=0)
|
||||
if user_id == 0:
|
||||
return jsonify({
|
||||
'msg': '获取成功',
|
||||
'data': {
|
||||
'id': 0,
|
||||
'username': 'IRIS',
|
||||
'display_name': '超级管理员(IRIS)',
|
||||
'department': 'System',
|
||||
}
|
||||
}), 200
|
||||
|
||||
user = SysUser.query.get(user_id)
|
||||
if not user:
|
||||
return jsonify({'msg': '用户不存在'}), 404
|
||||
|
||||
return jsonify({
|
||||
'msg': '获取成功',
|
||||
'data': {
|
||||
'id': user.id,
|
||||
'username': user.username.split('/')[1] if '/' in user.username else user.username,
|
||||
'display_name': user.username.split('/')[0] if '/' in user.username else user.username,
|
||||
'department': user.department or '-',
|
||||
}
|
||||
}), 200
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"Get Profile Failed: {str(e)}")
|
||||
return jsonify({'msg': f'获取个人资料失败: {str(e)}'}), 500
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# 自我修改密码(无需旧密码)
|
||||
# ==============================================================================
|
||||
@auth_bp.route('/me/password', methods=['PUT'])
|
||||
@jwt_required()
|
||||
def change_my_password():
|
||||
@ -259,13 +332,10 @@ def change_my_password():
|
||||
【重构】自我修改密码接口
|
||||
- 无需管理员权限,无需旧密码
|
||||
- 只要 JWT Token 有效(已证明当前登录身份),即可直接修改新密码
|
||||
- 使用 get_jwt_identity() 获取用户 ID,与项目其他接口保持一致
|
||||
"""
|
||||
try:
|
||||
from app.models.system import SysUser
|
||||
|
||||
# 【关键修复】使用 get_jwt_identity() 而非 claims.get('sub'),
|
||||
# 与项目其他接口(outbound.py / scrap.py 等)保持一致,避免 JWT sub 字段取不到导致 500
|
||||
user_id = get_jwt_identity()
|
||||
|
||||
data = request.get_json()
|
||||
@ -301,51 +371,3 @@ def change_my_password():
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"Change Password Failed: {str(e)}")
|
||||
return jsonify({'msg': f'密码修改失败: {str(e)}'}), 500
|
||||
|
||||
|
||||
@auth_bp.route('/me', methods=['GET'])
|
||||
@jwt_required()
|
||||
def get_my_profile():
|
||||
"""
|
||||
【重构】获取当前登录用户的个人资料(自我查看)
|
||||
- 只返回姓名/账号和所属部门
|
||||
- 严格脱敏:不暴露系统角色字段
|
||||
- 使用 get_jwt_identity() 获取用户 ID
|
||||
"""
|
||||
try:
|
||||
from app.models.system import SysUser
|
||||
|
||||
# 【关键修复】统一使用 get_jwt_identity()
|
||||
user_id = get_jwt_identity()
|
||||
|
||||
# 超级管理员(user_id=0)
|
||||
if user_id == 0:
|
||||
return jsonify({
|
||||
'msg': '获取成功',
|
||||
'data': {
|
||||
'id': 0,
|
||||
'username': 'IRIS',
|
||||
'display_name': '超级管理员(IRIS)',
|
||||
'department': 'System',
|
||||
# 【关键】严格脱敏:不暴露 role 字段
|
||||
}
|
||||
}), 200
|
||||
|
||||
user = SysUser.query.get(user_id)
|
||||
if not user:
|
||||
return jsonify({'msg': '用户不存在'}), 404
|
||||
|
||||
return jsonify({
|
||||
'msg': '获取成功',
|
||||
'data': {
|
||||
'id': user.id,
|
||||
'username': user.username.split('/')[1] if '/' in user.username else user.username,
|
||||
'display_name': user.username.split('/')[0] if '/' in user.username else user.username,
|
||||
'department': user.department or '-',
|
||||
# 【关键】严格脱敏:不暴露 role 字段
|
||||
}
|
||||
}), 200
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"Get Profile Failed: {str(e)}")
|
||||
return jsonify({'msg': f'获取个人资料失败: {str(e)}'}), 500
|
||||
|
||||
Reference in New Issue
Block a user