# app/services/auth_service.py from app.models.system import SysUser from app.extensions import db from flask_jwt_extended import create_access_token, get_jwt_identity, get_jwt from werkzeug.security import check_password_hash from app.utils.constants import UserRole class AuthService: # 硬编码的超级管理员凭证 SUPER_ADMIN_USER = "IRIS" SUPER_ADMIN_PASS = "licahk" @staticmethod def login(data): username = data.get('username') password = data.get('password') user_role = None user_id = None user_info = {} # 1. 优先检查硬编码的超级管理员 if username == AuthService.SUPER_ADMIN_USER: if password == AuthService.SUPER_ADMIN_PASS: user_role = UserRole.SUPER_ADMIN user_id = 0 # 虚拟ID user_info = { 'username': username, 'role': user_role, 'department': 'System' } else: raise Exception("密码错误") # 2. 如果不是 IRIS,检查数据库用户 else: user = SysUser.query.filter_by(username=username).first() if not user or not user.check_password(password): raise Exception("用户名或密码错误") if user.status != 'active': raise Exception("账号已被禁用") user_role = user.role user_id = user.id user_info = user.to_dict() # 3. 生成 Token,将角色写入 claims (关键步骤:用于后期权限控制) # identity 存 ID,additional_claims 存角色 access_token = create_access_token( identity=user_id, additional_claims={'role': user_role, 'username': username} ) return { 'access_token': access_token, 'user': user_info } @staticmethod def create_user(data, operator_role): """ 创建新用户 (仅限管理员使用) :param data: 新用户数据 :param operator_role: 当前操作人的角色 (从 Token 获取) """ # 简单权限控制:只有超级管理员或主管可以创建用户 if operator_role not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]: raise Exception("权限不足:只有超级管理员或主管可以创建新用户") # 检查重名 if SysUser.query.filter_by(username=data.get('username')).first(): raise Exception("用户名已存在") # 默认角色处理 role = data.get('role') # 验证角色是否合法 valid_roles = [v for k, v in UserRole.__dict__.items() if not k.startswith('__')] if role not in valid_roles: raise Exception(f"角色无效,可选角色: {valid_roles}") new_user = SysUser( username=data.get('username'), email=data.get('email', ''), # 允许为空 department=data.get('department', ''), role=role, status='active' ) new_user.set_password(data.get('password')) db.session.add(new_user) db.session.commit() return new_user.to_dict()