Compare commits
4 Commits
2242aca6fe
...
e1006f383a
| Author | SHA1 | Date | |
|---|---|---|---|
| e1006f383a | |||
| 505b8e5a32 | |||
| 0bdd4c068e | |||
| 61ec906cfb |
@ -1,6 +1,24 @@
|
|||||||
# app/api/v1/auth.py
|
# app/api/v1/auth.py
|
||||||
|
# ==============================================================================
|
||||||
|
# Flask & Extensions
|
||||||
|
# ==============================================================================
|
||||||
from flask import Blueprint, request, jsonify, current_app
|
from flask import Blueprint, request, jsonify, current_app
|
||||||
from flask_jwt_extended import jwt_required, get_jwt
|
from flask_jwt_extended import (
|
||||||
|
jwt_required,
|
||||||
|
get_jwt,
|
||||||
|
get_jwt_identity,
|
||||||
|
create_access_token,
|
||||||
|
)
|
||||||
|
from werkzeug.exceptions import BadRequest, Unauthorized, Forbidden, NotFound, InternalServerError
|
||||||
|
|
||||||
|
# ==============================================================================
|
||||||
|
# Database & Extensions
|
||||||
|
# ==============================================================================
|
||||||
|
from app.extensions import db
|
||||||
|
|
||||||
|
# ==============================================================================
|
||||||
|
# Service & Decorators
|
||||||
|
# ==============================================================================
|
||||||
from app.services.auth_service import AuthService
|
from app.services.auth_service import AuthService
|
||||||
from app.utils.decorators import permission_required, audit_log
|
from app.utils.decorators import permission_required, audit_log
|
||||||
|
|
||||||
@ -52,6 +70,9 @@ def filter_item_by_permissions(item_dict, user_permissions):
|
|||||||
return item_dict
|
return item_dict
|
||||||
|
|
||||||
|
|
||||||
|
# ==============================================================================
|
||||||
|
# 登录接口
|
||||||
|
# ==============================================================================
|
||||||
@auth_bp.route('/login', methods=['POST'])
|
@auth_bp.route('/login', methods=['POST'])
|
||||||
def login():
|
def login():
|
||||||
try:
|
try:
|
||||||
@ -73,17 +94,19 @@ def login():
|
|||||||
return jsonify(response_data), 200
|
return jsonify(response_data), 200
|
||||||
|
|
||||||
except ValueError as ve:
|
except ValueError as ve:
|
||||||
|
current_app.logger.warning(f"Login validation failed: {str(ve)}")
|
||||||
return jsonify({'msg': str(ve)}), 401
|
return jsonify({'msg': str(ve)}), 401
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
current_app.logger.error(f"Login Failed Error: {str(e)}")
|
current_app.logger.error(f"Login Failed Error: {str(e)}")
|
||||||
return jsonify({'msg': f'服务器内部错误: {str(e)}'}), 500
|
return jsonify({'msg': f'服务器内部错误: {str(e)}'}), 500
|
||||||
|
|
||||||
|
|
||||||
|
# ==============================================================================
|
||||||
|
# Token 刷新接口
|
||||||
|
# ==============================================================================
|
||||||
@auth_bp.route('/refresh', methods=['POST'])
|
@auth_bp.route('/refresh', methods=['POST'])
|
||||||
def refresh():
|
def refresh():
|
||||||
"""
|
"""使用 refresh_token 换发新的 access_token"""
|
||||||
使用 refresh_token 换发新的 access_token
|
|
||||||
"""
|
|
||||||
try:
|
try:
|
||||||
data = request.get_json()
|
data = request.get_json()
|
||||||
if not data or not data.get('refresh_token'):
|
if not data or not data.get('refresh_token'):
|
||||||
@ -98,12 +121,16 @@ def refresh():
|
|||||||
}), 200
|
}), 200
|
||||||
|
|
||||||
except ValueError as ve:
|
except ValueError as ve:
|
||||||
|
current_app.logger.warning(f"Token refresh validation failed: {str(ve)}")
|
||||||
return jsonify({'msg': str(ve)}), 401
|
return jsonify({'msg': str(ve)}), 401
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
current_app.logger.error(f"Token Refresh Error: {str(e)}")
|
current_app.logger.error(f"Token Refresh Error: {str(e)}")
|
||||||
return jsonify({'msg': f'Token 刷新失败: {str(e)}'}), 500
|
return jsonify({'msg': f'Token 刷新失败: {str(e)}'}), 500
|
||||||
|
|
||||||
|
|
||||||
|
# ==============================================================================
|
||||||
|
# 创建用户(管理员)
|
||||||
|
# ==============================================================================
|
||||||
@auth_bp.route('/user/create', methods=['POST'])
|
@auth_bp.route('/user/create', methods=['POST'])
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
@permission_required('system_user:operation')
|
@permission_required('system_user:operation')
|
||||||
@ -119,7 +146,6 @@ def create_user():
|
|||||||
user_permissions = get_current_user_permissions()
|
user_permissions = get_current_user_permissions()
|
||||||
# 超级管理员不过滤
|
# 超级管理员不过滤
|
||||||
if 'system_user:*' not in user_permissions:
|
if 'system_user:*' not in user_permissions:
|
||||||
# 字段名到权限码的映射
|
|
||||||
field_to_perm = {
|
field_to_perm = {
|
||||||
'cn_name': 'system_user:username',
|
'cn_name': 'system_user:username',
|
||||||
'username': 'system_user:username',
|
'username': 'system_user:username',
|
||||||
@ -128,13 +154,9 @@ def create_user():
|
|||||||
'role': 'system_user:role',
|
'role': 'system_user:role',
|
||||||
'email': 'system_user:email',
|
'email': 'system_user:email',
|
||||||
}
|
}
|
||||||
# 对于 password 字段,如果没有对应权限但用户有操作权限,可以保留(由装饰器保证)
|
|
||||||
# 但如果连操作权限都没有,则不会进入此接口。
|
|
||||||
for field in list(data.keys()):
|
for field in list(data.keys()):
|
||||||
perm_code = field_to_perm.get(field)
|
perm_code = field_to_perm.get(field)
|
||||||
# 密码字段特殊处理:如果没有 password 权限但用户有操作权限,仍允许(不删除)
|
|
||||||
if field == 'password':
|
if field == 'password':
|
||||||
# 检查用户是否有操作权限,如果有则保留
|
|
||||||
if 'system_user:operation' not in user_permissions:
|
if 'system_user:operation' not in user_permissions:
|
||||||
data.pop(field, None)
|
data.pop(field, None)
|
||||||
continue
|
continue
|
||||||
@ -152,7 +174,9 @@ def create_user():
|
|||||||
return jsonify({'msg': str(e)}), 400
|
return jsonify({'msg': str(e)}), 400
|
||||||
|
|
||||||
|
|
||||||
# [新增] 更新用户
|
# ==============================================================================
|
||||||
|
# 更新用户(管理员)
|
||||||
|
# ==============================================================================
|
||||||
@auth_bp.route('/user/<int:user_id>', methods=['PUT'])
|
@auth_bp.route('/user/<int:user_id>', methods=['PUT'])
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
@permission_required('system_user:operation')
|
@permission_required('system_user:operation')
|
||||||
@ -164,11 +188,8 @@ def create_user():
|
|||||||
def update_user(user_id):
|
def update_user(user_id):
|
||||||
try:
|
try:
|
||||||
data = request.get_json()
|
data = request.get_json()
|
||||||
# 数据清洗:移除用户没有权限的字段
|
|
||||||
user_permissions = get_current_user_permissions()
|
user_permissions = get_current_user_permissions()
|
||||||
# 超级管理员不过滤
|
|
||||||
if 'system_user:*' not in user_permissions:
|
if 'system_user:*' not in user_permissions:
|
||||||
# 字段名到权限码的映射
|
|
||||||
field_to_perm = {
|
field_to_perm = {
|
||||||
'cn_name': 'system_user:username',
|
'cn_name': 'system_user:username',
|
||||||
'username': 'system_user:username',
|
'username': 'system_user:username',
|
||||||
@ -179,9 +200,7 @@ def update_user(user_id):
|
|||||||
}
|
}
|
||||||
for field in list(data.keys()):
|
for field in list(data.keys()):
|
||||||
perm_code = field_to_perm.get(field)
|
perm_code = field_to_perm.get(field)
|
||||||
# 密码字段特殊处理:如果没有 password 权限但用户有操作权限,仍允许(不删除)
|
|
||||||
if field == 'password':
|
if field == 'password':
|
||||||
# 检查用户是否有操作权限,如果有则保留
|
|
||||||
if 'system_user:operation' not in user_permissions:
|
if 'system_user:operation' not in user_permissions:
|
||||||
data.pop(field, None)
|
data.pop(field, None)
|
||||||
continue
|
continue
|
||||||
@ -199,21 +218,26 @@ def update_user(user_id):
|
|||||||
return jsonify({'msg': str(e)}), 400
|
return jsonify({'msg': str(e)}), 400
|
||||||
|
|
||||||
|
|
||||||
|
# ==============================================================================
|
||||||
|
# 获取所有用户列表(管理员)
|
||||||
|
# ==============================================================================
|
||||||
@auth_bp.route('/users', methods=['GET'])
|
@auth_bp.route('/users', methods=['GET'])
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
@permission_required('system_user')
|
@permission_required('system_user')
|
||||||
def get_users():
|
def get_users():
|
||||||
try:
|
try:
|
||||||
users = AuthService.get_all_users()
|
users = AuthService.get_all_users()
|
||||||
# 字段级脱敏
|
|
||||||
user_permissions = get_current_user_permissions()
|
user_permissions = get_current_user_permissions()
|
||||||
filtered_users = [filter_item_by_permissions(user, user_permissions) for user in users]
|
filtered_users = [filter_item_by_permissions(user, user_permissions) for user in users]
|
||||||
return jsonify({'msg': '获取成功', 'data': filtered_users}), 200
|
return jsonify({'msg': '获取成功', 'data': filtered_users}), 200
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
current_app.logger.error(f"Get Users Failed: {str(e)}")
|
current_app.logger.error(f"Get Users Failed: {str(e)}")
|
||||||
return jsonify({'msg': '获取用户列表失败'}), 500
|
return jsonify({'msg': f'获取用户列表失败: {str(e)}'}), 500
|
||||||
|
|
||||||
|
|
||||||
|
# ==============================================================================
|
||||||
|
# 删除用户(管理员)
|
||||||
|
# ==============================================================================
|
||||||
@auth_bp.route('/user/<int:user_id>', methods=['DELETE'])
|
@auth_bp.route('/user/<int:user_id>', methods=['DELETE'])
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
@permission_required('system_user:operation')
|
@permission_required('system_user:operation')
|
||||||
@ -234,6 +258,9 @@ def delete_user(user_id):
|
|||||||
return jsonify({'msg': str(e)}), 400
|
return jsonify({'msg': str(e)}), 400
|
||||||
|
|
||||||
|
|
||||||
|
# ==============================================================================
|
||||||
|
# 获取当前用户权限列表(登录后调用)
|
||||||
|
# ==============================================================================
|
||||||
@auth_bp.route('/my-permissions', methods=['GET'])
|
@auth_bp.route('/my-permissions', methods=['GET'])
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
def get_my_permissions():
|
def get_my_permissions():
|
||||||
@ -241,30 +268,75 @@ def get_my_permissions():
|
|||||||
try:
|
try:
|
||||||
claims = get_jwt()
|
claims = get_jwt()
|
||||||
role = claims.get('role')
|
role = claims.get('role')
|
||||||
|
|
||||||
# 调用 Service 获取权限
|
|
||||||
permissions = AuthService.get_user_permissions(role)
|
permissions = AuthService.get_user_permissions(role)
|
||||||
|
|
||||||
return jsonify({'msg': '获取成功', 'data': permissions}), 200
|
return jsonify({'msg': '获取成功', 'data': permissions}), 200
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
current_app.logger.error(f"Get Permissions Failed: {str(e)}")
|
current_app.logger.error(f"Get Permissions Failed: {str(e)}")
|
||||||
return jsonify({'msg': '获取权限失败'}), 500
|
return jsonify({'msg': f'获取权限失败: {str(e)}'}), 500
|
||||||
|
|
||||||
|
|
||||||
@auth_bp.route('/me/password', methods=['PUT'])
|
# ==============================================================================
|
||||||
|
# 获取当前用户个人资料(自我查看)
|
||||||
|
# ==============================================================================
|
||||||
|
@auth_bp.route('/me', methods=['GET'])
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
def change_my_password():
|
def get_my_profile():
|
||||||
"""
|
"""
|
||||||
【改造】自我修改密码接口
|
【重构】获取当前登录用户的个人资料(自我查看)
|
||||||
- 无需管理员权限,无需旧密码
|
- 只返回姓名/账号和所属部门
|
||||||
- 只要 JWT Token 有效(已证明当前登录身份),即可直接修改新密码
|
- 严格脱敏:不暴露系统角色字段
|
||||||
- 字段脱敏:不暴露系统角色
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
from app.models.system import SysUser
|
from app.models.system import SysUser
|
||||||
|
|
||||||
claims = get_jwt()
|
user_id = get_jwt_identity()
|
||||||
user_id = claims.get('sub')
|
|
||||||
|
# 超级管理员(user_id=0)
|
||||||
|
if user_id == 0:
|
||||||
|
return jsonify({
|
||||||
|
'msg': '获取成功',
|
||||||
|
'data': {
|
||||||
|
'id': 0,
|
||||||
|
'username': 'IRIS',
|
||||||
|
'display_name': '超级管理员(IRIS)',
|
||||||
|
'department': 'System',
|
||||||
|
}
|
||||||
|
}), 200
|
||||||
|
|
||||||
|
user = SysUser.query.get(user_id)
|
||||||
|
if not user:
|
||||||
|
return jsonify({'msg': '用户不存在'}), 404
|
||||||
|
|
||||||
|
return jsonify({
|
||||||
|
'msg': '获取成功',
|
||||||
|
'data': {
|
||||||
|
'id': user.id,
|
||||||
|
'username': user.username.split('/')[1] if '/' in user.username else user.username,
|
||||||
|
'display_name': user.username.split('/')[0] if '/' in user.username else user.username,
|
||||||
|
'department': user.department or '-',
|
||||||
|
}
|
||||||
|
}), 200
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
current_app.logger.error(f"Get Profile Failed: {str(e)}")
|
||||||
|
return jsonify({'msg': f'获取个人资料失败: {str(e)}'}), 500
|
||||||
|
|
||||||
|
|
||||||
|
# ==============================================================================
|
||||||
|
# 自我修改密码(无需旧密码)
|
||||||
|
# ==============================================================================
|
||||||
|
@auth_bp.route('/me/password', methods=['PUT'])
|
||||||
|
@jwt_required()
|
||||||
|
def change_my_password():
|
||||||
|
"""
|
||||||
|
【重构】自我修改密码接口
|
||||||
|
- 无需管理员权限,无需旧密码
|
||||||
|
- 只要 JWT Token 有效(已证明当前登录身份),即可直接修改新密码
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
from app.models.system import SysUser
|
||||||
|
|
||||||
|
user_id = get_jwt_identity()
|
||||||
|
|
||||||
data = request.get_json()
|
data = request.get_json()
|
||||||
if not data:
|
if not data:
|
||||||
@ -299,52 +371,3 @@ def change_my_password():
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
current_app.logger.error(f"Change Password Failed: {str(e)}")
|
current_app.logger.error(f"Change Password Failed: {str(e)}")
|
||||||
return jsonify({'msg': f'密码修改失败: {str(e)}'}), 500
|
return jsonify({'msg': f'密码修改失败: {str(e)}'}), 500
|
||||||
|
|
||||||
|
|
||||||
@auth_bp.route('/me', methods=['GET'])
|
|
||||||
@jwt_required()
|
|
||||||
def get_my_profile():
|
|
||||||
"""
|
|
||||||
【新增】获取当前登录用户的个人资料(自我查看)
|
|
||||||
- 只返回姓名/账号和所属部门
|
|
||||||
- 严格脱敏:不暴露系统角色字段
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
from app.models.system import SysUser
|
|
||||||
|
|
||||||
claims = get_jwt()
|
|
||||||
user_id = claims.get('sub')
|
|
||||||
display_name = claims.get('display_name', '')
|
|
||||||
account_id = claims.get('username', '')
|
|
||||||
|
|
||||||
# 超级管理员(user_id=0)
|
|
||||||
if user_id == 0:
|
|
||||||
return jsonify({
|
|
||||||
'msg': '获取成功',
|
|
||||||
'data': {
|
|
||||||
'id': 0,
|
|
||||||
'username': 'IRIS',
|
|
||||||
'display_name': '超级管理员(IRIS)',
|
|
||||||
'department': 'System',
|
|
||||||
# 【关键】严格脱敏:不暴露 role 字段
|
|
||||||
}
|
|
||||||
}), 200
|
|
||||||
|
|
||||||
user = SysUser.query.get(user_id)
|
|
||||||
if not user:
|
|
||||||
return jsonify({'msg': '用户不存在'}), 404
|
|
||||||
|
|
||||||
return jsonify({
|
|
||||||
'msg': '获取成功',
|
|
||||||
'data': {
|
|
||||||
'id': user.id,
|
|
||||||
'username': account_id,
|
|
||||||
'display_name': user.username.split('/')[0] if user.username else display_name,
|
|
||||||
'department': user.department or '-',
|
|
||||||
# 【关键】严格脱敏:不暴露 role 字段
|
|
||||||
}
|
|
||||||
}), 200
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
current_app.logger.error(f"Get Profile Failed: {str(e)}")
|
|
||||||
return jsonify({'msg': f'获取个人资料失败: {str(e)}'}), 500
|
|
||||||
|
|||||||
Reference in New Issue
Block a user