Files
KCGL/inventory-backend/app/api/v1/auth.py

504 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/api/v1/auth.py
# ==============================================================================
# Flask & Extensions
# ==============================================================================
from flask import Blueprint, request, jsonify, current_app
from flask_jwt_extended import (
jwt_required,
get_jwt,
get_jwt_identity,
create_access_token,
)
from werkzeug.exceptions import BadRequest, Unauthorized, Forbidden, NotFound, InternalServerError
# ==============================================================================
# Database & Extensions
# ==============================================================================
from app.extensions import db
# ==============================================================================
# Service & Decorators
# ==============================================================================
from app.services.auth_service import AuthService
from app.utils.decorators import permission_required, audit_log
auth_bp = Blueprint('auth', __name__)
# ==============================================================================
# 辅助函数:获取当前用户的完整权限列表(基于角色查询)
# ==============================================================================
def get_current_user_permissions():
    """Return every permission code held by the current user.

    The role is read from the JWT claims of the active request and resolved
    through ``AuthService.get_user_permissions``; the 'menus' and 'elements'
    entries of that result are concatenated.

    Returns:
        list: ``[]`` when the token carries no role, ``['system_user:*']``
        for the super administrator (role compared case-insensitively),
        otherwise the menu codes followed by the element codes.
    """
    role = get_jwt().get('role')
    if not role:
        return []

    # The super administrator gets the wildcard code instead of a DB lookup.
    if role.upper() == 'SUPER_ADMIN':
        return ['system_user:*']

    granted = AuthService.get_user_permissions(role)
    menu_codes = granted.get('menus', [])
    element_codes = granted.get('elements', [])
    return menu_codes + element_codes
def filter_item_by_permissions(item_dict, user_permissions):
    """Blank out the fields of ``item_dict`` the user may not read.

    Args:
        item_dict: Mapping describing one user record. It is modified in
            place and also returned.
        user_permissions: Collection of permission codes held by the caller.

    Returns:
        The same ``item_dict`` object. Every known field that is present but
        whose permission code is missing from ``user_permissions`` is set to
        ``None``. Fields without a mapping entry are left untouched. Holders
        of ``'system_user:*'`` get the record back unchanged.
    """
    # The wildcard short-circuits all field-level filtering.
    if 'system_user:*' in user_permissions:
        return item_dict

    # Field name -> permission code required to see that field
    # (kept in sync with the frontend permissionMap).
    required_perm = {
        'id': 'system_user:id',
        'username': 'system_user:username',
        'account_id': 'system_user:account_id',
        'email': 'system_user:email',
        'department': 'system_user:department',
        'role': 'system_user:role',
        'status': 'system_user:status',
        'created_at': 'system_user:created_at',
    }

    hidden_fields = [
        name
        for name, code in required_perm.items()
        if name in item_dict and code not in user_permissions
    ]
    for name in hidden_fields:
        item_dict[name] = None
    return item_dict
# ==============================================================================
# 登录接口
# ==============================================================================
@auth_bp.route('/login', methods=['POST'])
def login():
    """Authenticate a user and return the issued tokens.

    Expects a JSON object with non-empty 'username' and 'password' values
    and passes it to ``AuthService.login``.

    Returns:
        A ``(response, status)`` tuple:
        * 200 with 'access_token', 'refresh_token' and 'user' on success;
        * 400 when the body is missing, is not a JSON object, or lacks
          credentials;
        * 401 with the service's message when it raises ``ValueError``;
        * 500 with a generic message for any other failure.
    """
    # silent=True makes a malformed or non-JSON body come back as None, so
    # it is answered with 400 instead of surfacing as a 500.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({'msg': '无效的请求数据'}), 400
    if not data.get('username') or not data.get('password'):
        return jsonify({'msg': '请输入用户名和密码'}), 400

    try:
        result = AuthService.login(data)
        response_data = {
            'msg': '登录成功',
            'access_token': result.get('access_token'),
            'refresh_token': result.get('refresh_token'),
            'user': result.get('user'),
        }
        return jsonify(response_data), 200
    except ValueError as ve:
        current_app.logger.warning(f"Login validation failed: {str(ve)}")
        return jsonify({'msg': str(ve)}), 401
    except Exception as e:
        # Keep the details in the server log only: this endpoint is
        # unauthenticated, so raw exception text must not reach the client.
        current_app.logger.exception(f"Login Failed Error: {str(e)}")
        return jsonify({'msg': '服务器内部错误'}), 500
