# app/api/v1/auth.py # ============================================================================== # Flask & Extensions # ============================================================================== from flask import Blueprint, request, jsonify, current_app from flask_jwt_extended import ( jwt_required, get_jwt, get_jwt_identity, create_access_token, ) from werkzeug.exceptions import BadRequest, Unauthorized, Forbidden, NotFound, InternalServerError # ============================================================================== # Database & Extensions # ============================================================================== from app.extensions import db # ============================================================================== # Service & Decorators # ============================================================================== from app.services.auth_service import AuthService from app.utils.decorators import permission_required, audit_log auth_bp = Blueprint('auth', __name__) # ============================================================================== # 辅助函数:获取当前用户的完整权限列表(基于角色查询) # ============================================================================== def get_current_user_permissions(): """ 返回当前用户拥有的所有权限码列表(包括菜单和元素) 此函数根据角色查询数据库得到权限。 """ claims = get_jwt() user_role = claims.get('role') if not user_role: return [] # 超级管理员返回所有字段权限 (忽略大小写) if user_role.upper() == 'SUPER_ADMIN': return ['system_user:*'] perm_dict = AuthService.get_user_permissions(user_role) # 合并菜单和元素权限 perms = perm_dict.get('menus', []) + perm_dict.get('elements', []) return perms def filter_item_by_permissions(item_dict, user_permissions): """ 根据用户权限过滤 item 字典,无权限的字段值置为 None """ # 字段名到权限码的映射(与前端 permissionMap 保持一致) field_to_perm = { 'id': 'system_user:id', 'username': 'system_user:username', 'account_id': 'system_user:account_id', 'email': 'system_user:email', 'department': 'system_user:department', 'role': 'system_user:role', 'status': 'system_user:status', 'created_at': 'system_user:created_at', } # 如果用户是超级管理员且有 'system_user:*',则不过滤 if 'system_user:*' in user_permissions: return item_dict for field, perm_code in field_to_perm.items(): if field in item_dict and perm_code not in user_permissions: item_dict[field] = None return item_dict # ============================================================================== # 登录接口 # ============================================================================== @auth_bp.route('/login', methods=['POST']) def login(): try: data = request.get_json() if not data: return jsonify({'msg': '无效的请求数据'}), 400 if not data.get('username') or not data.get('password'): return jsonify({'msg': '请输入用户名和密码'}), 400 result = AuthService.login(data) response_data = { 'msg': '登录成功', 'access_token': result.get('access_token'), 'refresh_token': result.get('refresh_token'), 'user': result.get('user') } return jsonify(response_data), 200 except ValueError as ve: current_app.logger.warning(f"Login validation failed: {str(ve)}") return jsonify({'msg': str(ve)}), 401 except Exception as e: current_app.logger.error(f"Login Failed Error: {str(e)}") return jsonify({'msg': f'服务器内部错误: {str(e)}'}), 500 # ============================================================================== # Token 刷新接口 # ============================================================================== @auth_bp.route('/refresh', methods=['POST']) def refresh(): """使用 refresh_token 换发新的 access_token""" try: data = request.get_json() if not data or not data.get('refresh_token'): return jsonify({'msg': '缺少 refresh_token'}), 400 refresh_token = data.get('refresh_token') result = AuthService.refresh_access_token(refresh_token) return jsonify({ 'msg': 'Token 刷新成功', 'access_token': result.get('access_token') }), 200 except ValueError as ve: current_app.logger.warning(f"Token refresh validation failed: {str(ve)}") return jsonify({'msg': str(ve)}), 401 except Exception as e: current_app.logger.error(f"Token Refresh Error: {str(e)}") return jsonify({'msg': f'Token 刷新失败: {str(e)}'}), 500 # ============================================================================== # 创建用户(管理员) # ============================================================================== @auth_bp.route('/user/create', methods=['POST']) @jwt_required() @permission_required('system_user:operation') @audit_log( module='用户管理', action='新增', get_target_name_fn=lambda: request.get_json().get('username') if request.get_json() else None ) def create_user(): try: data = request.get_json() # 数据清洗:移除用户没有权限的字段 user_permissions = get_current_user_permissions() # 超级管理员不过滤 if 'system_user:*' not in user_permissions: field_to_perm = { 'cn_name': 'system_user:username', 'username': 'system_user:username', 'password': 'system_user:password', 'department': 'system_user:department', 'role': 'system_user:role', 'email': 'system_user:email', } for field in list(data.keys()): perm_code = field_to_perm.get(field) if field == 'password': if 'system_user:operation' not in user_permissions: data.pop(field, None) continue if perm_code and perm_code not in user_permissions: data.pop(field, None) claims = get_jwt() operator_role = claims.get('role') result = AuthService.create_user(data, operator_role) return jsonify({'msg': '用户创建成功', 'data': result}), 201 except Exception as e: current_app.logger.error(f"User Create Failed: {str(e)}") return jsonify({'msg': str(e)}), 400 # ============================================================================== # 批量创建用户 # ============================================================================== @auth_bp.route('/user/batch', methods=['POST']) @jwt_required() def batch_create_user(): try: data_list = request.get_json() if not data_list or not isinstance(data_list, list): return jsonify({'msg': '请求数据必须是用户数组'}), 400 # 数据清洗:移除用户没有权限的字段 user_permissions = get_current_user_permissions() for data in data_list: if 'system_user:*' not in user_permissions: field_to_perm = { 'cn_name': 'system_user:username', 'username': 'system_user:username', 'password': 'system_user:password', 'department': 'system_user:department', 'role': 'system_user:role', 'email': 'system_user:email', } for field in list(data.keys()): perm_code = field_to_perm.get(field) if field == 'password': if 'system_user:operation' not in user_permissions: data.pop(field, None) continue if perm_code and perm_code not in user_permissions: data.pop(field, None) claims = get_jwt() operator_role = claims.get('role') results = AuthService.batch_create_users(data_list, operator_role) return jsonify({'msg': '批量处理完成', 'data': results}), 200 except Exception as e: current_app.logger.error(f"Batch User Create Failed: {str(e)}") return jsonify({'msg': str(e)}), 500 # ============================================================================== # 更新用户(管理员) # ============================================================================== @auth_bp.route('/user/', methods=['PUT']) @jwt_required() @permission_required('system_user:operation') @audit_log( module='用户管理', action='修改', get_target_id_fn=lambda: request.view_args.get('user_id') ) def update_user(user_id): try: data = request.get_json() user_permissions = get_current_user_permissions() if 'system_user:*' not in user_permissions: field_to_perm = { 'cn_name': 'system_user:username', 'username': 'system_user:username', 'password': 'system_user:password', 'department': 'system_user:department', 'role': 'system_user:role', 'email': 'system_user:email', } for field in list(data.keys()): perm_code = field_to_perm.get(field) if field == 'password': if 'system_user:operation' not in user_permissions: data.pop(field, None) continue if perm_code and perm_code not in user_permissions: data.pop(field, None) claims = get_jwt() operator_role = claims.get('role') result = AuthService.update_user(user_id, data, operator_role) return jsonify({'msg': '用户更新成功', 'data': result}), 200 except Exception as e: current_app.logger.error(f"User Update Failed: {str(e)}") return jsonify({'msg': str(e)}), 400 # ============================================================================== # 获取所有用户列表(管理员) # ============================================================================== @auth_bp.route('/users', methods=['GET']) @jwt_required() @permission_required('system_user') def get_users(): try: users = AuthService.get_all_users() user_permissions = get_current_user_permissions() filtered_users = [filter_item_by_permissions(user, user_permissions) for user in users] return jsonify({'msg': '获取成功', 'data': filtered_users}), 200 except Exception as e: current_app.logger.error(f"Get Users Failed: {str(e)}") return jsonify({'msg': f'获取用户列表失败: {str(e)}'}), 500 # ============================================================================== # 删除用户(管理员) # ============================================================================== @auth_bp.route('/user/', methods=['DELETE']) @jwt_required() @permission_required('system_user:operation') @audit_log( module='用户管理', action='删除', get_target_id_fn=lambda: request.view_args.get('user_id') ) def delete_user(user_id): try: claims = get_jwt() operator_role = claims.get('role') username = AuthService.delete_user(user_id, operator_role) return jsonify({'msg': '删除成功', 'username': username}), 200 except Exception as e: current_app.logger.error(f"Delete User Failed: {str(e)}") return jsonify({'msg': str(e)}), 400 # ============================================================================== # 获取当前用户权限列表(登录后调用) # ============================================================================== @auth_bp.route('/my-permissions', methods=['GET']) @jwt_required() def get_my_permissions(): """获取当前登录用户的权限列表""" try: claims = get_jwt() role = claims.get('role') permissions = AuthService.get_user_permissions(role) return jsonify({'msg': '获取成功', 'data': permissions}), 200 except Exception as e: current_app.logger.error(f"Get Permissions Failed: {str(e)}") return jsonify({'msg': f'获取权限失败: {str(e)}'}), 500 # ============================================================================== # 获取可指定审批人列表(SUPERVISOR / SUPER_ADMIN 且 status=active) # ============================================================================== @auth_bp.route('/users/approvers', methods=['GET']) @jwt_required() def get_approvers(): """ 查询角色为 SUPER_ADMIN 或 SUPERVISOR 且状态为活跃的用户列表 返回: [{id, username, email, role}] """ try: from app.models.system import SysUser users = SysUser.query.filter( SysUser.role.in_(['SUPER_ADMIN', 'SUPERVISOR']), SysUser.status == 'active' ).all() return jsonify({ 'msg': '获取成功', 'data': [ { 'id': u.id, 'username': u.username, 'email': u.email or '', 'role': u.role } for u in users ] }), 200 except Exception as e: current_app.logger.error(f"Get Approvers Failed: {str(e)}") return jsonify({'msg': f'获取审批人列表失败: {str(e)}'}), 500 # ============================================================================== # 获取当前用户个人资料(自我查看) # ============================================================================== @auth_bp.route('/me', methods=['GET']) @jwt_required() def get_my_profile(): """ 【重构】获取当前登录用户的个人资料(自我查看) - 只返回姓名/账号和所属部门 - 严格脱敏:不暴露系统角色字段 """ try: from app.models.system import SysUser user_id = get_jwt_identity() # 超级管理员(user_id=0) if user_id == 0: return jsonify({ 'msg': '获取成功', 'data': { 'id': 0, 'username': 'IRIS', 'display_name': '超级管理员(IRIS)', 'department': 'System', } }), 200 user = SysUser.query.get(user_id) if not user: return jsonify({'msg': '用户不存在'}), 404 return jsonify({ 'msg': '获取成功', 'data': { 'id': user.id, 'username': user.username.split('/')[1] if '/' in user.username else user.username, 'display_name': user.username.split('/')[0] if '/' in user.username else user.username, 'department': user.department or '-', } }), 200 except Exception as e: current_app.logger.error(f"Get Profile Failed: {str(e)}") return jsonify({'msg': f'获取个人资料失败: {str(e)}'}), 500 # ============================================================================== # 自我修改密码(无需旧密码) # ============================================================================== @auth_bp.route('/me/password', methods=['PUT']) @jwt_required() def change_my_password(): """ 【重构】自我修改密码接口 - 无需管理员权限,无需旧密码 - 只要 JWT Token 有效(已证明当前登录身份),即可直接修改新密码 """ try: from app.models.system import SysUser user_id = get_jwt_identity() data = request.get_json() if not data: return jsonify({'msg': '无效的请求数据'}), 400 new_password = data.get('new_password') confirm_password = data.get('confirm_password') if not new_password or not confirm_password: return jsonify({'msg': '新密码和确认新密码均不能为空'}), 400 if new_password != confirm_password: return jsonify({'msg': '新密码与确认密码不一致'}), 400 if len(new_password) < 6: return jsonify({'msg': '新密码长度不能少于6位'}), 400 # 超级管理员(user_id=0)使用硬编码密码,不存入数据库 if user_id == 0: return jsonify({'msg': '超级管理员密码由系统管理员管理,当前会话无法修改'}), 200 # 普通用户:JWT 已证明身份,直接更新新密码 user = SysUser.query.get(user_id) if not user: return jsonify({'msg': '用户不存在'}), 404 user.set_password(new_password) db.session.commit() return jsonify({'msg': '密码修改成功,请使用新密码重新登录'}), 200 except Exception as e: current_app.logger.error(f"Change Password Failed: {str(e)}") return jsonify({'msg': f'密码修改失败: {str(e)}'}), 500 # ============================================================================== # 自我更新邮箱 # ============================================================================== @auth_bp.route('/me/email', methods=['PUT']) @jwt_required() def update_my_email(): """ 自我更新邮箱接口 - 仅更新 email 字段,与密码修改完全隔离 - 防止后端意外清空用户密码 """ try: from app.models.system import SysUser user_id = get_jwt_identity() # 超级管理员(user_id=0)不允许修改邮箱 if user_id == 0: return jsonify({'msg': '超级管理员邮箱由系统管理员管理'}), 400 data = request.get_json() if not data: return jsonify({'msg': '无效的请求数据'}), 400 email = data.get('email') if not email: return jsonify({'msg': '邮箱不能为空'}), 400 # 简单的邮箱格式校验 import re if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email): return jsonify({'msg': '邮箱格式不正确'}), 400 user = SysUser.query.get(user_id) if not user: return jsonify({'msg': '用户不存在'}), 404 # 检查邮箱是否已被其他用户使用 existing = SysUser.query.filter(SysUser.email == email, SysUser.id != user_id).first() if existing: return jsonify({'msg': '该邮箱已被其他用户使用'}), 400 user.email = email db.session.commit() return jsonify({'msg': '邮箱更新成功'}), 200 except Exception as e: current_app.logger.error(f"Update Email Failed: {str(e)}") return jsonify({'msg': f'邮箱更新失败: {str(e)}'}), 500