Files
KCGL/inventory-backend/app/services/permission_service.py

532 lines
22 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from app.models.system import SysMenu, SysElement, SysRolePermission
from app.extensions import db
from sqlalchemy.exc import SQLAlchemyError
class PermissionService:
@staticmethod
def get_permission_tree():
"""
获取完整的权限树(菜单嵌套菜单 + 菜单包含元素)
供前端权限配置页面展示
"""
# 1. 获取所有可见菜单 (按 parent_id 和 sort_order 排序,保证父子处理顺序)
menus = SysMenu.query.filter(SysMenu.is_visible == True).order_by(SysMenu.parent_id, SysMenu.sort_order).all()
# 2. 获取所有元素
elements = SysElement.query.all()
# --- 核心逻辑:构建树形结构 ---
# 3. 创建一个 lookup 字典,方便通过 ID 查找菜单节点
# 同时将 SQLAlchemy 对象转为字典,方便后续操作
menu_map = {}
for m in menus:
m_dict = m.to_dict()
m_dict['children'] = [] # 初始化 children
menu_map[m.id] = m_dict
# 4. 创建 code 到 id 的映射,用于把 element 挂载到 menu 上
# 因为 SysElement 关联的是 menu_code而不是 menu_id
code_to_id = {m.code: m.id for m in menus}
# 5. 将元素 (Elements) 挂载到对应的菜单 (Menu) 下
for el in elements:
# 找到该元素所属菜单的 ID
parent_menu_id = code_to_id.get(el.menu_code)
if parent_menu_id and parent_menu_id in menu_map:
el_dict = el.to_dict()
# 标记类型为 element前端 transformData 需要用到
el_dict['type'] = 'element'
menu_map[parent_menu_id]['children'].append(el_dict)
# 6. 将子菜单挂载到父菜单下,并构建最终的树
tree_data = []
for m in menus:
current_node = menu_map[m.id]
if m.parent_id == 0 or m.parent_id is None:
# 如果是顶级菜单,直接放入结果集
tree_data.append(current_node)
else:
# 如果是子菜单,找到它的父级,把它塞进父级的 children 里
if m.parent_id in menu_map:
menu_map[m.parent_id]['children'].append(current_node)
else:
# 如果找不到父级(比如父级被删了),为了防止数据丢失,暂时作为顶级显示
tree_data.append(current_node)
return tree_data
@staticmethod
def get_role_permissions(role_code):
"""获取指定角色拥有的所有权限Code"""
try:
# === 新增逻辑:超级管理员上帝模式 ===
if role_code == 'SUPER_ADMIN':
# 直接获取所有可见菜单和元素,无视配置表
all_menus = [m.code for m in SysMenu.query.filter(SysMenu.is_visible == True).all()]
all_elements = [e.code for e in SysElement.query.all()]
return {
'menus': all_menus,
'elements': all_elements
}
# =================================
perms = SysRolePermission.query.filter_by(role_code=role_code).all()
menu_codes = []
element_codes = []
for p in perms:
# 这里假设你的数据库存的是 target_code
if p.type == 'menu':
menu_codes.append(p.target_code)
else:
element_codes.append(p.target_code)
# 前端 handleRoleSelect 会合并这两个数组,所以分开返回没问题
return {
'menus': menu_codes,
'elements': element_codes
}
except Exception as e:
# 记录日志或处理错误
print(f"Error fetching role permissions: {e}")
return {'menus': [], 'elements': []}
@staticmethod
def assign_permissions(role_code, permissions):
"""
保存角色的权限
permissions: 前端传来的 list混合了 menu_code 和 element_code
"""
if not role_code:
raise ValueError("角色代码不能为空")
session = db.session
try:
# 1. 开启事务 (Flask-SQLAlchemy 自动管理,但明确逻辑更好)
# 2. 删除该角色旧的所有权限
SysRolePermission.query.filter_by(role_code=role_code).delete()
# 3. 准备新数据
if permissions:
# 3.1 去重
unique_codes = set(permissions)
# 3.2 预加载所有 Menu Code用于区分是 Menu 还是 Element
# 这一步很重要,因为 SysRolePermission 表需要 type 字段
all_menu_codes = {res[0] for res in session.query(SysMenu.code).all()}
new_records = []
for code in unique_codes:
if not code: continue
# 判断类型:如果 code 存在于菜单表中,就是 menu否则就是 element
p_type = 'menu' if code in all_menu_codes else 'element'
new_records.append(SysRolePermission(
role_code=role_code,
target_code=code,
type=p_type
))
# 3.3 批量插入
if new_records:
session.add_all(new_records)
# 4. 提交
session.commit()
return True
except SQLAlchemyError as e:
session.rollback()
raise e
except Exception as e:
session.rollback()
raise e
@staticmethod
def init_audit_menu():
"""
初始化审计日志菜单和超级管理员权限
防重复:只插入不存在的记录
"""
try:
# 1. 检查并创建审计日志菜单
menu_code = 'system_audit'
existing_menu = SysMenu.query.filter_by(code=menu_code).first()
if not existing_menu:
new_menu = SysMenu(
parent_id=0,
name='审计日志',
code=menu_code,
path='/system/audit',
sort_order=110,
is_visible=True
)
db.session.add(new_menu)
db.session.flush() # 获取新插入的 ID
print(f"✅ 审计日志菜单已创建 (code: {menu_code})")
else:
print(f" 审计日志菜单已存在 (code: {menu_code})")
# 2. 为超级管理员赋予审计日志菜单权限
role_code = 'SUPER_ADMIN'
existing_perm = SysRolePermission.query.filter_by(
role_code=role_code,
target_code=menu_code
).first()
if not existing_perm:
new_perm = SysRolePermission(
role_code=role_code,
target_code=menu_code,
type='menu'
)
db.session.add(new_perm)
print(f"✅ 超级管理员已赋予审计日志权限")
else:
print(f" 超级管理员已拥有审计日志权限")
# 3. 提交
db.session.commit()
return True
except Exception as e:
db.session.rollback()
print(f"❌ 初始化审计日志菜单失败: {str(e)}")
raise e
@staticmethod
def init_stocktake_menus():
"""
初始化盘点管理菜单和权限
包括:顶级菜单、盲盘作业、盈亏调整、以及操作权限元素
"""
try:
role_code = 'SUPER_ADMIN'
# 1. 创建顶级菜单:盘点管理
stocktake_mgmt_code = 'stocktake_mgmt'
stocktake_menu = SysMenu.query.filter_by(code=stocktake_mgmt_code).first()
if not stocktake_menu:
stocktake_menu = SysMenu(
parent_id=0,
name='盘点管理',
code=stocktake_mgmt_code,
path='/stocktake',
sort_order=30,
is_visible=True
)
db.session.add(stocktake_menu)
db.session.flush()
print(f"✅ 盘点管理顶级菜单已创建")
else:
print(f" 盘点管理顶级菜单已存在")
# 2. 创建子菜单:盲盘作业
stocktake_op_code = 'inventory_stocktake'
stocktake_op_menu = SysMenu.query.filter_by(code=stocktake_op_code).first()
if not stocktake_op_menu:
stocktake_op_menu = SysMenu(
parent_id=stocktake_menu.id,
name='盲盘作业',
code=stocktake_op_code,
path='/stocktake/operation',
sort_order=1,
is_visible=True
)
db.session.add(stocktake_op_menu)
db.session.flush()
print(f"✅ 盲盘作业菜单已创建")
else:
print(f" 盲盘作业菜单已存在")
# 3. 为盲盘作业添加操作权限元素
stocktake_op_element = SysElement.query.filter_by(
menu_code=stocktake_op_code,
code='inventory_stocktake:operation'
).first()
if not stocktake_op_element:
stocktake_op_element = SysElement(
menu_code=stocktake_op_code,
name='盲盘操作',
code='inventory_stocktake:operation',
element_type='operation'
)
db.session.add(stocktake_op_element)
print(f"✅ 盲盘作业操作权限已创建")
else:
print(f" 盲盘作业操作权限已存在")
# 4. 创建子菜单:盈亏调整
adjustment_code = 'stock_adjustment'
adjustment_menu = SysMenu.query.filter_by(code=adjustment_code).first()
if not adjustment_menu:
adjustment_menu = SysMenu(
parent_id=stocktake_menu.id,
name='盈亏调整',
code=adjustment_code,
path='/stocktake/adjustment',
sort_order=2,
is_visible=True
)
db.session.add(adjustment_menu)
db.session.flush()
print(f"✅ 盈亏调整菜单已创建")
else:
print(f" 盈亏调整菜单已存在")
# 5. 为盈亏调整添加列表权限元素 (stock_adjustment:list)
adjustment_list_element = SysElement.query.filter_by(
menu_code=adjustment_code,
code='stock_adjustment:list'
).first()
if not adjustment_list_element:
adjustment_list_element = SysElement(
menu_code=adjustment_code,
name='盈亏列表',
code='stock_adjustment:list',
element_type='element'
)
db.session.add(adjustment_list_element)
print(f"✅ 盈亏调整列表权限已创建")
else:
print(f" 盈亏调整列表权限已存在")
# 6. 为盈亏调整添加操作权限元素 (stock_adjustment:operation)
adjustment_op_element = SysElement.query.filter_by(
menu_code=adjustment_code,
code='stock_adjustment:operation'
).first()
if not adjustment_op_element:
adjustment_op_element = SysElement(
menu_code=adjustment_code,
name='盈亏操作',
code='stock_adjustment:operation',
element_type='operation'
)
db.session.add(adjustment_op_element)
print(f"✅ 盈亏调整操作权限已创建")
else:
print(f" 盈亏调整操作权限已存在")
# 7. 为超级管理员分配所有盘点相关权限
menu_codes = [stocktake_mgmt_code, stocktake_op_code, adjustment_code]
for mc in menu_codes:
existing_perm = SysRolePermission.query.filter_by(
role_code=role_code,
target_code=mc,
type='menu'
).first()
if not existing_perm:
new_perm = SysRolePermission(
role_code=role_code,
target_code=mc,
type='menu'
)
db.session.add(new_perm)
print(f"✅ 超级管理员已赋予 {mc} 菜单权限")
# 8. 分配操作权限
op_codes = ['inventory_stocktake:operation', 'stock_adjustment:list', 'stock_adjustment:operation']
for oc in op_codes:
existing_perm = SysRolePermission.query.filter_by(
role_code=role_code,
target_code=oc,
type='element'
).first()
if not existing_perm:
new_perm = SysRolePermission(
role_code=role_code,
target_code=oc,
type='element'
)
db.session.add(new_perm)
print(f"✅ 超级管理员已赋予 {oc} 操作权限")
# 9. 提交
db.session.commit()
return True
except Exception as e:
db.session.rollback()
print(f"❌ 初始化盘点管理菜单失败: {str(e)}")
raise e
@staticmethod
def cleanup_legacy_stocktake_menus():
"""
清理残留的旧版库存盘点菜单
如果数据库中存在挂在入库管理下的旧版库存盘点菜单,清理掉
"""
try:
# 查找可能存在的旧版库存盘点菜单(入库管理下的 stocktake
# 旧版可能在入库管理 (path like '%inventory%stocktake') 或者 code 包含 stocktake 但不是新的
legacy_menus = SysMenu.query.filter(
SysMenu.code.in_(['stocktake', 'inventory_stocktake_old'])
).all()
for menu in legacy_menus:
# 删除关联的权限
SysRolePermission.query.filter_by(target_code=menu.code).delete()
# 删除关联的元素
SysElement.query.filter_by(menu_code=menu.code).delete()
# 删除菜单
db.session.delete(menu)
print(f"🗑️ 已清理旧版库存盘点菜单: {menu.code} ({menu.name})")
db.session.commit()
return True
except Exception as e:
db.session.rollback()
print(f"⚠️ 清理旧版菜单失败: {str(e)}")
return False
@staticmethod
def init_all_menus():
"""
初始化所有菜单的层级结构,确保权限配置页面显示正确的树形结构
按照侧边栏顺序:基础信息 -> 入库 -> 盘点 -> 出库 -> BOM -> 借库 -> 报废 -> 系统
"""
try:
role_code = 'SUPER_ADMIN'
# 定义菜单结构 (code, name, path, parent_code, sort_order)
menu_defs = [
# 顶级菜单 (按侧边栏顺序)
('material_mgmt', '基础信息管理', '/material', None, 10),
('inventory_mgmt', '入库管理', '/inventory', None, 20),
('stocktake_mgmt', '盘点管理', '/stocktake', None, 30),
('outbound_mgmt', '出库管理', '/outbound', None, 40),
('bom_mgmt', 'BOM管理', '/bom', None, 50),
('operation_mgmt', '借库管理', '/operation', None, 60),
('scrap_mgmt', '报废管理', '/scrap', None, 70),
('system_mgmt', '系统管理', '/system', None, 80),
# 基础信息子菜单
('material_base', '基础信息', '/material/index', 'material_mgmt', 1),
# 入库管理子菜单
('inbound_buy', '采购入库', '/inventory/buy', 'inventory_mgmt', 1),
('inbound_semi', '半成品入库', '/inventory/semi', 'inventory_mgmt', 2),
('inbound_product', '成品入库', '/inventory/product', 'inventory_mgmt', 3),
('inbound_service', '服务权益', '/inventory/service', 'inventory_mgmt', 4),
('inbound_summary', '入库记录', '/inventory/summary', 'inventory_mgmt', 5),
# 盘点管理子菜单
('inventory_stocktake', '盲盘作业', '/stocktake/operation', 'stocktake_mgmt', 1),
('stock_adjustment', '盈亏调整', '/stocktake/adjustment', 'stocktake_mgmt', 2),
# 出库管理子菜单
('outbound_selection', '出库选单', '/outbound/selection', 'outbound_mgmt', 1),
('outbound_create', '扫码出库', '/outbound/create', 'outbound_mgmt', 2),
('outbound_list', '出库记录', '/outbound/index', 'outbound_mgmt', 3),
# BOM管理子菜单
('bom_manage', 'BOM配方管理', '/bom/manage', 'bom_mgmt', 1),
# 借库管理子菜单
('op_borrow', '借库操作', '/operation/borrow', 'operation_mgmt', 1),
('op_return', '归还操作', '/operation/repair', 'operation_mgmt', 2),
('op_records', '借还记录', '/operation/records', 'operation_mgmt', 3),
# 报废管理子菜单
('scrap_create', '新建报废', '/scrap/create', 'scrap_mgmt', 1),
('scrap_list', '报废记录', '/scrap/index', 'scrap_mgmt', 2),
# 系统管理子菜单
('system_user', '员工账号管理', '/system/user-create', 'system_mgmt', 1),
('system_permission', '权限分配', '/system/permission', 'system_mgmt', 2),
('system_audit', '审计日志', '/system/audit', 'system_mgmt', 3),
]
# 第一步:清理根级别的冗余子菜单(这些本应是子节点,但可能之前被错误地创建为根节点)
child_codes = [m[0] for m in menu_defs if m[3] is not None] # 所有子菜单的code
orphaned_menus = SysMenu.query.filter(
SysMenu.code.in_(child_codes),
(SysMenu.parent_id == 0) | (SysMenu.parent_id.is_(None))
).all()
for menu in orphaned_menus:
print(f"🗑️ 清理根级别冗余菜单: {menu.code} ({menu.name})")
# 删除关联的权限
SysRolePermission.query.filter_by(target_code=menu.code).delete()
db.session.delete(menu)
# 第二步:清理重复菜单(同一个 code 存在多条记录,保留 ID 最小的)
# 先找出所有可能重复的 code
duplicate_check = db.session.query(
SysMenu.code,
func.count(SysMenu.id).label('cnt')
).group_by(SysMenu.code).having(func.count(SysMenu.id) > 1).all()
for code, cnt in duplicate_check:
# 获取该 code 的所有记录,按 id 排序,保留第一条,删除其余
duplicates = SysMenu.query.filter_by(code=code).order_by(SysMenu.id).all()
# 保留第一条,删除其他
for dup in duplicates[1:]:
print(f"🗑️ 清理重复菜单: {dup.code} (id={dup.id}, name={dup.name})")
SysRolePermission.query.filter_by(target_code=dup.code).delete()
SysElement.query.filter_by(menu_code=dup.code).delete()
db.session.delete(dup)
# 第三步:强制重新设置所有子菜单的 parent_id确保没有遗漏
# 先将所有子菜单的 parent_id 设为 None然后重新设置
SysMenu.query.filter(SysMenu.code.in_(child_codes)).update({SysMenu.parent_id: None})
# 创建或更新菜单
menu_map = {} # code -> menu obj
for code, name, path, parent_code, sort_order in menu_defs:
menu = SysMenu.query.filter_by(code=code).first()
if not menu:
menu = SysMenu(code=code, name=name, path=path, sort_order=sort_order, is_visible=True)
db.session.add(menu)
db.session.flush()
print(f"✅ 菜单已创建: {name} ({code})")
else:
# 更新已有菜单的属性
menu.name = name
menu.path = path
menu.sort_order = sort_order
menu_map[code] = menu
# 设置 parent_id
for code, name, path, parent_code, sort_order in menu_defs:
if parent_code and parent_code in menu_map:
menu = menu_map[code]
parent = menu_map[parent_code]
menu.parent_id = parent.id
# 为超级管理员分配所有菜单权限
for code, name, path, parent_code, sort_order in menu_defs:
if parent_code is None: # 只分配顶级菜单
existing_perm = SysRolePermission.query.filter_by(
role_code=role_code,
target_code=code,
type='menu'
).first()
if not existing_perm:
new_perm = SysRolePermission(
role_code=role_code,
target_code=code,
type='menu'
)
db.session.add(new_perm)
db.session.commit()
print(f"✅ 所有菜单初始化完成")
return True
except Exception as e:
db.session.rollback()
print(f"❌ 初始化菜单失败: {str(e)}")
raise e