Files
KCGL/inventory-backend/app/services/auth_service.py

115 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/auth_service.py
import hmac

from flask_jwt_extended import create_access_token

from app.models.system import SysUser
from app.extensions import db
from app.utils.constants import UserRole
class AuthService:
    """Authentication and user-management operations for system users.

    Every method is static. Failures are reported by raising ``Exception``
    carrying a user-facing message, so callers can surface ``str(exc)``.
    """

    # Hard-coded super administrator credentials.
    # SECURITY NOTE(review): these are plaintext secrets committed to source
    # control. They are kept unchanged here for backward compatibility, but
    # should be moved to configuration / a secret store and rotated.
    SUPER_ADMIN_USER = "IRIS"
    SUPER_ADMIN_PASS = "licahk"

    @staticmethod
    def _is_super_admin_password(password):
        """Return True when *password* equals the super admin password.

        Uses a constant-time comparison so the check does not leak how many
        leading characters matched. Non-string input never matches.
        """
        if not isinstance(password, str):
            return False
        return hmac.compare_digest(
            password.encode("utf-8"),
            AuthService.SUPER_ADMIN_PASS.encode("utf-8"),
        )

    @staticmethod
    def _can_manage_users(operator_role):
        """Return True when *operator_role* may create or delete users."""
        return operator_role in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]

    @staticmethod
    def _commit():
        """Commit the current session, rolling it back if the commit fails.

        The original error is re-raised after the rollback so callers still
        see the failure, but the session is left usable.
        """
        try:
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise

    @staticmethod
    def login(data):
        """Authenticate a user and issue a JWT access token.

        Args:
            data: Mapping with ``'username'`` and ``'password'`` keys.
                ``None`` is treated as an empty mapping.

        Returns:
            dict with ``'access_token'`` (the token returned by
            ``create_access_token``) and ``'user'`` (a dict describing the
            authenticated user).

        Raises:
            Exception: if the credentials are missing or wrong, or the
                database account's status is not ``'active'``.
        """
        data = data or {}
        username = data.get('username')
        password = data.get('password')

        # 1. The hard-coded super administrator is checked first and never
        #    touches the database.
        if username == AuthService.SUPER_ADMIN_USER:
            # NOTE(review): this message differs from the database branch and
            # therefore reveals that the super admin username exists; kept
            # as-is because callers may rely on the text.
            if not AuthService._is_super_admin_password(password):
                raise Exception("密码错误")
            user_role = UserRole.SUPER_ADMIN
            user_id = 0  # virtual ID: the super admin has no database row
            user_info = {
                'username': username,
                'role': user_role,
                'department': 'System'
            }
        # 2. Anyone else is looked up in the database.
        else:
            # Reject empty credentials up front, with the same message an
            # unknown user gets, instead of passing None to the query or to
            # check_password().
            if not username or not password:
                raise Exception("用户名或密码错误")
            user = SysUser.query.filter_by(username=username).first()
            if not user or not user.check_password(password):
                raise Exception("用户名或密码错误")
            if user.status != 'active':
                raise Exception("账号已被禁用")
            user_role = user.role
            user_id = user.id
            user_info = user.to_dict()

        # 3. Issue the token; role and username travel as additional claims.
        access_token = create_access_token(
            identity=user_id,
            additional_claims={'role': user_role, 'username': username}
        )
        return {
            'access_token': access_token,
            'user': user_info
        }

    @staticmethod
    def create_user(data, operator_role):
        """Create a new system user (administrators only).

        Args:
            data: Mapping with ``'username'``, ``'password'`` and ``'role'``,
                plus optional ``'email'`` and ``'department'``. ``None`` is
                treated as an empty mapping.
            operator_role: Role of the caller performing the operation.

        Returns:
            The new user as returned by its ``to_dict()`` method.

        Raises:
            Exception: if the operator lacks permission, username or password
                is missing, the username or email is already taken, or the
                role is not one of the ``UserRole`` values. Errors raised by
                the commit are re-raised after the session is rolled back.
        """
        # Simple permission gate: only a super admin or supervisor may create.
        if not AuthService._can_manage_users(operator_role):
            raise Exception("权限不足:只有超级管理员或主管可以创建新用户")

        data = data or {}
        username = data.get('username')
        password = data.get('password')
        # Fail with a clear message rather than letting a missing value reach
        # set_password() or the database.
        if not username or not password:
            raise Exception("用户名和密码不能为空")

        # Reject duplicate usernames.
        if SysUser.query.filter_by(username=username).first():
            raise Exception("用户名已存在")

        # The role must be one of the values declared on UserRole.
        # NOTE(review): as written, a supervisor can assign any declared role,
        # presumably including SUPER_ADMIN — confirm this is intended.
        role = data.get('role')
        valid_roles = [v for k, v in UserRole.__dict__.items() if not k.startswith('__')]
        if role not in valid_roles:
            raise Exception(f"角色无效,可选角色: {valid_roles}")

        # A missing email defaults to an empty string; uniqueness is only
        # checked for non-empty values.
        # NOTE(review): if the email column has a UNIQUE constraint, a second
        # user with an empty email may still collide — depends on the schema.
        email = data.get('email', '')
        if email and SysUser.query.filter_by(email=email).first():
            raise Exception("邮箱已被使用")

        new_user = SysUser(
            username=username,
            email=email,
            department=data.get('department', ''),
            role=role,
            status='active'
        )
        new_user.set_password(password)
        db.session.add(new_user)
        # The pre-checks above can race with a concurrent insert, so the
        # commit itself may still fail; roll back in that case.
        AuthService._commit()
        return new_user.to_dict()

    @staticmethod
    def get_all_users():
        """Return every system user as a dict, ordered by descending id."""
        users = SysUser.query.order_by(SysUser.id.desc()).all()
        return [user.to_dict() for user in users]

    @staticmethod
    def delete_user(user_id, operator_role):
        """Delete the user with the given id.

        Args:
            user_id: Primary key of the user to delete.
            operator_role: Role of the caller performing the operation.

        Returns:
            True once the deletion has been committed.

        Raises:
            Exception: if the operator lacks permission or no such user
                exists. Errors raised by the commit are re-raised after the
                session is rolled back.
        """
        if not AuthService._can_manage_users(operator_role):
            raise Exception("权限不足")
        user = SysUser.query.get(user_id)
        if not user:
            raise Exception("用户不存在")
        db.session.delete(user)
        AuthService._commit()
        return True