# ==============================================================================
# Token 刷新接口
# ==============================================================================
@auth_bp.route('/refresh', methods=['POST'])
def refresh():
    """Exchange a refresh token for a new access token.

    Expects a JSON object with a non-empty 'refresh_token' value and passes
    that value to ``AuthService.refresh_access_token``.

    Returns:
        A ``(response, status)`` tuple:
        * 200 with the new 'access_token' on success;
        * 400 when the body is missing, is not a JSON object, or has no
          'refresh_token';
        * 401 with the service's message when it raises ``ValueError``;
        * 500 with a generic message for any other failure.
    """
    # silent=True: a malformed or non-JSON body yields None and is rejected
    # with 400 rather than being reported as a server error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not data.get('refresh_token'):
        return jsonify({'msg': '缺少 refresh_token'}), 400

    try:
        result = AuthService.refresh_access_token(data.get('refresh_token'))
        return jsonify({
            'msg': 'Token 刷新成功',
            'access_token': result.get('access_token'),
        }), 200
    except ValueError as ve:
        current_app.logger.warning(f"Token refresh validation failed: {str(ve)}")
        return jsonify({'msg': str(ve)}), 401
    except Exception as e:
        # Details stay in the log; the caller is not authenticated yet.
        current_app.logger.exception(f"Token Refresh Error: {str(e)}")
        return jsonify({'msg': 'Token 刷新失败'}), 500
# ==============================================================================
# 创建用户(管理员)
# ==============================================================================
@auth_bp.route('/user/create', methods=['POST'])
@jwt_required()
@permission_required('system_user:operation')
@audit_log(
    module='用户管理',
    action='新增',
    # Tolerate missing, malformed or non-object bodies: the audit target
    # name is then simply None.
    get_target_name_fn=lambda: (
        request.get_json(silent=True).get('username')
        if isinstance(request.get_json(silent=True), dict) else None
    )
)
def create_user():
    """Create a user on behalf of an administrator.

    The JSON body is sanitised first: unless the operator holds the
    ``'system_user:*'`` wildcard, every known field whose permission code
    the operator lacks is dropped. The remaining data and the operator's
    role are passed to ``AuthService.create_user``.

    Returns:
        A ``(response, status)`` tuple: 201 with the service result on
        success, 400 when the body is not a JSON object or when any
        exception is raised while creating the user (its message is
        returned to the caller).
    """
    try:
        data = request.get_json(silent=True)
        # A missing or non-object body would otherwise fail on data.keys()
        # and leak an AttributeError message to the client.
        if not isinstance(data, dict):
            return jsonify({'msg': '无效的请求数据'}), 400

        user_permissions = get_current_user_permissions()
        # The wildcard (super administrator) bypasses field filtering.
        if 'system_user:*' not in user_permissions:
            # Field -> permission code required to submit that field.
            # 'password' is gated by the operation permission rather than
            # by a field-level code.
            field_to_perm = {
                'cn_name': 'system_user:username',
                'username': 'system_user:username',
                'password': 'system_user:operation',
                'department': 'system_user:department',
                'role': 'system_user:role',
                'email': 'system_user:email',
            }
            # Iterate over a snapshot of the keys because entries are
            # removed while looping.
            for field in list(data):
                required = field_to_perm.get(field)
                if required and required not in user_permissions:
                    data.pop(field, None)

        operator_role = get_jwt().get('role')
        result = AuthService.create_user(data, operator_role)
        return jsonify({'msg': '用户创建成功', 'data': result}), 201
    except Exception as e:
        current_app.logger.error(f"User Create Failed: {str(e)}")
        return jsonify({'msg': str(e)}), 400
# ==============================================================================
# 批量创建用户
# ==============================================================================
@auth_bp.route('/user/batch', methods=['POST'])
@jwt_required()
# Same gate as the single-user create endpoint; without it any authenticated
# user could reach batch creation.
@permission_required('system_user:operation')
def batch_create_user():
    """Create several users in one request.

    Expects a non-empty JSON array of objects. Unless the operator holds the
    ``'system_user:*'`` wildcard, every known field whose permission code
    the operator lacks is dropped from each entry. The sanitised list and
    the operator's role are passed to ``AuthService.batch_create_users``.

    Returns:
        A ``(response, status)`` tuple: 200 with the service result, 400
        when the body is not a non-empty array of objects, 500 (with the
        exception message) when anything else fails.
    """
    try:
        data_list = request.get_json(silent=True)
        if not data_list or not isinstance(data_list, list):
            return jsonify({'msg': '请求数据必须是用户数组'}), 400
        # Every entry must be an object; otherwise .pop()/.keys() below
        # would raise and be reported as a server error.
        if not all(isinstance(entry, dict) for entry in data_list):
            return jsonify({'msg': '请求数据必须是用户数组'}), 400

        user_permissions = get_current_user_permissions()
        # The wildcard check and the mapping do not depend on the entry, so
        # they are evaluated once rather than per entry.
        if 'system_user:*' not in user_permissions:
            # Field -> permission code required to submit that field.
            # 'password' is gated by the operation permission rather than
            # by a field-level code.
            field_to_perm = {
                'cn_name': 'system_user:username',
                'username': 'system_user:username',
                'password': 'system_user:operation',
                'department': 'system_user:department',
                'role': 'system_user:role',
                'email': 'system_user:email',
            }
            for entry in data_list:
                # Snapshot of the keys: entries are removed while looping.
                for field in list(entry):
                    required = field_to_perm.get(field)
                    if required and required not in user_permissions:
                        entry.pop(field, None)

        operator_role = get_jwt().get('role')
        results = AuthService.batch_create_users(data_list, operator_role)
        return jsonify({'msg': '批量处理完成', 'data': results}), 200
    except Exception as e:
        current_app.logger.error(f"Batch User Create Failed: {str(e)}")
        return jsonify({'msg': str(e)}), 500
