# app/services/auth_service.py from app.models.system import SysUser, SysRolePermission # <== 引入 SysRolePermission from app.extensions import db, redis_client from sqlalchemy import func from flask_jwt_extended import create_access_token, create_refresh_token, get_jwt_identity from flask import current_app from app.utils.constants import UserRole from datetime import timedelta import json def _store_token_to_redis(user_id, token, token_type='access'): """ 将 Token 存储到 Redis,用于单设备登录互踢 键名: user_token_{user_id} 值: token 过期时间: 与 JWT 过期时间一致 """ if redis_client is None: current_app.logger.warning("Redis not available, skipping token storage") return False try: # 获取 JWT 过期时间配置 if token_type == 'access': expires_delta = current_app.config.get('JWT_ACCESS_TOKEN_EXPIRES', timedelta(hours=2)) else: expires_delta = current_app.config.get('JWT_REFRESH_TOKEN_EXPIRES', timedelta(days=7)) # 转换为秒数(兼容 timedelta 和 int 类型) if isinstance(expires_delta, timedelta): expires_seconds = int(expires_delta.total_seconds()) else: expires_seconds = int(expires_delta) # 已经是秒数(整数) # 存储到 Redis key = f"user_token_{user_id}" redis_client.setex(key, expires_seconds, token) return True except Exception as e: current_app.logger.error(f"Failed to store token to Redis: {e}") return False def _get_token_from_redis(user_id): """从 Redis 获取当前有效的 Token""" if redis_client is None: return None try: key = f"user_token_{user_id}" return redis_client.get(key) except Exception as e: current_app.logger.error(f"Failed to get token from Redis: {e}") return None class AuthService: # 硬编码的超级管理员凭证 SUPER_ADMIN_USER = "IRIS" SUPER_ADMIN_PASS = "licahk" @staticmethod def login(data): # 用户登录时输入的只是账号ID (例如: zhangsan) login_input = data.get('username', '').strip() password = data.get('password') user_role = None user_id = None user_info = {} # 1. 优先检查硬编码的超级管理员 (IRIS) if login_input == AuthService.SUPER_ADMIN_USER: if password == AuthService.SUPER_ADMIN_PASS: user_role = UserRole.SUPER_ADMIN user_id = 0 user_info = { 'username': '超级管理员(IRIS)', 'account_id': 'IRIS', 'role': user_role, 'department': 'System', 'status': 'active' } else: raise ValueError("密码错误") # 2. 检查数据库用户 # 数据库存的是 "张三/zhangsan" # 登录匹配逻辑: 查找以 "/login_input" 结尾的记录 else: # 使用 like 进行后缀匹配: '%/zhangsan' user = SysUser.query.filter(SysUser.username.like(f"%/{login_input}")).first() if not user: raise ValueError("用户不存在") if not user.check_password(password): raise ValueError("密码错误") if user.status != 'active': raise ValueError("账号已被禁用,请联系管理员") user_role = user.role.upper() if user.role else None user_id = user.id user_info = user.to_dict() user_info['role'] = user_role # 获取用户所属公司(存于 department 字段) user_company = user.department or '' # 3. 生成 Token # Token 中 identity 存数据库ID,claims 存登录账号ID account_id = user_info.get('account_id', login_input) # Access Token: 2小时有效期 access_token = create_access_token( identity=user_id, additional_claims={ 'role': user_role, 'username': account_id, # 存纯账号ID 'display_name': user_info.get('username'), # 存显示名 'company_name': user_company # 存所属公司 } ) # Refresh Token: 7天有效期 refresh_token = create_refresh_token( identity=user_id, additional_claims={ 'role': user_role, 'username': account_id, 'display_name': user_info.get('display_name', account_id), 'company_name': user_company } ) # 4. 存储 Token 到 Redis (用于单设备登录互踢) # 使用 str(user_id) 作为 Redis key 的一部分,因为 user_id=0 是超级管理员 _store_token_to_redis(str(user_id), access_token, 'access') _store_token_to_redis(str(user_id), refresh_token, 'refresh') return { 'access_token': access_token, 'refresh_token': refresh_token, 'user': user_info } @staticmethod def refresh_access_token(refresh_token_str): """ 使用 refresh_token 换发新的 access_token """ from flask_jwt_extended import decode_token from flask import current_app try: # 解码 refresh_token(不验证过期,仅获取 claims) decoded = decode_token(refresh_token_str) user_id = decoded.get('sub') role = decoded.get('role') username = decoded.get('username') if not user_id: raise ValueError("无效的 refresh_token") # 重新查询数据库获取用户的 display_name,避免刷新后丢失 from app.models.system import SysUser user = SysUser.query.get(user_id) if user: user_info = user.to_dict() display_name = user_info.get('display_name', username) else: display_name = username # 生成新的 access_token new_access_token = create_access_token( identity=user_id, additional_claims={ 'role': role, 'username': username, 'display_name': display_name } ) return { 'access_token': new_access_token } except Exception as e: raise ValueError(f"Token 刷新失败: {str(e)}") @staticmethod def create_user(data, operator_role): """ 创建新用户 data 包含: cn_name(张三), username(zhangsan), ... """ # 标准化操作者角色为全大写 operator_role_upper = operator_role.upper() if operator_role else None if operator_role_upper not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]: raise Exception("权限不足:只有超级管理员或主管可以创建新用户") cn_name = data.get('cn_name') pinyin_base = data.get('username') # 前端传来的基础拼音,如 zhangsan if not cn_name or not pinyin_base: raise Exception("姓名和账号不能为空") # 后端兜底正则校验:允许中英数,禁止纯数字,无特殊字符 import re name_pattern = re.compile(r'^(?!\d+$)[a-zA-Z0-9\u4e00-\u9fa5]+$') if not name_pattern.match(cn_name): raise Exception("姓名格式错误:仅支持中英文和数字,不能为纯数字,且不支持特殊字符") if not name_pattern.match(pinyin_base): raise Exception("账号格式错误:仅支持中英文和数字,不能为纯数字,且不支持特殊字符") role_raw = data.get('role') role = role_raw.upper() if role_raw else None # 验证角色合法性 valid_roles = [ v for k, v in UserRole.__dict__.items() if not k.startswith('__') and isinstance(v, str) ] if role not in valid_roles: raise Exception(f"角色无效") if operator_role_upper == UserRole.SUPERVISOR and role == UserRole.SUPER_ADMIN: raise Exception("权限不足:主管无法创建超级管理员") email = data.get('email', '') or None # 空字符串转 None,避免 unique 冲突 if email and SysUser.query.filter_by(email=email).first(): raise Exception("邮箱已被使用") # === 核心逻辑: 自动处理账号重复 (zhangsan -> zhangsan1 -> zhangsan2) === final_account_id = pinyin_base counter = 1 # 如果重复,从1开始累加 while True: # 检查数据库是否存在以 "/final_account_id" 结尾的记录 existing = SysUser.query.filter( (SysUser.username.like(f"%/{final_account_id}")) | (SysUser.username == final_account_id) ).first() if not existing: break # 找到了可用的ID,跳出循环 # 如果存在,使用 base + counter final_account_id = f"{pinyin_base}{counter}" counter += 1 # 拼接最终存储格式: 张三/zhangsan1 full_username_storage = f"{cn_name}/{final_account_id}" new_user = SysUser( username=full_username_storage, # 存组合串 email=email, department=data.get('department', ''), role=role, status='active' ) new_user.set_password(data.get('password')) db.session.add(new_user) db.session.commit() # 返回时,最好把生成的ID告诉前端 return new_user.to_dict() @staticmethod def batch_create_users(data_list, operator_role): """ 批量创建新用户。复用 create_user 的核心防重逻辑。 """ results = [] for data in data_list: try: # 复用单条创建逻辑,它自带张三/zhangsan1的防重机制 new_user_dict = AuthService.create_user(data, operator_role) results.append({ "cn_name": data.get('cn_name'), "account_id": new_user_dict.get('account_id'), "status": "success" }) except Exception as e: results.append({ "cn_name": data.get('cn_name'), "error": str(e), "status": "fail" }) return results @staticmethod def update_user(user_id, data, operator_role): """ 更新用户信息 注意: 这里暂时不允许修改用户名/账号,因为涉及 split 逻辑较复杂,且通常账号不开通后不改 """ # 标准化操作者角色为全大写 operator_role_upper = operator_role.upper() if operator_role else None if operator_role_upper not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]: raise Exception("权限不足") user = SysUser.query.get(user_id) if not user: raise Exception("用户不存在") # 1. 更新基本信息 if 'role' in data: valid_roles = [ v for k, v in UserRole.__dict__.items() if not k.startswith('__') and isinstance(v, str) ] new_role_raw = data['role'] new_role = new_role_raw.upper() if new_role_raw else None if new_role not in valid_roles: raise Exception(f"角色无效") if operator_role_upper == UserRole.SUPERVISOR and new_role == UserRole.SUPER_ADMIN: raise Exception("权限不足") user.role = new_role if 'department' in data: user.department = data['department'] if 'email' in data: email = data['email'] if email and email != user.email: existing = SysUser.query.filter_by(email=email).first() if existing: raise Exception("该邮箱已被其他用户使用") user.email = email if 'status' in data: user.status = data['status'] new_password = data.get('password') if new_password and str(new_password).strip(): if len(new_password) < 6: raise Exception("密码长度至少6位") user.set_password(new_password) db.session.commit() return user.to_dict() @staticmethod def get_all_users(): """获取所有系统用户""" users = SysUser.query.order_by(SysUser.id.desc()).all() return [user.to_dict() for user in users] @staticmethod def delete_user(user_id, operator_role): """删除用户""" # 标准化操作者角色为全大写 operator_role_upper = operator_role.upper() if operator_role else None if operator_role_upper != UserRole.SUPER_ADMIN: raise Exception("权限不足:只有超级管理员可以删除用户") user = SysUser.query.get(user_id) if not user: raise Exception("用户不存在") # 提前获取用户名用于审计日志 username = user.username db.session.delete(user) db.session.commit() return username @staticmethod def get_user_permissions(role_code): """ 获取指定角色的所有权限代码列表 返回格式: { 'menus': ['inbound_buy', 'system_user'], 'elements': ['inbound_buy:unit_price', ...] } """ # 防御性编程:role_code 为空时直接返回空权限,避免后续 SQL 崩溃 if not role_code: return {'menus': [], 'elements': []} # 超级管理员返回所有权限(通配符) from app.utils.constants import UserRole if role_code.upper() == UserRole.SUPER_ADMIN: # 返回通配符,表示拥有所有菜单和元素权限 return { 'menus': ['*'], 'elements': ['*'] } # 1. 查菜单权限 # 使用 func.upper() 处理数据库字段的大小写 menu_perms = SysRolePermission.query.filter( func.upper(SysRolePermission.role_code) == role_code.upper(), SysRolePermission.type == 'menu' ).all() menu_codes = [p.target_code for p in menu_perms] # 2. 查元素(列)权限 # 注意:这里我们只返回用户拥有的。前端逻辑是:"如果列配置了Key且用户没这个Key,则隐藏" element_perms = SysRolePermission.query.filter( func.upper(SysRolePermission.role_code) == role_code.upper(), SysRolePermission.type == 'element' ).all() element_codes = [p.target_code for p in element_perms] # 调试日志:输出查询结果便于排查字段权限问题 from flask import current_app current_app.logger.info( f"[权限查询] role={role_code}, 查询到菜单权限={menu_codes}, 元素权限={element_codes}" ) return { 'menus': menu_codes, 'elements': element_codes }