# app/services/auth_service.py from app.models.system import SysUser from app.extensions import db from flask_jwt_extended import create_access_token from app.utils.constants import UserRole class AuthService: # 硬编码的超级管理员凭证 SUPER_ADMIN_USER = "IRIS" SUPER_ADMIN_PASS = "licahk" @staticmethod def login(data): username = data.get('username') password = data.get('password') user_role = None user_id = None user_info = {} # 1. 优先检查硬编码的超级管理员 if username == AuthService.SUPER_ADMIN_USER: if password == AuthService.SUPER_ADMIN_PASS: user_role = UserRole.SUPER_ADMIN user_id = 0 # 虚拟ID user_info = { 'username': username, 'role': user_role, 'department': 'System' } else: # [修改] 使用 ValueError 表示认证失败 raise ValueError("密码错误") # 2. 如果不是 IRIS,检查数据库用户 else: user = SysUser.query.filter_by(username=username).first() # [修改] 分开判断,逻辑更清晰,且使用 ValueError if not user: raise ValueError("用户不存在") if not user.check_password(password): raise ValueError("密码错误") if user.status != 'active': raise ValueError("账号已被禁用,请联系管理员") user_role = user.role user_id = user.id user_info = user.to_dict() # 3. 生成 Token access_token = create_access_token( identity=user_id, additional_claims={'role': user_role, 'username': username} ) return { 'access_token': access_token, 'user': user_info } @staticmethod def create_user(data, operator_role): """ 创建新用户 (仅限管理员使用) """ # 简单权限控制:只有超级管理员或主管可以创建用户 if operator_role not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]: raise Exception("权限不足:只有超级管理员或主管可以创建新用户") # 检查重名 if SysUser.query.filter_by(username=data.get('username')).first(): raise Exception("用户名已存在") # 默认角色处理 role = data.get('role') valid_roles = [v for k, v in UserRole.__dict__.items() if not k.startswith('__')] if role not in valid_roles: raise Exception(f"角色无效,可选角色: {valid_roles}") # 处理 Email 为空的情况 email = data.get('email', '') if email and SysUser.query.filter_by(email=email).first(): raise Exception("邮箱已被使用") new_user = SysUser( username=data.get('username'), email=email, department=data.get('department', ''), role=role, status='active' ) new_user.set_password(data.get('password')) db.session.add(new_user) db.session.commit() return new_user.to_dict() @staticmethod def get_all_users(): """获取所有系统用户""" users = SysUser.query.order_by(SysUser.id.desc()).all() return [user.to_dict() for user in users] @staticmethod def delete_user(user_id, operator_role): """删除用户""" if operator_role not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]: raise Exception("权限不足") user = SysUser.query.get(user_id) if not user: raise Exception("用户不存在") db.session.delete(user) db.session.commit() return True