Files
KCGL/inventory-backend/app/services/auth_service.py
2026-02-04 13:30:07 +08:00

95 lines
3.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/auth_service.py
from app.models.system import SysUser
from app.extensions import db
from flask_jwt_extended import create_access_token, get_jwt_identity, get_jwt
from werkzeug.security import check_password_hash
from app.utils.constants import UserRole
class AuthService:
    """Login and user-creation operations.

    All methods are static. Failures are reported by raising a plain
    ``Exception`` whose message is the user-facing error text.
    """

    # Hard-coded super-administrator credentials.
    # SECURITY NOTE(review): these are plaintext secrets committed to source.
    # They are kept so existing behaviour is unchanged, but should be moved to
    # configuration (and stored hashed) -- confirm with the project owners.
    SUPER_ADMIN_USER = "IRIS"
    SUPER_ADMIN_PASS = "licahk"

    @staticmethod
    def login(data):
        """Authenticate a user and issue a JWT access token.

        The hard-coded super administrator is checked first; any other
        username is looked up through ``SysUser``.

        :param data: mapping with ``'username'`` and ``'password'`` entries.
        :return: ``{'access_token': <token>, 'user': <user info dict>}``.
        :raises Exception: when the credentials are missing or wrong, or when
            the matched database account is not ``'active'``.
        """
        # Function-scope import keeps this block self-contained.
        import hmac

        # One generic message for every credential failure, so a caller
        # cannot tell a wrong password from an unknown (or privileged) name.
        bad_credentials = "用户名或密码错误"

        # Reject a missing / non-mapping payload instead of crashing on .get().
        if not data or not hasattr(data, 'get'):
            raise Exception(bad_credentials)

        username = data.get('username')
        password = data.get('password')

        # Non-string credentials can never match and would otherwise reach
        # the hash check (or the byte encoding below) and fail with a
        # low-level error.
        if not isinstance(username, str) or not isinstance(password, str):
            raise Exception(bad_credentials)

        # 1. Check the hard-coded super administrator first.
        if username == AuthService.SUPER_ADMIN_USER:
            # Constant-time comparison on bytes; encoding first lets
            # compare_digest accept non-ASCII input.
            supplied = password.encode('utf-8')
            expected = AuthService.SUPER_ADMIN_PASS.encode('utf-8')
            if not hmac.compare_digest(supplied, expected):
                raise Exception(bad_credentials)

            user_role = UserRole.SUPER_ADMIN
            user_id = 0  # virtual ID: the super admin has no database row
            user_info = {
                'username': username,
                'role': user_role,
                'department': 'System'
            }
        # 2. Any other username is checked against the database.
        else:
            user = SysUser.query.filter_by(username=username).first()
            if not user or not user.check_password(password):
                raise Exception(bad_credentials)
            if user.status != 'active':
                raise Exception("账号已被禁用")

            user_role = user.role
            user_id = user.id
            user_info = user.to_dict()

        # 3. Issue the token. The identity carries the user ID, and the role
        #    and username are stored as additional claims.
        # NOTE(review): the identity is an integer here; recent PyJWT releases
        # reject a non-string ``sub`` claim unless verification is relaxed --
        # confirm against the deployed library versions.
        access_token = create_access_token(
            identity=user_id,
            additional_claims={'role': user_role, 'username': username}
        )

        return {
            'access_token': access_token,
            'user': user_info
        }

    @staticmethod
    def create_user(data, operator_role):
        """Create a new user (administrators only).

        :param data: mapping describing the new user; reads ``'username'``,
            ``'password'``, ``'role'`` and the optional ``'email'`` and
            ``'department'`` entries.
        :param operator_role: role of the caller performing the operation.
        :return: the new user's ``to_dict()`` representation.
        :raises Exception: when the operator lacks permission, the username or
            password is missing, the username is taken, or the role is not a
            value defined on ``UserRole``. Database errors raised by the
            commit are re-raised after the session is rolled back.
        """
        # Simple permission gate: only super admins or supervisors may create.
        if operator_role not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]:
            raise Exception("权限不足:只有超级管理员或主管可以创建新用户")

        if not data or not hasattr(data, 'get'):
            raise Exception("用户名和密码不能为空")

        username = data.get('username')
        password = data.get('password')

        # Fail early with a clear message instead of letting set_password()
        # or the database reject a missing value later.
        if not isinstance(username, str) or not username \
                or not isinstance(password, str) or not password:
            raise Exception("用户名和密码不能为空")

        # The super-admin name is reserved: login() always resolves it to the
        # hard-coded account, so a database user with that name could never
        # sign in.
        if username == AuthService.SUPER_ADMIN_USER:
            raise Exception("用户名已存在")

        # Reject duplicate usernames.
        if SysUser.query.filter_by(username=username).first():
            raise Exception("用户名已存在")

        # No default role is applied: a missing role fails validation below.
        role = data.get('role')

        # A role is valid when it equals one of the non-dunder attribute
        # values declared on UserRole.
        valid_roles = [v for k, v in UserRole.__dict__.items() if not k.startswith('__')]
        if role not in valid_roles:
            raise Exception(f"角色无效,可选角色: {valid_roles}")

        new_user = SysUser(
            username=username,
            email=data.get('email', ''),  # may be empty
            department=data.get('department', ''),
            role=role,
            status='active'
        )
        new_user.set_password(password)

        db.session.add(new_user)
        try:
            db.session.commit()
        except Exception:
            # Leave the session usable for the rest of the request (e.g. when
            # a concurrent insert wins the race on the same username).
            db.session.rollback()
            raise

        return new_user.to_dict()