修改登录,实现中文名称显示,以及修改登录逻辑
This commit is contained in:
@ -2,27 +2,25 @@
|
||||
from app.extensions import db
|
||||
from werkzeug.security import generate_password_hash, check_password_hash
|
||||
from datetime import datetime
|
||||
from sqlalchemy.sql import func
|
||||
|
||||
|
||||
class SysUser(db.Model):
|
||||
"""
|
||||
系统用户表
|
||||
对应数据库: sys_user
|
||||
username 字段存储格式约定: "真实姓名/登录账号" (例如: 张三/zhangsan)
|
||||
"""
|
||||
__tablename__ = 'sys_user'
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
username = db.Column(db.String(100), nullable=False)
|
||||
username = db.Column(db.String(100), nullable=False) # 存储 "张三/zhangsan"
|
||||
email = db.Column(db.String(100), unique=True)
|
||||
department = db.Column(db.String(100))
|
||||
role = db.Column(db.String(50)) # 存储 UserRole 的值,如 'SUPER_ADMIN'
|
||||
role = db.Column(db.String(50))
|
||||
status = db.Column(db.String(20), default='active')
|
||||
password_hash = db.Column(db.Text)
|
||||
|
||||
# [关键] 对应数据库的 created_at 字段
|
||||
# 如果数据库报错 column not found,请务必执行 SQL: ALTER TABLE sys_user ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
|
||||
created_at = db.Column(db.DateTime, server_default=func.now(), default=datetime.now)
|
||||
# created_at 已在数据库脚本中移除,此处不再定义
|
||||
|
||||
def set_password(self, password):
|
||||
"""生成加密密码"""
|
||||
@ -33,15 +31,34 @@ class SysUser(db.Model):
|
||||
return check_password_hash(self.password_hash, password)
|
||||
|
||||
def to_dict(self):
|
||||
"""序列化为字典,供接口返回使用"""
|
||||
"""
|
||||
序列化为字典
|
||||
数据库存的是 '张三/zhangsan'
|
||||
前端需要的是 '张三(zhangsan)'
|
||||
"""
|
||||
raw_name = self.username
|
||||
display_name = raw_name
|
||||
account_id = raw_name
|
||||
|
||||
# 解析存储格式: Name/ID
|
||||
if '/' in raw_name:
|
||||
parts = raw_name.split('/')
|
||||
real_name = parts[0]
|
||||
acc_id = parts[1]
|
||||
# 格式化为前端展示格式: 张三(zhangsan)
|
||||
display_name = f"{real_name}({acc_id})"
|
||||
# 单独提取账号ID (如果前端需要单独用)
|
||||
account_id = acc_id
|
||||
|
||||
return {
|
||||
'id': self.id,
|
||||
'username': self.username,
|
||||
'username': display_name, # 列表显示: 张三(zhangsan)
|
||||
'raw_username': self.username, # 原始数据
|
||||
'account_id': account_id, # 纯账号ID: zhangsan
|
||||
'email': self.email,
|
||||
'department': self.department,
|
||||
'role': self.role,
|
||||
'status': self.status,
|
||||
'created_at': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else ''
|
||||
'status': self.status
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -7,13 +7,14 @@ from datetime import timedelta
|
||||
|
||||
|
||||
class AuthService:
|
||||
# 硬编码的“初始”超级管理员凭证 (用于系统初始化或数据库被锁死时)
|
||||
# 硬编码的超级管理员凭证
|
||||
SUPER_ADMIN_USER = "IRIS"
|
||||
SUPER_ADMIN_PASS = "licahk"
|
||||
|
||||
@staticmethod
|
||||
def login(data):
|
||||
username = data.get('username')
|
||||
# 用户登录时输入的只是账号ID (例如: zhangsan)
|
||||
login_input = data.get('username', '').strip()
|
||||
password = data.get('password')
|
||||
|
||||
user_role = None
|
||||
@ -21,13 +22,13 @@ class AuthService:
|
||||
user_info = {}
|
||||
|
||||
# 1. 优先检查硬编码的超级管理员 (IRIS)
|
||||
if username == AuthService.SUPER_ADMIN_USER:
|
||||
if login_input == AuthService.SUPER_ADMIN_USER:
|
||||
if password == AuthService.SUPER_ADMIN_PASS:
|
||||
# 显式指定角色为 SUPER_ADMIN
|
||||
user_role = UserRole.SUPER_ADMIN
|
||||
user_id = 0 # 虚拟ID,区分于数据库ID
|
||||
user_id = 0
|
||||
user_info = {
|
||||
'username': username,
|
||||
'username': '超级管理员(IRIS)',
|
||||
'account_id': 'IRIS',
|
||||
'role': user_role,
|
||||
'department': 'System',
|
||||
'status': 'active'
|
||||
@ -35,9 +36,12 @@ class AuthService:
|
||||
else:
|
||||
raise ValueError("密码错误")
|
||||
|
||||
# 2. 如果不是硬编码用户,检查数据库用户
|
||||
# 2. 检查数据库用户
|
||||
# 数据库存的是 "张三/zhangsan"
|
||||
# 登录匹配逻辑: 查找以 "/login_input" 结尾的记录
|
||||
else:
|
||||
user = SysUser.query.filter_by(username=username).first()
|
||||
# 使用 like 进行后缀匹配: '%/zhangsan'
|
||||
user = SysUser.query.filter(SysUser.username.like(f"%/{login_input}")).first()
|
||||
|
||||
if not user:
|
||||
raise ValueError("用户不存在")
|
||||
@ -52,10 +56,17 @@ class AuthService:
|
||||
user_id = user.id
|
||||
user_info = user.to_dict()
|
||||
|
||||
# 3. 生成 Token (设置为7天过期)
|
||||
# 3. 生成 Token
|
||||
# Token 中 identity 存数据库ID,claims 存登录账号ID
|
||||
account_id = user_info.get('account_id', login_input)
|
||||
|
||||
access_token = create_access_token(
|
||||
identity=user_id,
|
||||
additional_claims={'role': user_role, 'username': username},
|
||||
additional_claims={
|
||||
'role': user_role,
|
||||
'username': account_id, # 存纯账号ID
|
||||
'display_name': user_info.get('username') # 存显示名
|
||||
},
|
||||
expires_delta=timedelta(days=7)
|
||||
)
|
||||
|
||||
@ -68,19 +79,20 @@ class AuthService:
|
||||
def create_user(data, operator_role):
|
||||
"""
|
||||
创建新用户
|
||||
权限控制:只有超级管理员(SUPER_ADMIN) 或 主管(SUPERVISOR) 可以创建
|
||||
data 包含: cn_name(张三), username(zhangsan), ...
|
||||
"""
|
||||
if operator_role not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]:
|
||||
raise Exception("权限不足:只有超级管理员或主管可以创建新用户")
|
||||
|
||||
# 检查用户名是否重复
|
||||
if SysUser.query.filter_by(username=data.get('username')).first():
|
||||
raise Exception("用户名已存在")
|
||||
cn_name = data.get('cn_name')
|
||||
pinyin_base = data.get('username') # 前端传来的基础拼音,如 zhangsan
|
||||
|
||||
if not cn_name or not pinyin_base:
|
||||
raise Exception("姓名和账号不能为空")
|
||||
|
||||
role = data.get('role')
|
||||
|
||||
# 验证角色合法性
|
||||
# [核心修复] 过滤掉 __开头的属性 和 ROLE_MAP 字典,只保留字符串类型的角色定义
|
||||
valid_roles = [
|
||||
v for k, v in UserRole.__dict__.items()
|
||||
if not k.startswith('__') and isinstance(v, str)
|
||||
@ -89,7 +101,6 @@ class AuthService:
|
||||
if role not in valid_roles:
|
||||
raise Exception(f"角色无效")
|
||||
|
||||
# 如果操作者只是 SUPERVISOR (主管),不允许他创建 SUPER_ADMIN (超管)
|
||||
if operator_role == UserRole.SUPERVISOR and role == UserRole.SUPER_ADMIN:
|
||||
raise Exception("权限不足:主管无法创建超级管理员")
|
||||
|
||||
@ -97,8 +108,29 @@ class AuthService:
|
||||
if email and SysUser.query.filter_by(email=email).first():
|
||||
raise Exception("邮箱已被使用")
|
||||
|
||||
# === 核心逻辑: 自动处理账号重复 (zhangsan -> zhangsan1 -> zhangsan2) ===
|
||||
final_account_id = pinyin_base
|
||||
counter = 1 # 如果重复,从1开始累加
|
||||
|
||||
while True:
|
||||
# 检查数据库是否存在以 "/final_account_id" 结尾的记录
|
||||
existing = SysUser.query.filter(
|
||||
(SysUser.username.like(f"%/{final_account_id}")) |
|
||||
(SysUser.username == final_account_id)
|
||||
).first()
|
||||
|
||||
if not existing:
|
||||
break # 找到了可用的ID,跳出循环
|
||||
|
||||
# 如果存在,使用 base + counter
|
||||
final_account_id = f"{pinyin_base}{counter}"
|
||||
counter += 1
|
||||
|
||||
# 拼接最终存储格式: 张三/zhangsan1
|
||||
full_username_storage = f"{cn_name}/{final_account_id}"
|
||||
|
||||
new_user = SysUser(
|
||||
username=data.get('username'),
|
||||
username=full_username_storage, # 存组合串
|
||||
email=email,
|
||||
department=data.get('department', ''),
|
||||
role=role,
|
||||
@ -109,44 +141,38 @@ class AuthService:
|
||||
db.session.add(new_user)
|
||||
db.session.commit()
|
||||
|
||||
# 返回时,最好把生成的ID告诉前端
|
||||
return new_user.to_dict()
|
||||
|
||||
@staticmethod
|
||||
def update_user(user_id, data, operator_role):
|
||||
"""
|
||||
更新用户信息
|
||||
注意: 这里暂时不允许修改用户名/账号,因为涉及 split 逻辑较复杂,且通常账号不开通后不改
|
||||
"""
|
||||
# 权限校验
|
||||
if operator_role not in [UserRole.SUPER_ADMIN, UserRole.SUPERVISOR]:
|
||||
raise Exception("权限不足:只有超级管理员或主管可以修改用户信息")
|
||||
raise Exception("权限不足")
|
||||
|
||||
user = SysUser.query.get(user_id)
|
||||
if not user:
|
||||
raise Exception("用户不存在")
|
||||
|
||||
# 1. 更新角色 (Role)
|
||||
# 1. 更新基本信息
|
||||
if 'role' in data:
|
||||
# [核心修复] 同样添加类型检查
|
||||
valid_roles = [
|
||||
v for k, v in UserRole.__dict__.items()
|
||||
if not k.startswith('__') and isinstance(v, str)
|
||||
]
|
||||
new_role = data['role']
|
||||
|
||||
if new_role not in valid_roles:
|
||||
raise Exception(f"角色无效")
|
||||
|
||||
# 防止主管把自己或其他用户提升为超管
|
||||
if operator_role == UserRole.SUPERVISOR and new_role == UserRole.SUPER_ADMIN:
|
||||
raise Exception("权限不足:主管无法分配超级管理员权限")
|
||||
|
||||
raise Exception("权限不足")
|
||||
user.role = new_role
|
||||
|
||||
# 2. 更新部门
|
||||
if 'department' in data:
|
||||
user.department = data['department']
|
||||
|
||||
# 3. 更新邮箱
|
||||
if 'email' in data:
|
||||
email = data['email']
|
||||
if email and email != user.email:
|
||||
@ -155,11 +181,9 @@ class AuthService:
|
||||
raise Exception("该邮箱已被其他用户使用")
|
||||
user.email = email
|
||||
|
||||
# 4. 更新状态 (Status) - 例如禁用用户
|
||||
if 'status' in data:
|
||||
user.status = data['status']
|
||||
|
||||
# 5. 更新密码
|
||||
new_password = data.get('password')
|
||||
if new_password and str(new_password).strip():
|
||||
if len(new_password) < 6:
|
||||
@ -172,16 +196,14 @@ class AuthService:
|
||||
@staticmethod
|
||||
def get_all_users():
|
||||
"""获取所有系统用户"""
|
||||
# 按照 ID 倒序排列
|
||||
users = SysUser.query.order_by(SysUser.id.desc()).all()
|
||||
return [user.to_dict() for user in users]
|
||||
|
||||
@staticmethod
|
||||
def delete_user(user_id, operator_role):
|
||||
"""删除用户"""
|
||||
# 只有超级管理员可以执行物理删除
|
||||
if operator_role != UserRole.SUPER_ADMIN:
|
||||
raise Exception("权限不足:只有超级管理员可以删除用户,建议使用禁用功能")
|
||||
raise Exception("权限不足:只有超级管理员可以删除用户")
|
||||
|
||||
user = SysUser.query.get(user_id)
|
||||
if not user:
|
||||
|
||||
Reference in New Issue
Block a user