211 lines
7.9 KiB
Python
211 lines
7.9 KiB
Python
# app/api/v1/auth.py
|
|
from flask import Blueprint, request, jsonify, current_app
|
|
from flask_jwt_extended import jwt_required, get_jwt
|
|
from app.services.auth_service import AuthService
|
|
from app.utils.decorators import permission_required
|
|
|
|
auth_bp = Blueprint('auth', __name__)
|
|
|
|
|
|
# ==============================================================================
|
|
# 辅助函数:获取当前用户的完整权限列表(基于角色查询)
|
|
# ==============================================================================
|
|
def get_current_user_permissions():
|
|
"""
|
|
返回当前用户拥有的所有权限码列表(包括菜单和元素)
|
|
此函数根据角色查询数据库得到权限。
|
|
"""
|
|
claims = get_jwt()
|
|
user_role = claims.get('role')
|
|
if not user_role:
|
|
return []
|
|
# 超级管理员返回所有字段权限
|
|
if user_role == 'super_admin':
|
|
return ['system_user:*']
|
|
perm_dict = AuthService.get_user_permissions(user_role)
|
|
# 合并菜单和元素权限
|
|
perms = perm_dict.get('menus', []) + perm_dict.get('elements', [])
|
|
return perms
|
|
|
|
|
|
def filter_item_by_permissions(item_dict, user_permissions):
|
|
"""
|
|
根据用户权限过滤 item 字典,无权限的字段值置为 None
|
|
"""
|
|
# 字段名到权限码的映射(与前端 permissionMap 保持一致)
|
|
field_to_perm = {
|
|
'id': 'system_user:id',
|
|
'username': 'system_user:username',
|
|
'account_id': 'system_user:account_id',
|
|
'email': 'system_user:email',
|
|
'department': 'system_user:department',
|
|
'role': 'system_user:role',
|
|
'status': 'system_user:status',
|
|
'created_at': 'system_user:created_at',
|
|
}
|
|
# 如果用户是超级管理员且有 'system_user:*',则不过滤
|
|
if 'system_user:*' in user_permissions:
|
|
return item_dict
|
|
for field, perm_code in field_to_perm.items():
|
|
if field in item_dict and perm_code not in user_permissions:
|
|
item_dict[field] = None
|
|
return item_dict
|
|
|
|
|
|
@auth_bp.route('/login', methods=['POST'])
|
|
def login():
|
|
try:
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({'msg': '无效的请求数据'}), 400
|
|
|
|
if not data.get('username') or not data.get('password'):
|
|
return jsonify({'msg': '请输入用户名和密码'}), 400
|
|
|
|
result = AuthService.login(data)
|
|
|
|
response_data = {
|
|
'msg': '登录成功',
|
|
'access_token': result.get('access_token'),
|
|
'user': result.get('user')
|
|
}
|
|
return jsonify(response_data), 200
|
|
|
|
except ValueError as ve:
|
|
return jsonify({'msg': str(ve)}), 401
|
|
except Exception as e:
|
|
current_app.logger.error(f"Login Failed Error: {str(e)}")
|
|
return jsonify({'msg': f'服务器内部错误: {str(e)}'}), 500
|
|
|
|
|
|
@auth_bp.route('/user/create', methods=['POST'])
|
|
@jwt_required()
|
|
@permission_required('system_user:operation')
|
|
def create_user():
|
|
try:
|
|
data = request.get_json()
|
|
# 数据清洗:移除用户没有权限的字段
|
|
user_permissions = get_current_user_permissions()
|
|
# 超级管理员不过滤
|
|
if 'system_user:*' not in user_permissions:
|
|
# 字段名到权限码的映射
|
|
field_to_perm = {
|
|
'cn_name': 'system_user:username',
|
|
'username': 'system_user:username',
|
|
'password': 'system_user:password',
|
|
'department': 'system_user:department',
|
|
'role': 'system_user:role',
|
|
'email': 'system_user:email',
|
|
}
|
|
# 对于 password 字段,如果没有对应权限但用户有操作权限,可以保留(由装饰器保证)
|
|
# 但如果连操作权限都没有,则不会进入此接口。
|
|
for field in list(data.keys()):
|
|
perm_code = field_to_perm.get(field)
|
|
# 密码字段特殊处理:如果没有 password 权限但用户有操作权限,仍允许(不删除)
|
|
if field == 'password':
|
|
# 检查用户是否有操作权限,如果有则保留
|
|
if 'system_user:operation' not in user_permissions:
|
|
data.pop(field, None)
|
|
continue
|
|
if perm_code and perm_code not in user_permissions:
|
|
data.pop(field, None)
|
|
|
|
claims = get_jwt()
|
|
operator_role = claims.get('role')
|
|
|
|
result = AuthService.create_user(data, operator_role)
|
|
return jsonify({'msg': '用户创建成功', 'data': result}), 201
|
|
|
|
except Exception as e:
|
|
current_app.logger.error(f"User Create Failed: {str(e)}")
|
|
return jsonify({'msg': str(e)}), 400
|
|
|
|
|
|
# [新增] 更新用户
|
|
@auth_bp.route('/user/<int:user_id>', methods=['PUT'])
|
|
@jwt_required()
|
|
@permission_required('system_user:operation')
|
|
def update_user(user_id):
|
|
try:
|
|
data = request.get_json()
|
|
# 数据清洗:移除用户没有权限的字段
|
|
user_permissions = get_current_user_permissions()
|
|
# 超级管理员不过滤
|
|
if 'system_user:*' not in user_permissions:
|
|
# 字段名到权限码的映射
|
|
field_to_perm = {
|
|
'cn_name': 'system_user:username',
|
|
'username': 'system_user:username',
|
|
'password': 'system_user:password',
|
|
'department': 'system_user:department',
|
|
'role': 'system_user:role',
|
|
'email': 'system_user:email',
|
|
}
|
|
for field in list(data.keys()):
|
|
perm_code = field_to_perm.get(field)
|
|
# 密码字段特殊处理:如果没有 password 权限但用户有操作权限,仍允许(不删除)
|
|
if field == 'password':
|
|
# 检查用户是否有操作权限,如果有则保留
|
|
if 'system_user:operation' not in user_permissions:
|
|
data.pop(field, None)
|
|
continue
|
|
if perm_code and perm_code not in user_permissions:
|
|
data.pop(field, None)
|
|
|
|
claims = get_jwt()
|
|
operator_role = claims.get('role')
|
|
|
|
result = AuthService.update_user(user_id, data, operator_role)
|
|
return jsonify({'msg': '用户更新成功', 'data': result}), 200
|
|
|
|
except Exception as e:
|
|
current_app.logger.error(f"User Update Failed: {str(e)}")
|
|
return jsonify({'msg': str(e)}), 400
|
|
|
|
|
|
@auth_bp.route('/users', methods=['GET'])
|
|
@jwt_required()
|
|
@permission_required('system_user')
|
|
def get_users():
|
|
try:
|
|
users = AuthService.get_all_users()
|
|
# 字段级脱敏
|
|
user_permissions = get_current_user_permissions()
|
|
filtered_users = [filter_item_by_permissions(user, user_permissions) for user in users]
|
|
return jsonify({'msg': '获取成功', 'data': filtered_users}), 200
|
|
except Exception as e:
|
|
current_app.logger.error(f"Get Users Failed: {str(e)}")
|
|
return jsonify({'msg': '获取用户列表失败'}), 500
|
|
|
|
|
|
@auth_bp.route('/user/<int:user_id>', methods=['DELETE'])
|
|
@jwt_required()
|
|
@permission_required('system_user:operation')
|
|
def delete_user(user_id):
|
|
try:
|
|
claims = get_jwt()
|
|
operator_role = claims.get('role')
|
|
|
|
AuthService.delete_user(user_id, operator_role)
|
|
return jsonify({'msg': '删除成功'}), 200
|
|
except Exception as e:
|
|
current_app.logger.error(f"Delete User Failed: {str(e)}")
|
|
return jsonify({'msg': str(e)}), 400
|
|
|
|
|
|
@auth_bp.route('/my-permissions', methods=['GET'])
|
|
@jwt_required()
|
|
def get_my_permissions():
|
|
"""获取当前登录用户的权限列表"""
|
|
try:
|
|
claims = get_jwt()
|
|
role = claims.get('role')
|
|
|
|
# 调用 Service 获取权限
|
|
permissions = AuthService.get_user_permissions(role)
|
|
|
|
return jsonify({'msg': '获取成功', 'data': permissions}), 200
|
|
except Exception as e:
|
|
current_app.logger.error(f"Get Permissions Failed: {str(e)}")
|
|
return jsonify({'msg': '获取权限失败'}), 500
|