# ==============================================================================
# 更新用户(管理员)
# ==============================================================================
@auth_bp.route('/user/<int:user_id>', methods=['PUT'])
@jwt_required()
@permission_required('system_user:operation')
@audit_log(
    module='用户管理',
    action='修改',
    get_target_id_fn=lambda: request.view_args.get('user_id')
)
def update_user(user_id):
    """Update an existing user on behalf of an administrator.

    Args:
        user_id: Primary key of the user to update (from the URL).

    The JSON body is sanitised first: unless the operator holds the
    ``'system_user:*'`` wildcard, every known field whose permission code
    the operator lacks is dropped. The remaining data and the operator's
    role are passed to ``AuthService.update_user``.

    Returns:
        A ``(response, status)`` tuple: 200 with the service result on
        success, 400 when the body is not a JSON object or when any
        exception is raised while updating (its message is returned).
    """
    try:
        data = request.get_json(silent=True)
        # A missing or non-object body would otherwise fail on data.keys()
        # and leak an AttributeError message to the client.
        if not isinstance(data, dict):
            return jsonify({'msg': '无效的请求数据'}), 400

        user_permissions = get_current_user_permissions()
        # The wildcard (super administrator) bypasses field filtering.
        if 'system_user:*' not in user_permissions:
            # Field -> permission code required to submit that field.
            # 'password' is gated by the operation permission rather than
            # by a field-level code.
            field_to_perm = {
                'cn_name': 'system_user:username',
                'username': 'system_user:username',
                'password': 'system_user:operation',
                'department': 'system_user:department',
                'role': 'system_user:role',
                'email': 'system_user:email',
            }
            # Snapshot of the keys: entries are removed while looping.
            for field in list(data):
                required = field_to_perm.get(field)
                if required and required not in user_permissions:
                    data.pop(field, None)

        operator_role = get_jwt().get('role')
        result = AuthService.update_user(user_id, data, operator_role)
        return jsonify({'msg': '用户更新成功', 'data': result}), 200
    except Exception as e:
        current_app.logger.error(f"User Update Failed: {str(e)}")
        return jsonify({'msg': str(e)}), 400
# ==============================================================================
# 获取所有用户列表(管理员)
# ==============================================================================
@auth_bp.route('/users', methods=['GET'])
@jwt_required()
@permission_required('system_user')
def get_users():
    """List all users, masking the fields the caller may not read.

    Each record returned by ``AuthService.get_all_users`` is passed through
    ``filter_item_by_permissions`` with the caller's permission codes.

    Returns:
        A ``(response, status)`` tuple: 200 with the filtered records, or
        500 with the error message when anything fails.
    """
    try:
        all_users = AuthService.get_all_users()
        granted = get_current_user_permissions()
        visible = []
        for record in all_users:
            visible.append(filter_item_by_permissions(record, granted))
        return jsonify({'msg': '获取成功', 'data': visible}), 200
    except Exception as exc:
        current_app.logger.error(f"Get Users Failed: {str(exc)}")
        return jsonify({'msg': f'获取用户列表失败: {str(exc)}'}), 500
