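# app/services/auth_service.py
import hmac
import os

from app.models.system import SysUser
from app.extensions import db
from flask_jwt_extended import create_access_token
from app.utils.constants import UserRole


class AuthService:
    """Login and user-administration service.

    ``login`` authenticates either the configured break-glass super-admin or a
    ``SysUser`` row and issues a JWT. The remaining methods are admin-only CRUD
    on ``SysUser``; they trust the ``operator_role`` argument, so callers must
    take it from a verified token, never from the request body.
    """

    # Break-glass super-admin credentials.
    # SECURITY: these used to be hard-coded string literals in this file. A
    # credential committed to source control is a backdoor for anyone who can
    # read the repository or a build artifact, and it cannot be rotated without
    # a deploy. They are now read from the environment; if either variable is
    # unset the super-admin login path is disabled and the username falls
    # through to the normal database lookup.
    SUPER_ADMIN_USER = os.environ.get("AUTH_SUPER_ADMIN_USER")
    SUPER_ADMIN_PASS = os.environ.get("AUTH_SUPER_ADMIN_PASS")

    # Minimum password length. Enforced on create and on reset so both paths
    # agree (the original only checked it on reset).
    MIN_PASSWORD_LENGTH = 6

    # Roles permitted to create / update / delete users.
    _ADMIN_ROLES = (UserRole.SUPER_ADMIN, UserRole.SUPERVISOR)

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _valid_roles():
        """Return the role values defined on ``UserRole``.

        Mirrors the original check: every non-dunder attribute of ``UserRole``
        is treated as a role value. NOTE(review): if ``UserRole`` ever gains
        helper methods or private attributes they would be accepted as roles
        too; an explicit tuple of role values on ``UserRole`` would be safer.
        Confirm the shape of that class.
        """
        return [v for k, v in UserRole.__dict__.items() if not k.startswith('__')]

    @staticmethod
    def _require_admin(operator_role, message):
        """Raise ``Exception(message)`` unless *operator_role* may manage users."""
        if operator_role not in AuthService._ADMIN_ROLES:
            raise Exception(message)

    @staticmethod
    def _check_role_escalation(role, operator_role):
        """Forbid granting SUPER_ADMIN from a lower-privileged operator.

        Without this a SUPERVISOR could create or promote an account above its
        own privilege level and then log in as it.
        """
        if role == UserRole.SUPER_ADMIN and operator_role != UserRole.SUPER_ADMIN:
            raise Exception("权限不足:只有超级管理员可以授予超级管理员角色")

    @staticmethod
    def _check_target_protected(user, operator_role):
        """Forbid a non-SUPER_ADMIN operator from touching a SUPER_ADMIN account.

        Resetting a super-admin's password or deleting the account are
        effectively privilege escalation / denial of admin access for a
        SUPERVISOR, so only SUPER_ADMIN may act on such rows.
        """
        if user.role == UserRole.SUPER_ADMIN and operator_role != UserRole.SUPER_ADMIN:
            raise Exception("权限不足:只有超级管理员可以操作超级管理员账号")

    @staticmethod
    def _is_super_admin_login(username):
        """True when the break-glass account is configured and *username* names it."""
        configured_user = AuthService.SUPER_ADMIN_USER
        return (
            configured_user is not None
            and AuthService.SUPER_ADMIN_PASS is not None
            and username == configured_user
        )

    # -------------------------------------------------------------------- login

    @staticmethod
    def login(data):
        """Authenticate a user and issue an access token.

        Args:
            data: mapping with ``username`` and ``password`` (the request body).
                ``None`` is tolerated and treated as an empty mapping.

        Returns:
            ``{'access_token': <jwt>, 'user': <dict>}``. For the super-admin
            the user dict is synthesised (virtual id ``0``, department
            ``'System'``) because no ``SysUser`` row backs that account.

        Raises:
            ValueError: unknown user, wrong password, or disabled account.

        NOTE(review): the distinct error messages ("用户不存在" vs "密码错误")
        let a caller enumerate valid usernames. The route should collapse
        them to one generic 401 and rate-limit this endpoint; the messages
        are left unchanged here so existing callers keep working.
        """
        data = data or {}
        username = data.get('username')
        password = data.get('password')

        if AuthService._is_super_admin_login(username):
            # 1. Break-glass super-admin. Constant-time comparison so response
            #    timing does not reveal how much of the password matched.
            supplied = str(password if password is not None else '').encode('utf-8')
            expected = AuthService.SUPER_ADMIN_PASS.encode('utf-8')
            if not hmac.compare_digest(supplied, expected):
                raise ValueError("密码错误")
            user_role = UserRole.SUPER_ADMIN
            user_id = 0  # virtual id; there is no database row for this account
            user_info = {
                'username': username,
                'role': user_role,
                'department': 'System',
            }
        else:
            # 2. Regular database user. Password is checked before status so a
            #    disabled account does not confirm the password was right
            #    without also having it.
            user = SysUser.query.filter_by(username=username).first()
            if not user:
                raise ValueError("用户不存在")
            if not user.check_password(password):
                raise ValueError("密码错误")
            if user.status != 'active':
                raise ValueError("账号已被禁用,请联系管理员")
            user_role = user.role
            user_id = user.id
            user_info = user.to_dict()

        # 3. Issue the token; role and username travel as claims so downstream
        #    authorisation does not need a database round-trip.
        access_token = create_access_token(
            identity=user_id,
            additional_claims={'role': user_role, 'username': username},
        )
        return {
            'access_token': access_token,
            'user': user_info,
        }

    # --------------------------------------------------------- user management

    @staticmethod
    def create_user(data, operator_role):
        """Create a new ``SysUser`` (SUPER_ADMIN or SUPERVISOR only).

        Args:
            data: mapping with ``username``, ``password``, ``role`` and
                optional ``email`` / ``department``.
            operator_role: role of the caller, taken from a verified token.

        Returns:
            ``new_user.to_dict()`` of the committed row (status ``'active'``).

        Raises:
            Exception: insufficient privilege, missing username, duplicate
                username or email, invalid role, attempt to grant SUPER_ADMIN
                from a non-SUPER_ADMIN operator, or password shorter than
                ``MIN_PASSWORD_LENGTH``.
        """
        AuthService._require_admin(
            operator_role, "权限不足:只有超级管理员或主管可以创建新用户"
        )

        username = data.get('username')
        if not username or not str(username).strip():
            raise Exception("用户名不能为空")
        if SysUser.query.filter_by(username=username).first():
            raise Exception("用户名已存在")

        role = data.get('role')
        valid_roles = AuthService._valid_roles()
        if role not in valid_roles:
            raise Exception(f"角色无效,可选角色: {valid_roles}")
        AuthService._check_role_escalation(role, operator_role)

        # The original passed whatever was supplied (including None) straight
        # to set_password(); apply the same minimum as the reset path.
        password = data.get('password')
        if not password or len(str(password)) < AuthService.MIN_PASSWORD_LENGTH:
            raise Exception("密码长度至少6位")

        email = data.get('email', '')
        if email and SysUser.query.filter_by(email=email).first():
            raise Exception("邮箱已被使用")

        new_user = SysUser(
            username=username,
            email=email,
            department=data.get('department', ''),
            role=role,
            status='active',
        )
        new_user.set_password(password)
        db.session.add(new_user)
        db.session.commit()
        return new_user.to_dict()

    @staticmethod
    def update_user(user_id, data, operator_role):
        """Update role, department, email and/or password of an existing user.

        Only keys present in *data* are applied. A password is reset only when
        a non-blank value is supplied; otherwise the stored hash is kept.

        Args:
            user_id: primary key of the ``SysUser`` to modify.
            data: mapping with any of ``role``, ``department``, ``email``,
                ``password``.
            operator_role: role of the caller, taken from a verified token.

        Returns:
            ``user.to_dict()`` after commit.

        Raises:
            Exception: insufficient privilege, unknown user, target is a
                SUPER_ADMIN and the operator is not, invalid role, granting
                SUPER_ADMIN from a non-SUPER_ADMIN operator, email already
                used by another user, or password shorter than
                ``MIN_PASSWORD_LENGTH``.
        """
        AuthService._require_admin(
            operator_role, "权限不足:只有超级管理员或主管可以修改用户信息"
        )

        user = SysUser.query.get(user_id)
        if not user:
            raise Exception("用户不存在")
        # A SUPERVISOR must not be able to reset a super-admin's password or
        # demote them.
        AuthService._check_target_protected(user, operator_role)

        # 1. Basic fields.
        if 'role' in data:
            if data['role'] not in AuthService._valid_roles():
                raise Exception(f"角色无效")
            AuthService._check_role_escalation(data['role'], operator_role)
            user.role = data['role']

        if 'department' in data:
            user.department = data['department']

        if 'email' in data:
            # Reject only if the new address differs and belongs to someone else.
            email = data['email']
            if email and email != user.email:
                if SysUser.query.filter_by(email=email).first():
                    raise Exception("该邮箱已被其他用户使用")
            user.email = email

        # 2. Optional password reset; blank / whitespace-only leaves it alone.
        new_password = data.get('password')
        if new_password and str(new_password).strip():
            if len(str(new_password)) < AuthService.MIN_PASSWORD_LENGTH:
                raise Exception("密码长度至少6位")
            user.set_password(new_password)

        db.session.commit()
        return user.to_dict()

    @staticmethod
    def get_all_users():
        """Return every ``SysUser`` as a dict, newest (highest id) first.

        NOTE(review): no operator check here; confirm the route enforces
        authorisation before exposing the full user list.
        """
        users = SysUser.query.order_by(SysUser.id.desc()).all()
        return [user.to_dict() for user in users]

    @staticmethod
    def delete_user(user_id, operator_role):
        """Delete a ``SysUser`` (SUPER_ADMIN or SUPERVISOR only).

        Args:
            user_id: primary key of the row to delete.
            operator_role: role of the caller, taken from a verified token.

        Returns:
            ``True`` after commit.

        Raises:
            Exception: insufficient privilege, unknown user, or the target is
                a SUPER_ADMIN and the operator is not.
        """
        AuthService._require_admin(operator_role, "权限不足")

        user = SysUser.query.get(user_id)
        if not user:
            raise Exception("用户不存在")
        AuthService._check_target_protected(user, operator_role)

        db.session.delete(user)
        db.session.commit()
        return True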