Files
KCGL/inventory-backend/app/services/auth_service.py

410 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/auth_service.py
from app.models.system import SysUser, SysRolePermission # <== 引入 SysRolePermission
from app.extensions import db, redis_client
from sqlalchemy import func
from flask_jwt_extended import create_access_token, create_refresh_token, get_jwt_identity
from flask import current_app
from app.utils.constants import UserRole
from datetime import timedelta
import json
def _store_token_to_redis(user_id, token, token_type='access'):
"""
将 Token 存储到 Redis用于单设备登录互踢
键名: user_token_{user_id}
值: token
过期时间: 与 JWT 过期时间一致
"""
if redis_client is None:
current_app.logger.warning("Redis not available, skipping token storage")
return False
try:
# 获取 JWT 过期时间配置
if token_type == 'access':
expires_delta = current_app.config.get('JWT_ACCESS_TOKEN_EXPIRES', timedelta(hours=2))
else:
expires_delta = current_app.config.get('JWT_REFRESH_TOKEN_EXPIRES', timedelta(days=7))
# 转换为秒数(兼容 timedelta 和 int 类型)
if isinstance(expires_delta, timedelta):
expires_seconds = int(expires_delta.total_seconds())
else:
expires_seconds = int(expires_delta) # 已经是秒数(整数)
# 存储到 Redis
key = f"user_token_{user_id}"
redis_client.setex(key, expires_seconds, token)
return True
except Exception as e:
current_app.logger.error(f"Failed to store token to Redis: {e}")
return False
def _get_token_from_redis(user_id):
"""从 Redis 获取当前有效的 Token"""
if redis_client is None:
return None
try:
key = f"user_token_{user_id}"
return redis_client.get(key)
except Exception as e:
current_app.logger.error(f"Failed to get token from Redis: {e}")
return None
class AuthService:
# 硬编码的超级管理员凭证
SUPER_ADMIN_USER = "IRIS"
SUPER_ADMIN_PASS = "licahk"
@staticmethod
def login(data):
# 用户登录时输入的只是账号ID (例如: zhangsan)
login_input = data.get('username', '').strip()
password = data.get('password')
user_role = None
user_id = None
user_info = {}
# 1. 优先检查硬编码的超级管理员 (IRIS)
if login_input == AuthService.SUPER_ADMIN_USER:
if password == AuthService.SUPER_ADMIN_PASS:
user_role = UserRole.SUPER_ADMIN
user_id = 0
user_info = {
'username': '超级管理员(IRIS)',
'account_id': 'IRIS',
'role': user_role,
'department': 'System',
'status': 'active'
}
else:
raise ValueError("密码错误")
# 2. 检查数据库用户
# 数据库存的是 "张三/zhangsan"
# 登录匹配逻辑: 查找以 "/login_input" 结尾的记录
else:
# 使用 like 进行后缀匹配: '%/zhangsan'
user = SysUser.query.filter(SysUser.username.like(f"%/{login_input}")).first()
if not user:
raise ValueError("用户不存在")
if not user.check_password(password):
raise ValueError("密码错误")
if user.status != 'active':
raise ValueError("账号已被禁用,请联系管理员")
user_role = user.role.upper() if user.role else None
user_id = user.id
user_info = user.to_dict()
user_info['role'] = user_role
# 获取用户所属公司(存于 department 字段)
user_company = user.department or ''
# 3. 生成 Token
# Token 中 identity 存数据库IDclaims 存登录账号ID
account_id = user_info.get('account_id', login_input)
# Access Token: 2小时有效期
access_token = create_access_token(
identity=user_id,
additional_claims={
'role': user_role,
'username': account_id, # 存纯账号ID
'display_name': user_info.get('username'), # 存显示名
'company_name': user_company # 存所属公司
}
)
# Refresh Token: 7天有效期
refresh_token = create_refresh_token(
identity=user_id,
additional_claims={
'role': user_role,
'username': account_id,
'display_name': user_info.get('display_name', account_id),
'company_name': user_company
}
)
# 4. 存储 Token 到 Redis (用于单设备登录互踢)
# 使用 str(user_id) 作为 Redis key 的一部分,因为 user_id=0 是超级管理员
_store_token_to_redis(str(user_id), access_token, 'access')
_store_token_to_redis(str(user_id), refresh_token, 'refresh')
return {
'access_token': access_token,
'refresh_token': refresh_token,
'user': user_info
}
@staticmethod
def refresh_access_token(refresh_token_str):
"""
使用 refresh_token 换发新的 access_token
"""
from flask_jwt_extended import decode_token
from flask import current_app
try:
# 解码 refresh_token不验证过期仅获取 claims
decoded = decode_token(refresh_token_str)
user_id = decoded.get('sub')
role = decoded.get('role')
username = decoded.get('username')
if not user_id:
raise ValueError("无效的 refresh_token")
# 重新查询数据库获取用户的 display_name避免刷新后丢失
from app.models.system import SysUser
user = SysUser.query.get(user_id)
if user:
user_info = user.to_dict()
display_name = user_info.get('display_name', username)
else:
display_name = username
# 生成新的 access_token
new_access_token = create_access_token(
identity=user_id,
additional_claims={
'role': role,
'username': username,
'display_name': display_name
}
)
return {
'access_token': new_access_token
}
except Exception as e:
raise ValueError(f"Token 刷新失败: {str(e)}")
@staticmethod
def create_user(data, operator_role):
"""
创建新用户
data 包含: cn_name(张三), username(zhangsan), ...
"""
# 标准化操作者角色为全大写
operator_role_upper = operator_role.upper() if operator_role else None
if operator_role_upper not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]:
raise Exception("权限不足:只有超级管理员或主管可以创建新用户")
cn_name = data.get('cn_name')
pinyin_base = data.get('username') # 前端传来的基础拼音,如 zhangsan
if not cn_name or not pinyin_base:
raise Exception("姓名和账号不能为空")
role_raw = data.get('role')
role = role_raw.upper() if role_raw else None
# 验证角色合法性
valid_roles = [
v for k, v in UserRole.__dict__.items()
if not k.startswith('__') and isinstance(v, str)
]
if role not in valid_roles:
raise Exception(f"角色无效")
if operator_role_upper == UserRole.SUPERVISOR and role == UserRole.SUPER_ADMIN:
raise Exception("权限不足:主管无法创建超级管理员")
email = data.get('email', '')
if email and SysUser.query.filter_by(email=email).first():
raise Exception("邮箱已被使用")
# === 核心逻辑: 自动处理账号重复 (zhangsan -> zhangsan1 -> zhangsan2) ===
final_account_id = pinyin_base
counter = 1 # 如果重复从1开始累加
while True:
# 检查数据库是否存在以 "/final_account_id" 结尾的记录
existing = SysUser.query.filter(
(SysUser.username.like(f"%/{final_account_id}")) |
(SysUser.username == final_account_id)
).first()
if not existing:
break # 找到了可用的ID跳出循环
# 如果存在,使用 base + counter
final_account_id = f"{pinyin_base}{counter}"
counter += 1
# 拼接最终存储格式: 张三/zhangsan1
full_username_storage = f"{cn_name}/{final_account_id}"
new_user = SysUser(
username=full_username_storage, # 存组合串
email=email,
department=data.get('department', ''),
role=role,
status='active'
)
new_user.set_password(data.get('password'))
db.session.add(new_user)
db.session.commit()
# 返回时最好把生成的ID告诉前端
return new_user.to_dict()
@staticmethod
def batch_create_users(data_list, operator_role):
"""
批量创建新用户。复用 create_user 的核心防重逻辑。
"""
results = []
for data in data_list:
try:
# 复用单条创建逻辑,它自带张三/zhangsan1的防重机制
new_user_dict = AuthService.create_user(data, operator_role)
results.append({
"cn_name": data.get('cn_name'),
"account_id": new_user_dict.get('account_id'),
"status": "success"
})
except Exception as e:
results.append({
"cn_name": data.get('cn_name'),
"error": str(e),
"status": "fail"
})
return results
@staticmethod
def update_user(user_id, data, operator_role):
"""
更新用户信息
注意: 这里暂时不允许修改用户名/账号,因为涉及 split 逻辑较复杂,且通常账号不开通后不改
"""
# 标准化操作者角色为全大写
operator_role_upper = operator_role.upper() if operator_role else None
if operator_role_upper not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]:
raise Exception("权限不足")
user = SysUser.query.get(user_id)
if not user:
raise Exception("用户不存在")
# 1. 更新基本信息
if 'role' in data:
valid_roles = [
v for k, v in UserRole.__dict__.items()
if not k.startswith('__') and isinstance(v, str)
]
new_role_raw = data['role']
new_role = new_role_raw.upper() if new_role_raw else None
if new_role not in valid_roles:
raise Exception(f"角色无效")
if operator_role_upper == UserRole.SUPERVISOR and new_role == UserRole.SUPER_ADMIN:
raise Exception("权限不足")
user.role = new_role
if 'department' in data:
user.department = data['department']
if 'email' in data:
email = data['email']
if email and email != user.email:
existing = SysUser.query.filter_by(email=email).first()
if existing:
raise Exception("该邮箱已被其他用户使用")
user.email = email
if 'status' in data:
user.status = data['status']
new_password = data.get('password')
if new_password and str(new_password).strip():
if len(new_password) < 6:
raise Exception("密码长度至少6位")
user.set_password(new_password)
db.session.commit()
return user.to_dict()
@staticmethod
def get_all_users():
"""获取所有系统用户"""
users = SysUser.query.order_by(SysUser.id.desc()).all()
return [user.to_dict() for user in users]
@staticmethod
def delete_user(user_id, operator_role):
"""删除用户"""
# 标准化操作者角色为全大写
operator_role_upper = operator_role.upper() if operator_role else None
if operator_role_upper != UserRole.SUPER_ADMIN:
raise Exception("权限不足:只有超级管理员可以删除用户")
user = SysUser.query.get(user_id)
if not user:
raise Exception("用户不存在")
# 提前获取用户名用于审计日志
username = user.username
db.session.delete(user)
db.session.commit()
return username
@staticmethod
def get_user_permissions(role_code):
"""
获取指定角色的所有权限代码列表
返回格式: {
'menus': ['inbound_buy', 'system_user'],
'elements': ['inbound_buy:unit_price', ...]
}
"""
# 防御性编程role_code 为空时直接返回空权限,避免后续 SQL 崩溃
if not role_code:
return {'menus': [], 'elements': []}
# 超级管理员返回所有权限(通配符)
from app.utils.constants import UserRole
if role_code.upper() == UserRole.SUPER_ADMIN:
# 返回通配符,表示拥有所有菜单和元素权限
return {
'menus': ['*'],
'elements': ['*']
}
# 1. 查菜单权限
# 使用 func.upper() 处理数据库字段的大小写
menu_perms = SysRolePermission.query.filter(
func.upper(SysRolePermission.role_code) == role_code.upper(),
SysRolePermission.type == 'menu'
).all()
menu_codes = [p.target_code for p in menu_perms]
# 2. 查元素(列)权限
# 注意:这里我们只返回用户拥有的。前端逻辑是:"如果列配置了Key且用户没这个Key则隐藏"
element_perms = SysRolePermission.query.filter(
func.upper(SysRolePermission.role_code) == role_code.upper(),
SysRolePermission.type == 'element'
).all()
element_codes = [p.target_code for p in element_perms]
# 调试日志:输出查询结果便于排查字段权限问题
from flask import current_app
current_app.logger.info(
f"[权限查询] role={role_code}, 查询到菜单权限={menu_codes}, 元素权限={element_codes}"
)
return {
'menus': menu_codes,
'elements': element_codes
}