# ==============================================================================
# 删除用户(管理员)
# ==============================================================================
@auth_bp.route('/user/<int:user_id>', methods=['DELETE'])
@jwt_required()
@permission_required('system_user:operation')
@audit_log(
    module='用户管理',
    action='删除',
    get_target_id_fn=lambda: request.view_args.get('user_id')
)
def delete_user(user_id):
    """Delete a user on behalf of an administrator.

    Args:
        user_id: Primary key of the user to delete (from the URL).

    Returns:
        A ``(response, status)`` tuple: 200 with the value returned by
        ``AuthService.delete_user`` under 'username', or 400 with the error
        message when the service raises.
    """
    try:
        acting_role = get_jwt().get('role')
        removed_name = AuthService.delete_user(user_id, acting_role)
        payload = {'msg': '删除成功', 'username': removed_name}
        return jsonify(payload), 200
    except Exception as exc:
        current_app.logger.error(f"Delete User Failed: {str(exc)}")
        return jsonify({'msg': str(exc)}), 400
# ==============================================================================
# 获取当前用户权限列表(登录后调用)
# ==============================================================================
@auth_bp.route('/my-permissions', methods=['GET'])
@jwt_required()
def get_my_permissions():
    """Return the permission data for the logged-in user's role.

    The role is taken from the JWT claims and resolved through
    ``AuthService.get_user_permissions``.

    Returns:
        A ``(response, status)`` tuple: 200 with the service result under
        'data', or 500 with the error message when anything fails.
    """
    try:
        current_role = get_jwt().get('role')
        granted = AuthService.get_user_permissions(current_role)
        payload = {'msg': '获取成功', 'data': granted}
        return jsonify(payload), 200
    except Exception as exc:
        current_app.logger.error(f"Get Permissions Failed: {str(exc)}")
        return jsonify({'msg': f'获取权限失败: {str(exc)}'}), 500
# ==============================================================================
# 获取可指定审批人列表SUPERVISOR / SUPER_ADMIN 且 status=active
# ==============================================================================
@auth_bp.route('/users/approvers', methods=['GET'])
@jwt_required()
def get_approvers():
    """List the users who can be chosen as approvers.

    Selects ``SysUser`` rows whose role is 'SUPER_ADMIN' or 'SUPERVISOR' and
    whose status is 'active'.

    Returns:
        A ``(response, status)`` tuple: 200 with a list of
        ``{id, username, email, role}`` objects (a missing email becomes an
        empty string), or 500 with the error message when anything fails.
    """
    try:
        from app.models.system import SysUser

        approver_roles = ['SUPER_ADMIN', 'SUPERVISOR']
        candidates = SysUser.query.filter(
            SysUser.role.in_(approver_roles),
            SysUser.status == 'active'
        ).all()

        approvers = []
        for candidate in candidates:
            approvers.append({
                'id': candidate.id,
                'username': candidate.username,
                'email': candidate.email or '',
                'role': candidate.role,
            })
        return jsonify({'msg': '获取成功', 'data': approvers}), 200
    except Exception as exc:
        current_app.logger.error(f"Get Approvers Failed: {str(exc)}")
        return jsonify({'msg': f'获取审批人列表失败: {str(exc)}'}), 500
# ==============================================================================
# 获取当前用户个人资料(自我查看)
# ==============================================================================
@auth_bp.route('/me', methods=['GET'])
@jwt_required()
def get_my_profile():
    """Return the profile of the logged-in user (self view).

    Only the id, account name, display name and department are returned;
    the system role is deliberately not exposed. A stored username of the
    form ``"<display>/<account>"`` is split into its two parts; otherwise
    the same value is used for both.

    Returns:
        A ``(response, status)`` tuple: 200 with the profile, 404 when the
        user row does not exist, 500 with the error message on failure.
        Identity ``0`` (the super administrator) gets a fixed built-in
        profile without a database lookup.
    """
    try:
        from app.models.system import SysUser

        identity = get_jwt_identity()

        # The super administrator (identity 0) gets a fixed profile.
        if identity == 0:
            builtin_profile = {
                'id': 0,
                'username': 'IRIS',
                'display_name': '超级管理员(IRIS)',
                'department': 'System',
            }
            return jsonify({'msg': '获取成功', 'data': builtin_profile}), 200

        user = SysUser.query.get(identity)
        if not user:
            return jsonify({'msg': '用户不存在'}), 404

        if '/' in user.username:
            pieces = user.username.split('/')
            account_name = pieces[1]
            display_name = pieces[0]
        else:
            account_name = user.username
            display_name = user.username

        profile = {
            'id': user.id,
            'username': account_name,
            'display_name': display_name,
            'department': user.department or '-',
        }
        return jsonify({'msg': '获取成功', 'data': profile}), 200
    except Exception as exc:
        current_app.logger.error(f"Get Profile Failed: {str(exc)}")
        return jsonify({'msg': f'获取个人资料失败: {str(exc)}'}), 500
