Files
KCGL/inventory-backend/app/services/auth_service.py
2026-02-04 15:55:20 +08:00

156 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/auth_service.py
from app.models.system import SysUser
from app.extensions import db
from flask_jwt_extended import create_access_token
from app.utils.constants import UserRole
class AuthService:
# 硬编码的超级管理员凭证
SUPER_ADMIN_USER = "IRIS"
SUPER_ADMIN_PASS = "licahk"
@staticmethod
def login(data):
username = data.get('username')
password = data.get('password')
user_role = None
user_id = None
user_info = {}
# 1. 优先检查硬编码的超级管理员
if username == AuthService.SUPER_ADMIN_USER:
if password == AuthService.SUPER_ADMIN_PASS:
user_role = UserRole.SUPER_ADMIN
user_id = 0 # 虚拟ID
user_info = {
'username': username,
'role': user_role,
'department': 'System'
}
else:
raise ValueError("密码错误")
# 2. 如果不是 IRIS检查数据库用户
else:
user = SysUser.query.filter_by(username=username).first()
if not user:
raise ValueError("用户不存在")
if not user.check_password(password):
raise ValueError("密码错误")
if user.status != 'active':
raise ValueError("账号已被禁用,请联系管理员")
user_role = user.role
user_id = user.id
user_info = user.to_dict()
# 3. 生成 Token
access_token = create_access_token(
identity=user_id,
additional_claims={'role': user_role, 'username': username}
)
return {
'access_token': access_token,
'user': user_info
}
@staticmethod
def create_user(data, operator_role):
"""
创建新用户 (仅限管理员使用)
"""
if operator_role not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]:
raise Exception("权限不足:只有超级管理员或主管可以创建新用户")
if SysUser.query.filter_by(username=data.get('username')).first():
raise Exception("用户名已存在")
role = data.get('role')
valid_roles = [v for k, v in UserRole.__dict__.items() if not k.startswith('__')]
if role not in valid_roles:
raise Exception(f"角色无效,可选角色: {valid_roles}")
email = data.get('email', '')
if email and SysUser.query.filter_by(email=email).first():
raise Exception("邮箱已被使用")
new_user = SysUser(
username=data.get('username'),
email=email,
department=data.get('department', ''),
role=role,
status='active'
)
new_user.set_password(data.get('password'))
db.session.add(new_user)
db.session.commit()
return new_user.to_dict()
@staticmethod
def update_user(user_id, data, operator_role):
"""
[新增] 更新用户信息
"""
if operator_role not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]:
raise Exception("权限不足:只有超级管理员或主管可以修改用户信息")
user = SysUser.query.get(user_id)
if not user:
raise Exception("用户不存在")
# 1. 更新基本信息
if 'role' in data:
valid_roles = [v for k, v in UserRole.__dict__.items() if not k.startswith('__')]
if data['role'] not in valid_roles:
raise Exception(f"角色无效")
user.role = data['role']
if 'department' in data:
user.department = data['department']
if 'email' in data:
# 如果修改了邮箱,且新邮箱已被其他人使用
email = data['email']
if email and email != user.email:
existing = SysUser.query.filter_by(email=email).first()
if existing:
raise Exception("该邮箱已被其他用户使用")
user.email = email
# 2. 如果提供了密码,则重置密码;否则保持原密码
new_password = data.get('password')
if new_password and str(new_password).strip():
if len(new_password) < 6:
raise Exception("密码长度至少6位")
user.set_password(new_password)
db.session.commit()
return user.to_dict()
@staticmethod
def get_all_users():
"""获取所有系统用户"""
users = SysUser.query.order_by(SysUser.id.desc()).all()
return [user.to_dict() for user in users]
@staticmethod
def delete_user(user_id, operator_role):
"""删除用户"""
if operator_role not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]:
raise Exception("权限不足")
user = SysUser.query.get(user_id)
if not user:
raise Exception("用户不存在")
db.session.delete(user)
db.session.commit()
return True