Files
KCGL/inventory-backend/app/services/auth_service.py
dxc 5beb373677 fix: standardize operator role to uppercase for permission checks
Co-authored-by: aider (openai/DeepSeek-V3.2-Thinking) <aider@aider.chat>
2026-02-27 17:11:29 +08:00

266 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/auth_service.py
from app.models.system import SysUser, SysRolePermission # <== 引入 SysRolePermission
from app.extensions import db
from sqlalchemy import func
from flask_jwt_extended import create_access_token
from app.utils.constants import UserRole
from datetime import timedelta
class AuthService:
# 硬编码的超级管理员凭证
SUPER_ADMIN_USER = "IRIS"
SUPER_ADMIN_PASS = "licahk"
@staticmethod
def login(data):
# 用户登录时输入的只是账号ID (例如: zhangsan)
login_input = data.get('username', '').strip()
password = data.get('password')
user_role = None
user_id = None
user_info = {}
# 1. 优先检查硬编码的超级管理员 (IRIS)
if login_input == AuthService.SUPER_ADMIN_USER:
if password == AuthService.SUPER_ADMIN_PASS:
user_role = UserRole.SUPER_ADMIN
user_id = 0
user_info = {
'username': '超级管理员(IRIS)',
'account_id': 'IRIS',
'role': user_role,
'department': 'System',
'status': 'active'
}
else:
raise ValueError("密码错误")
# 2. 检查数据库用户
# 数据库存的是 "张三/zhangsan"
# 登录匹配逻辑: 查找以 "/login_input" 结尾的记录
else:
# 使用 like 进行后缀匹配: '%/zhangsan'
user = SysUser.query.filter(SysUser.username.like(f"%/{login_input}")).first()
if not user:
raise ValueError("用户不存在")
if not user.check_password(password):
raise ValueError("密码错误")
if user.status != 'active':
raise ValueError("账号已被禁用,请联系管理员")
user_role = user.role.upper() if user.role else None
user_id = user.id
user_info = user.to_dict()
user_info['role'] = user_role
# 3. 生成 Token
# Token 中 identity 存数据库IDclaims 存登录账号ID
account_id = user_info.get('account_id', login_input)
access_token = create_access_token(
identity=user_id,
additional_claims={
'role': user_role,
'username': account_id, # 存纯账号ID
'display_name': user_info.get('username') # 存显示名
},
expires_delta=timedelta(days=7)
)
return {
'access_token': access_token,
'user': user_info
}
@staticmethod
def create_user(data, operator_role):
"""
创建新用户
data 包含: cn_name(张三), username(zhangsan), ...
"""
# 标准化操作者角色为全大写
operator_role_upper = operator_role.upper() if operator_role else None
if operator_role_upper not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]:
raise Exception("权限不足:只有超级管理员或主管可以创建新用户")
cn_name = data.get('cn_name')
pinyin_base = data.get('username') # 前端传来的基础拼音,如 zhangsan
if not cn_name or not pinyin_base:
raise Exception("姓名和账号不能为空")
role_raw = data.get('role')
role = role_raw.upper() if role_raw else None
# 验证角色合法性
valid_roles = [
v for k, v in UserRole.__dict__.items()
if not k.startswith('__') and isinstance(v, str)
]
if role not in valid_roles:
raise Exception(f"角色无效")
if operator_role_upper == UserRole.SUPERVISOR and role == UserRole.SUPER_ADMIN:
raise Exception("权限不足:主管无法创建超级管理员")
email = data.get('email', '')
if email and SysUser.query.filter_by(email=email).first():
raise Exception("邮箱已被使用")
# === 核心逻辑: 自动处理账号重复 (zhangsan -> zhangsan1 -> zhangsan2) ===
final_account_id = pinyin_base
counter = 1 # 如果重复从1开始累加
while True:
# 检查数据库是否存在以 "/final_account_id" 结尾的记录
existing = SysUser.query.filter(
(SysUser.username.like(f"%/{final_account_id}")) |
(SysUser.username == final_account_id)
).first()
if not existing:
break # 找到了可用的ID跳出循环
# 如果存在,使用 base + counter
final_account_id = f"{pinyin_base}{counter}"
counter += 1
# 拼接最终存储格式: 张三/zhangsan1
full_username_storage = f"{cn_name}/{final_account_id}"
new_user = SysUser(
username=full_username_storage, # 存组合串
email=email,
department=data.get('department', ''),
role=role,
status='active'
)
new_user.set_password(data.get('password'))
db.session.add(new_user)
db.session.commit()
# 返回时最好把生成的ID告诉前端
return new_user.to_dict()
@staticmethod
def update_user(user_id, data, operator_role):
"""
更新用户信息
注意: 这里暂时不允许修改用户名/账号,因为涉及 split 逻辑较复杂,且通常账号不开通后不改
"""
# 标准化操作者角色为全大写
operator_role_upper = operator_role.upper() if operator_role else None
if operator_role_upper not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]:
raise Exception("权限不足")
user = SysUser.query.get(user_id)
if not user:
raise Exception("用户不存在")
# 1. 更新基本信息
if 'role' in data:
valid_roles = [
v for k, v in UserRole.__dict__.items()
if not k.startswith('__') and isinstance(v, str)
]
new_role_raw = data['role']
new_role = new_role_raw.upper() if new_role_raw else None
if new_role not in valid_roles:
raise Exception(f"角色无效")
if operator_role_upper == UserRole.SUPERVISOR and new_role == UserRole.SUPER_ADMIN:
raise Exception("权限不足")
user.role = new_role
if 'department' in data:
user.department = data['department']
if 'email' in data:
email = data['email']
if email and email != user.email:
existing = SysUser.query.filter_by(email=email).first()
if existing:
raise Exception("该邮箱已被其他用户使用")
user.email = email
if 'status' in data:
user.status = data['status']
new_password = data.get('password')
if new_password and str(new_password).strip():
if len(new_password) < 6:
raise Exception("密码长度至少6位")
user.set_password(new_password)
db.session.commit()
return user.to_dict()
@staticmethod
def get_all_users():
"""获取所有系统用户"""
users = SysUser.query.order_by(SysUser.id.desc()).all()
return [user.to_dict() for user in users]
@staticmethod
def delete_user(user_id, operator_role):
"""删除用户"""
# 标准化操作者角色为全大写
operator_role_upper = operator_role.upper() if operator_role else None
if operator_role_upper != UserRole.SUPER_ADMIN:
raise Exception("权限不足:只有超级管理员可以删除用户")
user = SysUser.query.get(user_id)
if not user:
raise Exception("用户不存在")
db.session.delete(user)
db.session.commit()
return True
@staticmethod
def get_user_permissions(role_code):
"""
获取指定角色的所有权限代码列表
返回格式: {
'menus': ['inbound_buy', 'system_user'],
'elements': ['inbound_buy:unit_price', ...]
}
"""
# 超级管理员返回所有权限(通配符)
from app.utils.constants import UserRole
if role_code and role_code.upper() == UserRole.SUPER_ADMIN:
# 返回通配符,表示拥有所有菜单和元素权限
return {
'menus': ['*'],
'elements': ['*']
}
# 1. 查菜单权限
menu_perms = SysRolePermission.query.filter(
func.upper(SysRolePermission.role_code) == role_code.upper(),
SysRolePermission.type == 'menu'
).all()
menu_codes = [p.target_code for p in menu_perms]
# 2. 查元素(列)权限
# 注意:这里我们只返回用户拥有的。前端逻辑是:"如果列配置了Key且用户没这个Key则隐藏"
element_perms = SysRolePermission.query.filter(
func.upper(SysRolePermission.role_code) == role_code.upper(),
SysRolePermission.type == 'element'
).all()
# 这里的 target_code 就是列的 code (如 unit_price)
# 为了防止不同页面有相同列名导致的混淆,我们之前数据库设计是做了隔离的
# 但为了前端处理方便,我们直接返回列的 code 集合
element_codes = [p.target_code for p in element_perms]
return {
'menus': menu_codes,
'elements': element_codes
}