# ==============================================================================
# 自我修改密码(无需旧密码)
# ==============================================================================
@auth_bp.route('/me/password', methods=['PUT'])
@jwt_required()
def change_my_password():
    """Let the logged-in user set a new password for their own account.

    No administrator permission and no old password are required: a valid
    JWT is treated as sufficient proof of identity.

    Expects a JSON object with 'new_password' and 'confirm_password', which
    must be equal strings of at least 6 characters.

    Returns:
        A ``(response, status)`` tuple:
        * 200 when the password was changed. Identity ``0`` (the super
          administrator) also gets 200, with a message that its password
          cannot be changed here;
        * 400 when the body or the passwords are invalid;
        * 404 when the user row does not exist;
        * 500 with a generic message when anything else fails.
    """
    # NOTE(review): because the old password is not checked, anyone holding
    # a valid token can change the password — confirm this is acceptable.
    try:
        from app.models.system import SysUser

        user_id = get_jwt_identity()

        # silent=True: a malformed or non-JSON body is answered with 400
        # instead of being reported as a server error.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'msg': '无效的请求数据'}), 400

        new_password = data.get('new_password')
        confirm_password = data.get('confirm_password')
        if not new_password or not confirm_password:
            return jsonify({'msg': '新密码和确认新密码均不能为空'}), 400
        # JSON numbers/arrays would make len() below raise or mislead.
        if not isinstance(new_password, str) or not isinstance(confirm_password, str):
            return jsonify({'msg': '无效的请求数据'}), 400
        if new_password != confirm_password:
            return jsonify({'msg': '新密码与确认密码不一致'}), 400
        if len(new_password) < 6:
            return jsonify({'msg': '新密码长度不能少于6位'}), 400

        # The super administrator (identity 0) is not stored in the
        # database, so there is nothing to update.
        if user_id == 0:
            return jsonify({'msg': '超级管理员密码由系统管理员管理,当前会话无法修改'}), 200

        user = SysUser.query.get(user_id)
        if not user:
            return jsonify({'msg': '用户不存在'}), 404

        user.set_password(new_password)
        db.session.commit()
        return jsonify({'msg': '密码修改成功,请使用新密码重新登录'}), 200
    except Exception as e:
        # Discard any half-applied change so the session is left clean.
        db.session.rollback()
        # Details go to the log only, not to the client.
        current_app.logger.exception(f"Change Password Failed: {str(e)}")
        return jsonify({'msg': '密码修改失败'}), 500
# ==============================================================================
# 自我更新邮箱
# ==============================================================================
@auth_bp.route('/me/email', methods=['PUT'])
@jwt_required()
def update_my_email():
    """Let the logged-in user change their own e-mail address.

    Only the ``email`` column is written; the password is never touched by
    this endpoint.

    Expects a JSON object with a non-empty string 'email' that matches a
    simple address pattern and is not used by another user.

    Returns:
        A ``(response, status)`` tuple:
        * 200 when the address was updated;
        * 400 when the caller is the super administrator (identity ``0``),
          the body or address is invalid, or the address is already taken;
        * 404 when the user row does not exist;
        * 500 with a generic message when anything else fails.
    """
    try:
        import re

        from app.models.system import SysUser

        user_id = get_jwt_identity()

        # The super administrator (identity 0) may not change the e-mail.
        if user_id == 0:
            return jsonify({'msg': '超级管理员邮箱由系统管理员管理'}), 400

        # silent=True: a malformed or non-JSON body is answered with 400
        # instead of being reported as a server error.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'msg': '无效的请求数据'}), 400

        email = data.get('email')
        if not email:
            return jsonify({'msg': '邮箱不能为空'}), 400

        # fullmatch (rather than match with a trailing "$") also rejects an
        # address followed by a newline; non-string values are rejected
        # before reaching the regex engine.
        if not isinstance(email, str) or not re.fullmatch(
            r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', email
        ):
            return jsonify({'msg': '邮箱格式不正确'}), 400

        user = SysUser.query.get(user_id)
        if not user:
            return jsonify({'msg': '用户不存在'}), 404

        # Refuse an address that already belongs to a different user.
        existing = SysUser.query.filter(
            SysUser.email == email, SysUser.id != user_id
        ).first()
        if existing:
            return jsonify({'msg': '该邮箱已被其他用户使用'}), 400

        user.email = email
        db.session.commit()
        return jsonify({'msg': '邮箱更新成功'}), 200
    except Exception as e:
        # Discard any half-applied change so the session is left clean.
        db.session.rollback()
        # Details (possibly SQL text) go to the log only, not to the client.
        current_app.logger.exception(f"Update Email Failed: {str(e)}")
        return jsonify({'msg': '邮箱更新失败'}), 500