# app/services/auth_service.py from app.models.system import SysUser, SysRolePermission # <== 引入 SysRolePermission from app.extensions import db from sqlalchemy import func from flask_jwt_extended import create_access_token from app.utils.constants import UserRole from datetime import timedelta class AuthService: # 硬编码的超级管理员凭证 SUPER_ADMIN_USER = "IRIS" SUPER_ADMIN_PASS = "licahk" @staticmethod def login(data): # 用户登录时输入的只是账号ID (例如: zhangsan) login_input = data.get('username', '').strip() password = data.get('password') user_role = None user_id = None user_info = {} # 1. 优先检查硬编码的超级管理员 (IRIS) if login_input == AuthService.SUPER_ADMIN_USER: if password == AuthService.SUPER_ADMIN_PASS: user_role = UserRole.SUPER_ADMIN user_id = 0 user_info = { 'username': '超级管理员(IRIS)', 'account_id': 'IRIS', 'role': user_role, 'department': 'System', 'status': 'active' } else: raise ValueError("密码错误") # 2. 检查数据库用户 # 数据库存的是 "张三/zhangsan" # 登录匹配逻辑: 查找以 "/login_input" 结尾的记录 else: # 使用 like 进行后缀匹配: '%/zhangsan' user = SysUser.query.filter(SysUser.username.like(f"%/{login_input}")).first() if not user: raise ValueError("用户不存在") if not user.check_password(password): raise ValueError("密码错误") if user.status != 'active': raise ValueError("账号已被禁用,请联系管理员") user_role = user.role.upper() if user.role else None user_id = user.id user_info = user.to_dict() user_info['role'] = user_role # 3. 生成 Token # Token 中 identity 存数据库ID,claims 存登录账号ID account_id = user_info.get('account_id', login_input) access_token = create_access_token( identity=user_id, additional_claims={ 'role': user_role, 'username': account_id, # 存纯账号ID 'display_name': user_info.get('username') # 存显示名 }, expires_delta=timedelta(days=7) ) return { 'access_token': access_token, 'user': user_info } @staticmethod def create_user(data, operator_role): """ 创建新用户 data 包含: cn_name(张三), username(zhangsan), ... """ if operator_role not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]: raise Exception("权限不足:只有超级管理员或主管可以创建新用户") cn_name = data.get('cn_name') pinyin_base = data.get('username') # 前端传来的基础拼音,如 zhangsan if not cn_name or not pinyin_base: raise Exception("姓名和账号不能为空") role_raw = data.get('role') role = role_raw.upper() if role_raw else None # 验证角色合法性 valid_roles = [ v for k, v in UserRole.__dict__.items() if not k.startswith('__') and isinstance(v, str) ] if role not in valid_roles: raise Exception(f"角色无效") if operator_role == UserRole.SUPERVISOR and role == UserRole.SUPER_ADMIN: raise Exception("权限不足:主管无法创建超级管理员") email = data.get('email', '') if email and SysUser.query.filter_by(email=email).first(): raise Exception("邮箱已被使用") # === 核心逻辑: 自动处理账号重复 (zhangsan -> zhangsan1 -> zhangsan2) === final_account_id = pinyin_base counter = 1 # 如果重复,从1开始累加 while True: # 检查数据库是否存在以 "/final_account_id" 结尾的记录 existing = SysUser.query.filter( (SysUser.username.like(f"%/{final_account_id}")) | (SysUser.username == final_account_id) ).first() if not existing: break # 找到了可用的ID,跳出循环 # 如果存在,使用 base + counter final_account_id = f"{pinyin_base}{counter}" counter += 1 # 拼接最终存储格式: 张三/zhangsan1 full_username_storage = f"{cn_name}/{final_account_id}" new_user = SysUser( username=full_username_storage, # 存组合串 email=email, department=data.get('department', ''), role=role, status='active' ) new_user.set_password(data.get('password')) db.session.add(new_user) db.session.commit() # 返回时,最好把生成的ID告诉前端 return new_user.to_dict() @staticmethod def update_user(user_id, data, operator_role): """ 更新用户信息 注意: 这里暂时不允许修改用户名/账号,因为涉及 split 逻辑较复杂,且通常账号不开通后不改 """ if operator_role not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]: raise Exception("权限不足") user = SysUser.query.get(user_id) if not user: raise Exception("用户不存在") # 1. 更新基本信息 if 'role' in data: valid_roles = [ v for k, v in UserRole.__dict__.items() if not k.startswith('__') and isinstance(v, str) ] new_role_raw = data['role'] new_role = new_role_raw.upper() if new_role_raw else None if new_role not in valid_roles: raise Exception(f"角色无效") if operator_role == UserRole.SUPERVISOR and new_role == UserRole.SUPER_ADMIN: raise Exception("权限不足") user.role = new_role if 'department' in data: user.department = data['department'] if 'email' in data: email = data['email'] if email and email != user.email: existing = SysUser.query.filter_by(email=email).first() if existing: raise Exception("该邮箱已被其他用户使用") user.email = email if 'status' in data: user.status = data['status'] new_password = data.get('password') if new_password and str(new_password).strip(): if len(new_password) < 6: raise Exception("密码长度至少6位") user.set_password(new_password) db.session.commit() return user.to_dict() @staticmethod def get_all_users(): """获取所有系统用户""" users = SysUser.query.order_by(SysUser.id.desc()).all() return [user.to_dict() for user in users] @staticmethod def delete_user(user_id, operator_role): """删除用户""" if operator_role != UserRole.SUPER_ADMIN: raise Exception("权限不足:只有超级管理员可以删除用户") user = SysUser.query.get(user_id) if not user: raise Exception("用户不存在") db.session.delete(user) db.session.commit() return True @staticmethod def get_user_permissions(role_code): """ 获取指定角色的所有权限代码列表 返回格式: { 'menus': ['inbound_buy', 'system_user'], 'elements': ['inbound_buy:unit_price', ...] } """ # 超级管理员返回所有权限(通配符) from app.utils.constants import UserRole if role_code and role_code.upper() == UserRole.SUPER_ADMIN: # 返回通配符,表示拥有所有菜单和元素权限 return { 'menus': ['*'], 'elements': ['*'] } # 1. 查菜单权限 menu_perms = SysRolePermission.query.filter( func.upper(SysRolePermission.role_code) == role_code.upper(), SysRolePermission.type == 'menu' ).all() menu_codes = [p.target_code for p in menu_perms] # 2. 查元素(列)权限 # 注意:这里我们只返回用户拥有的。前端逻辑是:"如果列配置了Key且用户没这个Key,则隐藏" element_perms = SysRolePermission.query.filter( func.upper(SysRolePermission.role_code) == role_code.upper(), SysRolePermission.type == 'element' ).all() # 这里的 target_code 就是列的 code (如 unit_price) # 为了防止不同页面有相同列名导致的混淆,我们之前数据库设计是做了隔离的 # 但为了前端处理方便,我们直接返回列的 code 集合 element_codes = [p.target_code for p in element_perms] return { 'menus': menu_codes, 'elements': element_codes }