Compare commits

4 Commits

View File

@ -1,6 +1,24 @@
# app/api/v1/auth.py # app/api/v1/auth.py
# ==============================================================================
# Flask & Extensions
# ==============================================================================
from flask import Blueprint, request, jsonify, current_app from flask import Blueprint, request, jsonify, current_app
from flask_jwt_extended import jwt_required, get_jwt from flask_jwt_extended import (
jwt_required,
get_jwt,
get_jwt_identity,
create_access_token,
)
from werkzeug.exceptions import BadRequest, Unauthorized, Forbidden, NotFound, InternalServerError
# ==============================================================================
# Database & Extensions
# ==============================================================================
from app.extensions import db
# ==============================================================================
# Service & Decorators
# ==============================================================================
from app.services.auth_service import AuthService from app.services.auth_service import AuthService
from app.utils.decorators import permission_required, audit_log from app.utils.decorators import permission_required, audit_log
@ -52,6 +70,9 @@ def filter_item_by_permissions(item_dict, user_permissions):
return item_dict return item_dict
# ==============================================================================
# 登录接口
# ==============================================================================
@auth_bp.route('/login', methods=['POST']) @auth_bp.route('/login', methods=['POST'])
def login(): def login():
try: try:
@ -73,17 +94,19 @@ def login():
return jsonify(response_data), 200 return jsonify(response_data), 200
except ValueError as ve: except ValueError as ve:
current_app.logger.warning(f"Login validation failed: {str(ve)}")
return jsonify({'msg': str(ve)}), 401 return jsonify({'msg': str(ve)}), 401
except Exception as e: except Exception as e:
current_app.logger.error(f"Login Failed Error: {str(e)}") current_app.logger.error(f"Login Failed Error: {str(e)}")
return jsonify({'msg': f'服务器内部错误: {str(e)}'}), 500 return jsonify({'msg': f'服务器内部错误: {str(e)}'}), 500
# ==============================================================================
# Token 刷新接口
# ==============================================================================
@auth_bp.route('/refresh', methods=['POST']) @auth_bp.route('/refresh', methods=['POST'])
def refresh(): def refresh():
""" """使用 refresh_token 换发新的 access_token"""
使用 refresh_token 换发新的 access_token
"""
try: try:
data = request.get_json() data = request.get_json()
if not data or not data.get('refresh_token'): if not data or not data.get('refresh_token'):
@ -98,12 +121,16 @@ def refresh():
}), 200 }), 200
except ValueError as ve: except ValueError as ve:
current_app.logger.warning(f"Token refresh validation failed: {str(ve)}")
return jsonify({'msg': str(ve)}), 401 return jsonify({'msg': str(ve)}), 401
except Exception as e: except Exception as e:
current_app.logger.error(f"Token Refresh Error: {str(e)}") current_app.logger.error(f"Token Refresh Error: {str(e)}")
return jsonify({'msg': f'Token 刷新失败: {str(e)}'}), 500 return jsonify({'msg': f'Token 刷新失败: {str(e)}'}), 500
# ==============================================================================
# 创建用户(管理员)
# ==============================================================================
@auth_bp.route('/user/create', methods=['POST']) @auth_bp.route('/user/create', methods=['POST'])
@jwt_required() @jwt_required()
@permission_required('system_user:operation') @permission_required('system_user:operation')
@ -119,7 +146,6 @@ def create_user():
user_permissions = get_current_user_permissions() user_permissions = get_current_user_permissions()
# 超级管理员不过滤 # 超级管理员不过滤
if 'system_user:*' not in user_permissions: if 'system_user:*' not in user_permissions:
# 字段名到权限码的映射
field_to_perm = { field_to_perm = {
'cn_name': 'system_user:username', 'cn_name': 'system_user:username',
'username': 'system_user:username', 'username': 'system_user:username',
@ -128,13 +154,9 @@ def create_user():
'role': 'system_user:role', 'role': 'system_user:role',
'email': 'system_user:email', 'email': 'system_user:email',
} }
# 对于 password 字段,如果没有对应权限但用户有操作权限,可以保留(由装饰器保证)
# 但如果连操作权限都没有,则不会进入此接口。
for field in list(data.keys()): for field in list(data.keys()):
perm_code = field_to_perm.get(field) perm_code = field_to_perm.get(field)
# 密码字段特殊处理:如果没有 password 权限但用户有操作权限,仍允许(不删除)
if field == 'password': if field == 'password':
# 检查用户是否有操作权限,如果有则保留
if 'system_user:operation' not in user_permissions: if 'system_user:operation' not in user_permissions:
data.pop(field, None) data.pop(field, None)
continue continue
@ -152,7 +174,9 @@ def create_user():
return jsonify({'msg': str(e)}), 400 return jsonify({'msg': str(e)}), 400
# [新增] 更新用户 # ==============================================================================
# 更新用户(管理员)
# ==============================================================================
@auth_bp.route('/user/<int:user_id>', methods=['PUT']) @auth_bp.route('/user/<int:user_id>', methods=['PUT'])
@jwt_required() @jwt_required()
@permission_required('system_user:operation') @permission_required('system_user:operation')
@ -164,11 +188,8 @@ def create_user():
def update_user(user_id): def update_user(user_id):
try: try:
data = request.get_json() data = request.get_json()
# 数据清洗:移除用户没有权限的字段
user_permissions = get_current_user_permissions() user_permissions = get_current_user_permissions()
# 超级管理员不过滤
if 'system_user:*' not in user_permissions: if 'system_user:*' not in user_permissions:
# 字段名到权限码的映射
field_to_perm = { field_to_perm = {
'cn_name': 'system_user:username', 'cn_name': 'system_user:username',
'username': 'system_user:username', 'username': 'system_user:username',
@ -179,9 +200,7 @@ def update_user(user_id):
} }
for field in list(data.keys()): for field in list(data.keys()):
perm_code = field_to_perm.get(field) perm_code = field_to_perm.get(field)
# 密码字段特殊处理:如果没有 password 权限但用户有操作权限,仍允许(不删除)
if field == 'password': if field == 'password':
# 检查用户是否有操作权限,如果有则保留
if 'system_user:operation' not in user_permissions: if 'system_user:operation' not in user_permissions:
data.pop(field, None) data.pop(field, None)
continue continue
@ -199,21 +218,26 @@ def update_user(user_id):
return jsonify({'msg': str(e)}), 400 return jsonify({'msg': str(e)}), 400
# ==============================================================================
# 获取所有用户列表(管理员)
# ==============================================================================
@auth_bp.route('/users', methods=['GET']) @auth_bp.route('/users', methods=['GET'])
@jwt_required() @jwt_required()
@permission_required('system_user') @permission_required('system_user')
def get_users(): def get_users():
try: try:
users = AuthService.get_all_users() users = AuthService.get_all_users()
# 字段级脱敏
user_permissions = get_current_user_permissions() user_permissions = get_current_user_permissions()
filtered_users = [filter_item_by_permissions(user, user_permissions) for user in users] filtered_users = [filter_item_by_permissions(user, user_permissions) for user in users]
return jsonify({'msg': '获取成功', 'data': filtered_users}), 200 return jsonify({'msg': '获取成功', 'data': filtered_users}), 200
except Exception as e: except Exception as e:
current_app.logger.error(f"Get Users Failed: {str(e)}") current_app.logger.error(f"Get Users Failed: {str(e)}")
return jsonify({'msg': '获取用户列表失败'}), 500 return jsonify({'msg': f'获取用户列表失败: {str(e)}'}), 500
# ==============================================================================
# 删除用户(管理员)
# ==============================================================================
@auth_bp.route('/user/<int:user_id>', methods=['DELETE']) @auth_bp.route('/user/<int:user_id>', methods=['DELETE'])
@jwt_required() @jwt_required()
@permission_required('system_user:operation') @permission_required('system_user:operation')
@ -234,6 +258,9 @@ def delete_user(user_id):
return jsonify({'msg': str(e)}), 400 return jsonify({'msg': str(e)}), 400
# ==============================================================================
# 获取当前用户权限列表(登录后调用)
# ==============================================================================
@auth_bp.route('/my-permissions', methods=['GET']) @auth_bp.route('/my-permissions', methods=['GET'])
@jwt_required() @jwt_required()
def get_my_permissions(): def get_my_permissions():
@ -241,30 +268,75 @@ def get_my_permissions():
try: try:
claims = get_jwt() claims = get_jwt()
role = claims.get('role') role = claims.get('role')
# 调用 Service 获取权限
permissions = AuthService.get_user_permissions(role) permissions = AuthService.get_user_permissions(role)
return jsonify({'msg': '获取成功', 'data': permissions}), 200 return jsonify({'msg': '获取成功', 'data': permissions}), 200
except Exception as e: except Exception as e:
current_app.logger.error(f"Get Permissions Failed: {str(e)}") current_app.logger.error(f"Get Permissions Failed: {str(e)}")
return jsonify({'msg': '获取权限失败'}), 500 return jsonify({'msg': f'获取权限失败: {str(e)}'}), 500
@auth_bp.route('/me/password', methods=['PUT']) # ==============================================================================
# 获取当前用户个人资料(自我查看)
# ==============================================================================
@auth_bp.route('/me', methods=['GET'])
@jwt_required() @jwt_required()
def change_my_password(): def get_my_profile():
""" """
改造】自我修改密码接口 重构】获取当前登录用户的个人资料(自我查看)
- 无需管理员权限,无需旧密码 - 只返回姓名/账号和所属部门
- 只要 JWT Token 有效(已证明当前登录身份),即可直接修改新密码 - 严格脱敏:不暴露系统角色字段
- 字段脱敏:不暴露系统角色
""" """
try: try:
from app.models.system import SysUser from app.models.system import SysUser
claims = get_jwt() user_id = get_jwt_identity()
user_id = claims.get('sub')
# 超级管理员user_id=0
if user_id == 0:
return jsonify({
'msg': '获取成功',
'data': {
'id': 0,
'username': 'IRIS',
'display_name': '超级管理员(IRIS)',
'department': 'System',
}
}), 200
user = SysUser.query.get(user_id)
if not user:
return jsonify({'msg': '用户不存在'}), 404
return jsonify({
'msg': '获取成功',
'data': {
'id': user.id,
'username': user.username.split('/')[1] if '/' in user.username else user.username,
'display_name': user.username.split('/')[0] if '/' in user.username else user.username,
'department': user.department or '-',
}
}), 200
except Exception as e:
current_app.logger.error(f"Get Profile Failed: {str(e)}")
return jsonify({'msg': f'获取个人资料失败: {str(e)}'}), 500
# ==============================================================================
# 自我修改密码(无需旧密码)
# ==============================================================================
@auth_bp.route('/me/password', methods=['PUT'])
@jwt_required()
def change_my_password():
"""
【重构】自我修改密码接口
- 无需管理员权限,无需旧密码
- 只要 JWT Token 有效(已证明当前登录身份),即可直接修改新密码
"""
try:
from app.models.system import SysUser
user_id = get_jwt_identity()
data = request.get_json() data = request.get_json()
if not data: if not data:
@ -299,52 +371,3 @@ def change_my_password():
except Exception as e: except Exception as e:
current_app.logger.error(f"Change Password Failed: {str(e)}") current_app.logger.error(f"Change Password Failed: {str(e)}")
return jsonify({'msg': f'密码修改失败: {str(e)}'}), 500 return jsonify({'msg': f'密码修改失败: {str(e)}'}), 500
@auth_bp.route('/me', methods=['GET'])
@jwt_required()
def get_my_profile():
"""
【新增】获取当前登录用户的个人资料(自我查看)
- 只返回姓名/账号和所属部门
- 严格脱敏:不暴露系统角色字段
"""
try:
from app.models.system import SysUser
claims = get_jwt()
user_id = claims.get('sub')
display_name = claims.get('display_name', '')
account_id = claims.get('username', '')
# 超级管理员user_id=0
if user_id == 0:
return jsonify({
'msg': '获取成功',
'data': {
'id': 0,
'username': 'IRIS',
'display_name': '超级管理员(IRIS)',
'department': 'System',
# 【关键】严格脱敏:不暴露 role 字段
}
}), 200
user = SysUser.query.get(user_id)
if not user:
return jsonify({'msg': '用户不存在'}), 404
return jsonify({
'msg': '获取成功',
'data': {
'id': user.id,
'username': account_id,
'display_name': user.username.split('/')[0] if user.username else display_name,
'department': user.department or '-',
# 【关键】严格脱敏:不暴露 role 字段
}
}), 200
except Exception as e:
current_app.logger.error(f"Get Profile Failed: {str(e)}")
return jsonify({'msg': f'获取个人资料失败: {str(e)}'}), 500