diff --git a/inventory-backend/app/api/v1/auth.py b/inventory-backend/app/api/v1/auth.py index d3183c9..23c5ae6 100644 --- a/inventory-backend/app/api/v1/auth.py +++ b/inventory-backend/app/api/v1/auth.py @@ -1,7 +1,24 @@ # app/api/v1/auth.py +# ============================================================================== +# Flask & Extensions +# ============================================================================== from flask import Blueprint, request, jsonify, current_app -from flask_jwt_extended import jwt_required, get_jwt, get_jwt_identity +from flask_jwt_extended import ( + jwt_required, + get_jwt, + get_jwt_identity, + create_access_token, +) +from werkzeug.exceptions import BadRequest, Unauthorized, Forbidden, NotFound, InternalServerError + +# ============================================================================== +# Database & Extensions +# ============================================================================== from app.extensions import db + +# ============================================================================== +# Service & Decorators +# ============================================================================== from app.services.auth_service import AuthService from app.utils.decorators import permission_required, audit_log @@ -53,6 +70,9 @@ def filter_item_by_permissions(item_dict, user_permissions): return item_dict +# ============================================================================== +# 登录接口 +# ============================================================================== @auth_bp.route('/login', methods=['POST']) def login(): try: @@ -74,17 +94,19 @@ def login(): return jsonify(response_data), 200 except ValueError as ve: + current_app.logger.warning(f"Login validation failed: {str(ve)}") return jsonify({'msg': str(ve)}), 401 except Exception as e: current_app.logger.error(f"Login Failed Error: {str(e)}") return jsonify({'msg': f'服务器内部错误: {str(e)}'}), 500 +# ============================================================================== +# Token 刷新接口 +# ============================================================================== @auth_bp.route('/refresh', methods=['POST']) def refresh(): - """ - 使用 refresh_token 换发新的 access_token - """ + """使用 refresh_token 换发新的 access_token""" try: data = request.get_json() if not data or not data.get('refresh_token'): @@ -99,12 +121,16 @@ def refresh(): }), 200 except ValueError as ve: + current_app.logger.warning(f"Token refresh validation failed: {str(ve)}") return jsonify({'msg': str(ve)}), 401 except Exception as e: current_app.logger.error(f"Token Refresh Error: {str(e)}") return jsonify({'msg': f'Token 刷新失败: {str(e)}'}), 500 +# ============================================================================== +# 创建用户(管理员) +# ============================================================================== @auth_bp.route('/user/create', methods=['POST']) @jwt_required() @permission_required('system_user:operation') @@ -120,7 +146,6 @@ def create_user(): user_permissions = get_current_user_permissions() # 超级管理员不过滤 if 'system_user:*' not in user_permissions: - # 字段名到权限码的映射 field_to_perm = { 'cn_name': 'system_user:username', 'username': 'system_user:username', @@ -129,13 +154,9 @@ def create_user(): 'role': 'system_user:role', 'email': 'system_user:email', } - # 对于 password 字段,如果没有对应权限但用户有操作权限,可以保留(由装饰器保证) - # 但如果连操作权限都没有,则不会进入此接口。 for field in list(data.keys()): perm_code = field_to_perm.get(field) - # 密码字段特殊处理:如果没有 password 权限但用户有操作权限,仍允许(不删除) if field == 'password': - # 检查用户是否有操作权限,如果有则保留 if 'system_user:operation' not in user_permissions: data.pop(field, None) continue @@ -153,7 +174,9 @@ def create_user(): return jsonify({'msg': str(e)}), 400 -# [新增] 更新用户 +# ============================================================================== +# 更新用户(管理员) +# ============================================================================== @auth_bp.route('/user/', methods=['PUT']) @jwt_required() @permission_required('system_user:operation') @@ -165,11 +188,8 @@ def create_user(): def update_user(user_id): try: data = request.get_json() - # 数据清洗:移除用户没有权限的字段 user_permissions = get_current_user_permissions() - # 超级管理员不过滤 if 'system_user:*' not in user_permissions: - # 字段名到权限码的映射 field_to_perm = { 'cn_name': 'system_user:username', 'username': 'system_user:username', @@ -180,9 +200,7 @@ def update_user(user_id): } for field in list(data.keys()): perm_code = field_to_perm.get(field) - # 密码字段特殊处理:如果没有 password 权限但用户有操作权限,仍允许(不删除) if field == 'password': - # 检查用户是否有操作权限,如果有则保留 if 'system_user:operation' not in user_permissions: data.pop(field, None) continue @@ -200,21 +218,26 @@ def update_user(user_id): return jsonify({'msg': str(e)}), 400 +# ============================================================================== +# 获取所有用户列表(管理员) +# ============================================================================== @auth_bp.route('/users', methods=['GET']) @jwt_required() @permission_required('system_user') def get_users(): try: users = AuthService.get_all_users() - # 字段级脱敏 user_permissions = get_current_user_permissions() filtered_users = [filter_item_by_permissions(user, user_permissions) for user in users] return jsonify({'msg': '获取成功', 'data': filtered_users}), 200 except Exception as e: current_app.logger.error(f"Get Users Failed: {str(e)}") - return jsonify({'msg': '获取用户列表失败'}), 500 + return jsonify({'msg': f'获取用户列表失败: {str(e)}'}), 500 +# ============================================================================== +# 删除用户(管理员) +# ============================================================================== @auth_bp.route('/user/', methods=['DELETE']) @jwt_required() @permission_required('system_user:operation') @@ -235,6 +258,9 @@ def delete_user(user_id): return jsonify({'msg': str(e)}), 400 +# ============================================================================== +# 获取当前用户权限列表(登录后调用) +# ============================================================================== @auth_bp.route('/my-permissions', methods=['GET']) @jwt_required() def get_my_permissions(): @@ -242,16 +268,63 @@ def get_my_permissions(): try: claims = get_jwt() role = claims.get('role') - - # 调用 Service 获取权限 permissions = AuthService.get_user_permissions(role) - return jsonify({'msg': '获取成功', 'data': permissions}), 200 except Exception as e: current_app.logger.error(f"Get Permissions Failed: {str(e)}") - return jsonify({'msg': '获取权限失败'}), 500 + return jsonify({'msg': f'获取权限失败: {str(e)}'}), 500 +# ============================================================================== +# 获取当前用户个人资料(自我查看) +# ============================================================================== +@auth_bp.route('/me', methods=['GET']) +@jwt_required() +def get_my_profile(): + """ + 【重构】获取当前登录用户的个人资料(自我查看) + - 只返回姓名/账号和所属部门 + - 严格脱敏:不暴露系统角色字段 + """ + try: + from app.models.system import SysUser + + user_id = get_jwt_identity() + + # 超级管理员(user_id=0) + if user_id == 0: + return jsonify({ + 'msg': '获取成功', + 'data': { + 'id': 0, + 'username': 'IRIS', + 'display_name': '超级管理员(IRIS)', + 'department': 'System', + } + }), 200 + + user = SysUser.query.get(user_id) + if not user: + return jsonify({'msg': '用户不存在'}), 404 + + return jsonify({ + 'msg': '获取成功', + 'data': { + 'id': user.id, + 'username': user.username.split('/')[1] if '/' in user.username else user.username, + 'display_name': user.username.split('/')[0] if '/' in user.username else user.username, + 'department': user.department or '-', + } + }), 200 + + except Exception as e: + current_app.logger.error(f"Get Profile Failed: {str(e)}") + return jsonify({'msg': f'获取个人资料失败: {str(e)}'}), 500 + + +# ============================================================================== +# 自我修改密码(无需旧密码) +# ============================================================================== @auth_bp.route('/me/password', methods=['PUT']) @jwt_required() def change_my_password(): @@ -259,13 +332,10 @@ def change_my_password(): 【重构】自我修改密码接口 - 无需管理员权限,无需旧密码 - 只要 JWT Token 有效(已证明当前登录身份),即可直接修改新密码 - - 使用 get_jwt_identity() 获取用户 ID,与项目其他接口保持一致 """ try: from app.models.system import SysUser - # 【关键修复】使用 get_jwt_identity() 而非 claims.get('sub'), - # 与项目其他接口(outbound.py / scrap.py 等)保持一致,避免 JWT sub 字段取不到导致 500 user_id = get_jwt_identity() data = request.get_json() @@ -301,51 +371,3 @@ def change_my_password(): except Exception as e: current_app.logger.error(f"Change Password Failed: {str(e)}") return jsonify({'msg': f'密码修改失败: {str(e)}'}), 500 - - -@auth_bp.route('/me', methods=['GET']) -@jwt_required() -def get_my_profile(): - """ - 【重构】获取当前登录用户的个人资料(自我查看) - - 只返回姓名/账号和所属部门 - - 严格脱敏:不暴露系统角色字段 - - 使用 get_jwt_identity() 获取用户 ID - """ - try: - from app.models.system import SysUser - - # 【关键修复】统一使用 get_jwt_identity() - user_id = get_jwt_identity() - - # 超级管理员(user_id=0) - if user_id == 0: - return jsonify({ - 'msg': '获取成功', - 'data': { - 'id': 0, - 'username': 'IRIS', - 'display_name': '超级管理员(IRIS)', - 'department': 'System', - # 【关键】严格脱敏:不暴露 role 字段 - } - }), 200 - - user = SysUser.query.get(user_id) - if not user: - return jsonify({'msg': '用户不存在'}), 404 - - return jsonify({ - 'msg': '获取成功', - 'data': { - 'id': user.id, - 'username': user.username.split('/')[1] if '/' in user.username else user.username, - 'display_name': user.username.split('/')[0] if '/' in user.username else user.username, - 'department': user.department or '-', - # 【关键】严格脱敏:不暴露 role 字段 - } - }), 200 - - except Exception as e: - current_app.logger.error(f"Get Profile Failed: {str(e)}") - return jsonify({'msg': f'获取个人资料失败: {str(e)}'}), 500