Files
KCGL/inventory-backend/app/services/auth_service.py

192 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/auth_service.py
from app.models.system import SysUser
from app.extensions import db
from flask_jwt_extended import create_access_token
from app.utils.constants import UserRole
from datetime import timedelta
class AuthService:
# 硬编码的“初始”超级管理员凭证 (用于系统初始化或数据库被锁死时)
SUPER_ADMIN_USER = "IRIS"
SUPER_ADMIN_PASS = "licahk"
@staticmethod
def login(data):
username = data.get('username')
password = data.get('password')
user_role = None
user_id = None
user_info = {}
# 1. 优先检查硬编码的超级管理员 (IRIS)
if username == AuthService.SUPER_ADMIN_USER:
if password == AuthService.SUPER_ADMIN_PASS:
# 显式指定角色为 SUPER_ADMIN
user_role = UserRole.SUPER_ADMIN
user_id = 0 # 虚拟ID区分于数据库ID
user_info = {
'username': username,
'role': user_role,
'department': 'System',
'status': 'active'
}
else:
raise ValueError("密码错误")
# 2. 如果不是硬编码用户,检查数据库用户
else:
user = SysUser.query.filter_by(username=username).first()
if not user:
raise ValueError("用户不存在")
if not user.check_password(password):
raise ValueError("密码错误")
if user.status != 'active':
raise ValueError("账号已被禁用,请联系管理员")
user_role = user.role
user_id = user.id
user_info = user.to_dict()
# 3. 生成 Token (设置为7天过期)
access_token = create_access_token(
identity=user_id,
additional_claims={'role': user_role, 'username': username},
expires_delta=timedelta(days=7)
)
return {
'access_token': access_token,
'user': user_info
}
@staticmethod
def create_user(data, operator_role):
"""
创建新用户
权限控制:只有超级管理员(SUPER_ADMIN) 或 主管(SUPERVISOR) 可以创建
"""
if operator_role not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]:
raise Exception("权限不足:只有超级管理员或主管可以创建新用户")
# 检查用户名是否重复
if SysUser.query.filter_by(username=data.get('username')).first():
raise Exception("用户名已存在")
role = data.get('role')
# 验证角色合法性
# [核心修复] 过滤掉 __开头的属性 和 ROLE_MAP 字典,只保留字符串类型的角色定义
valid_roles = [
v for k, v in UserRole.__dict__.items()
if not k.startswith('__') and isinstance(v, str)
]
if role not in valid_roles:
raise Exception(f"角色无效")
# 如果操作者只是 SUPERVISOR (主管),不允许他创建 SUPER_ADMIN (超管)
if operator_role == UserRole.SUPERVISOR and role == UserRole.SUPER_ADMIN:
raise Exception("权限不足:主管无法创建超级管理员")
email = data.get('email', '')
if email and SysUser.query.filter_by(email=email).first():
raise Exception("邮箱已被使用")
new_user = SysUser(
username=data.get('username'),
email=email,
department=data.get('department', ''),
role=role,
status='active'
)
new_user.set_password(data.get('password'))
db.session.add(new_user)
db.session.commit()
return new_user.to_dict()
@staticmethod
def update_user(user_id, data, operator_role):
"""
更新用户信息
"""
# 权限校验
if operator_role not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]:
raise Exception("权限不足:只有超级管理员或主管可以修改用户信息")
user = SysUser.query.get(user_id)
if not user:
raise Exception("用户不存在")
# 1. 更新角色 (Role)
if 'role' in data:
# [核心修复] 同样添加类型检查
valid_roles = [
v for k, v in UserRole.__dict__.items()
if not k.startswith('__') and isinstance(v, str)
]
new_role = data['role']
if new_role not in valid_roles:
raise Exception(f"角色无效")
# 防止主管把自己或其他用户提升为超管
if operator_role == UserRole.SUPERVISOR and new_role == UserRole.SUPER_ADMIN:
raise Exception("权限不足:主管无法分配超级管理员权限")
user.role = new_role
# 2. 更新部门
if 'department' in data:
user.department = data['department']
# 3. 更新邮箱
if 'email' in data:
email = data['email']
if email and email != user.email:
existing = SysUser.query.filter_by(email=email).first()
if existing:
raise Exception("该邮箱已被其他用户使用")
user.email = email
# 4. 更新状态 (Status) - 例如禁用用户
if 'status' in data:
user.status = data['status']
# 5. 更新密码
new_password = data.get('password')
if new_password and str(new_password).strip():
if len(new_password) < 6:
raise Exception("密码长度至少6位")
user.set_password(new_password)
db.session.commit()
return user.to_dict()
@staticmethod
def get_all_users():
"""获取所有系统用户"""
# 按照 ID 倒序排列
users = SysUser.query.order_by(SysUser.id.desc()).all()
return [user.to_dict() for user in users]
@staticmethod
def delete_user(user_id, operator_role):
"""删除用户"""
# 只有超级管理员可以执行物理删除
if operator_role != UserRole.SUPER_ADMIN:
raise Exception("权限不足:只有超级管理员可以删除用户,建议使用禁用功能")
user = SysUser.query.get(user_id)
if not user:
raise Exception("用户不存在")
db.session.delete(user)
db.session.commit()
return True