Files
KCGL/inventory-backend/app/api/v1/auth.py

351 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/api/v1/auth.py
from flask import Blueprint, request, jsonify, current_app
from flask_jwt_extended import jwt_required, get_jwt
from app.services.auth_service import AuthService
from app.utils.decorators import permission_required, audit_log
auth_bp = Blueprint('auth', __name__)
# ==============================================================================
# 辅助函数:获取当前用户的完整权限列表(基于角色查询)
# ==============================================================================
def get_current_user_permissions():
"""
返回当前用户拥有的所有权限码列表(包括菜单和元素)
此函数根据角色查询数据库得到权限。
"""
claims = get_jwt()
user_role = claims.get('role')
if not user_role:
return []
# 超级管理员返回所有字段权限 (忽略大小写)
if user_role.upper() == 'SUPER_ADMIN':
return ['system_user:*']
perm_dict = AuthService.get_user_permissions(user_role)
# 合并菜单和元素权限
perms = perm_dict.get('menus', []) + perm_dict.get('elements', [])
return perms
def filter_item_by_permissions(item_dict, user_permissions):
"""
根据用户权限过滤 item 字典,无权限的字段值置为 None
"""
# 字段名到权限码的映射(与前端 permissionMap 保持一致)
field_to_perm = {
'id': 'system_user:id',
'username': 'system_user:username',
'account_id': 'system_user:account_id',
'email': 'system_user:email',
'department': 'system_user:department',
'role': 'system_user:role',
'status': 'system_user:status',
'created_at': 'system_user:created_at',
}
# 如果用户是超级管理员且有 'system_user:*',则不过滤
if 'system_user:*' in user_permissions:
return item_dict
for field, perm_code in field_to_perm.items():
if field in item_dict and perm_code not in user_permissions:
item_dict[field] = None
return item_dict
@auth_bp.route('/login', methods=['POST'])
def login():
try:
data = request.get_json()
if not data:
return jsonify({'msg': '无效的请求数据'}), 400
if not data.get('username') or not data.get('password'):
return jsonify({'msg': '请输入用户名和密码'}), 400
result = AuthService.login(data)
response_data = {
'msg': '登录成功',
'access_token': result.get('access_token'),
'refresh_token': result.get('refresh_token'),
'user': result.get('user')
}
return jsonify(response_data), 200
except ValueError as ve:
return jsonify({'msg': str(ve)}), 401
except Exception as e:
current_app.logger.error(f"Login Failed Error: {str(e)}")
return jsonify({'msg': f'服务器内部错误: {str(e)}'}), 500
@auth_bp.route('/refresh', methods=['POST'])
def refresh():
"""
使用 refresh_token 换发新的 access_token
"""
try:
data = request.get_json()
if not data or not data.get('refresh_token'):
return jsonify({'msg': '缺少 refresh_token'}), 400
refresh_token = data.get('refresh_token')
result = AuthService.refresh_access_token(refresh_token)
return jsonify({
'msg': 'Token 刷新成功',
'access_token': result.get('access_token')
}), 200
except ValueError as ve:
return jsonify({'msg': str(ve)}), 401
except Exception as e:
current_app.logger.error(f"Token Refresh Error: {str(e)}")
return jsonify({'msg': f'Token 刷新失败: {str(e)}'}), 500
@auth_bp.route('/user/create', methods=['POST'])
@jwt_required()
@permission_required('system_user:operation')
@audit_log(
module='用户管理',
action='新增',
get_target_name_fn=lambda: request.get_json().get('username') if request.get_json() else None
)
def create_user():
try:
data = request.get_json()
# 数据清洗:移除用户没有权限的字段
user_permissions = get_current_user_permissions()
# 超级管理员不过滤
if 'system_user:*' not in user_permissions:
# 字段名到权限码的映射
field_to_perm = {
'cn_name': 'system_user:username',
'username': 'system_user:username',
'password': 'system_user:password',
'department': 'system_user:department',
'role': 'system_user:role',
'email': 'system_user:email',
}
# 对于 password 字段,如果没有对应权限但用户有操作权限,可以保留(由装饰器保证)
# 但如果连操作权限都没有,则不会进入此接口。
for field in list(data.keys()):
perm_code = field_to_perm.get(field)
# 密码字段特殊处理:如果没有 password 权限但用户有操作权限,仍允许(不删除)
if field == 'password':
# 检查用户是否有操作权限,如果有则保留
if 'system_user:operation' not in user_permissions:
data.pop(field, None)
continue
if perm_code and perm_code not in user_permissions:
data.pop(field, None)
claims = get_jwt()
operator_role = claims.get('role')
result = AuthService.create_user(data, operator_role)
return jsonify({'msg': '用户创建成功', 'data': result}), 201
except Exception as e:
current_app.logger.error(f"User Create Failed: {str(e)}")
return jsonify({'msg': str(e)}), 400
# [新增] 更新用户
@auth_bp.route('/user/<int:user_id>', methods=['PUT'])
@jwt_required()
@permission_required('system_user:operation')
@audit_log(
module='用户管理',
action='修改',
get_target_id_fn=lambda: request.view_args.get('user_id')
)
def update_user(user_id):
try:
data = request.get_json()
# 数据清洗:移除用户没有权限的字段
user_permissions = get_current_user_permissions()
# 超级管理员不过滤
if 'system_user:*' not in user_permissions:
# 字段名到权限码的映射
field_to_perm = {
'cn_name': 'system_user:username',
'username': 'system_user:username',
'password': 'system_user:password',
'department': 'system_user:department',
'role': 'system_user:role',
'email': 'system_user:email',
}
for field in list(data.keys()):
perm_code = field_to_perm.get(field)
# 密码字段特殊处理:如果没有 password 权限但用户有操作权限,仍允许(不删除)
if field == 'password':
# 检查用户是否有操作权限,如果有则保留
if 'system_user:operation' not in user_permissions:
data.pop(field, None)
continue
if perm_code and perm_code not in user_permissions:
data.pop(field, None)
claims = get_jwt()
operator_role = claims.get('role')
result = AuthService.update_user(user_id, data, operator_role)
return jsonify({'msg': '用户更新成功', 'data': result}), 200
except Exception as e:
current_app.logger.error(f"User Update Failed: {str(e)}")
return jsonify({'msg': str(e)}), 400
@auth_bp.route('/users', methods=['GET'])
@jwt_required()
@permission_required('system_user')
def get_users():
try:
users = AuthService.get_all_users()
# 字段级脱敏
user_permissions = get_current_user_permissions()
filtered_users = [filter_item_by_permissions(user, user_permissions) for user in users]
return jsonify({'msg': '获取成功', 'data': filtered_users}), 200
except Exception as e:
current_app.logger.error(f"Get Users Failed: {str(e)}")
return jsonify({'msg': '获取用户列表失败'}), 500
@auth_bp.route('/user/<int:user_id>', methods=['DELETE'])
@jwt_required()
@permission_required('system_user:operation')
@audit_log(
module='用户管理',
action='删除',
get_target_id_fn=lambda: request.view_args.get('user_id')
)
def delete_user(user_id):
try:
claims = get_jwt()
operator_role = claims.get('role')
username = AuthService.delete_user(user_id, operator_role)
return jsonify({'msg': '删除成功', 'username': username}), 200
except Exception as e:
current_app.logger.error(f"Delete User Failed: {str(e)}")
return jsonify({'msg': str(e)}), 400
@auth_bp.route('/my-permissions', methods=['GET'])
@jwt_required()
def get_my_permissions():
"""获取当前登录用户的权限列表"""
try:
claims = get_jwt()
role = claims.get('role')
# 调用 Service 获取权限
permissions = AuthService.get_user_permissions(role)
return jsonify({'msg': '获取成功', 'data': permissions}), 200
except Exception as e:
current_app.logger.error(f"Get Permissions Failed: {str(e)}")
return jsonify({'msg': '获取权限失败'}), 500
@auth_bp.route('/me/password', methods=['PUT'])
@jwt_required()
def change_my_password():
"""
【改造】自我修改密码接口
- 无需管理员权限,无需旧密码
- 只要 JWT Token 有效(已证明当前登录身份),即可直接修改新密码
- 字段脱敏:不暴露系统角色
"""
try:
from app.models.system import SysUser
claims = get_jwt()
user_id = claims.get('sub')
data = request.get_json()
if not data:
return jsonify({'msg': '无效的请求数据'}), 400
new_password = data.get('new_password')
confirm_password = data.get('confirm_password')
if not new_password or not confirm_password:
return jsonify({'msg': '新密码和确认新密码均不能为空'}), 400
if new_password != confirm_password:
return jsonify({'msg': '新密码与确认密码不一致'}), 400
if len(new_password) < 6:
return jsonify({'msg': '新密码长度不能少于6位'}), 400
# 超级管理员user_id=0使用硬编码密码不存入数据库
if user_id == 0:
return jsonify({'msg': '超级管理员密码由系统管理员管理,当前会话无法修改'}), 200
# 普通用户JWT 已证明身份,直接更新新密码
user = SysUser.query.get(user_id)
if not user:
return jsonify({'msg': '用户不存在'}), 404
user.set_password(new_password)
db.session.commit()
return jsonify({'msg': '密码修改成功,请使用新密码重新登录'}), 200
except Exception as e:
current_app.logger.error(f"Change Password Failed: {str(e)}")
return jsonify({'msg': f'密码修改失败: {str(e)}'}), 500
@auth_bp.route('/me', methods=['GET'])
@jwt_required()
def get_my_profile():
"""
【新增】获取当前登录用户的个人资料(自我查看)
- 只返回姓名/账号和所属部门
- 严格脱敏:不暴露系统角色字段
"""
try:
from app.models.system import SysUser
claims = get_jwt()
user_id = claims.get('sub')
display_name = claims.get('display_name', '')
account_id = claims.get('username', '')
# 超级管理员user_id=0
if user_id == 0:
return jsonify({
'msg': '获取成功',
'data': {
'id': 0,
'username': 'IRIS',
'display_name': '超级管理员(IRIS)',
'department': 'System',
# 【关键】严格脱敏:不暴露 role 字段
}
}), 200
user = SysUser.query.get(user_id)
if not user:
return jsonify({'msg': '用户不存在'}), 404
return jsonify({
'msg': '获取成功',
'data': {
'id': user.id,
'username': account_id,
'display_name': user.username.split('/')[0] if user.username else display_name,
'department': user.department or '-',
# 【关键】严格脱敏:不暴露 role 字段
}
}), 200
except Exception as e:
current_app.logger.error(f"Get Profile Failed: {str(e)}")
return jsonify({'msg': f'获取个人资料失败: {str(e)}'}), 500