Files
KCGL/inventory-backend/app/services/dify_permission_service.py

284 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/dify_permission_service.py
"""
Dify 智能客服权限服务层
职责:
1. 从 JWT Token / g.dify_user_role 解析用户角色
2. 根据角色查询 sys_role_permission 表,获取用户拥有的 target_code
3. AI 专属的动态脱敏策略:
- SUPER_ADMIN无条件放行所有数据
- SALES / INBOUND剔除 price, cost, supplier 等敏感字段
- 跨模块越权查询:直接阻断,返回角色专属的错误信息给大模型
"""
from flask import g, current_app
from flask_jwt_extended import decode_token
from app.models.system import SysRolePermission
from app.services.auth_service import AuthService
from app.utils.constants import UserRole
from sqlalchemy import func
# ==============================================================================
# 角色敏感字段定义(按角色黑名单)
# ==============================================================================
# 每个角色在 AI 对话场景下,禁止查看的字段集合
ROLE_SENSITIVE_FIELDS = {
# 入库员:禁止查看采购相关金额
'INBOUND': frozenset([
'purchase_price',
'cost',
'price',
'supplier',
'supplier_id',
'unit_price',
'total_price',
'order_price',
'last_purchase_price',
'avg_purchase_price',
'supplier_name',
]),
# 销售员:禁止查看成本/采购相关数据
'SALES': frozenset([
'purchase_price',
'cost',
'unit_cost',
'supplier',
'supplier_id',
'unit_price',
'last_purchase_price',
'avg_purchase_price',
'supplier_name',
'stock_cost',
'total_cost',
'supply_price',
]),
# 采购员:禁止查看销售毛利相关数据
'PURCHASER': frozenset([
'sale_price',
'retail_price',
'suggested_price',
'margin',
'profit',
'profit_rate',
]),
}
# 跨模块越权查询拦截配置
# key: 角色, value: { 尝试查询的字段: 给 AI 的回复 }
ROLE_FORBIDDEN_QUERIES = {
'INBOUND': {
'purchase_price': '抱歉,您当前的角色(入库员)无权查看采购金额数据。',
'cost': '抱歉,您当前的角色(入库员)无权查看采购成本数据。',
'supplier': '抱歉,您当前的角色(入库员)无权查看供应商数据。',
'unit_price': '抱歉,您当前的角色(入库员)无权查看采购单价数据。',
'last_purchase_price': '抱歉,您当前的角色(入库员)无权查看历史采购价数据。',
},
'SALES': {
'purchase_price': '抱歉,您当前的角色(销售员)无权查看采购成本数据。',
'cost': '抱歉,您当前的角色(销售员)无权查看成本数据。',
'supplier': '抱歉,您当前的角色(销售员)无权查看供应商信息。',
'unit_cost': '抱歉,您当前的角色(销售员)无权查看单位成本数据。',
'last_purchase_price': '抱歉,您当前的角色(销售员)无权查看历史采购价数据。',
},
'PURCHASER': {
'sale_price': '抱歉,您当前的角色(采购员)无权查看销售价格数据。',
'retail_price': '抱歉,您当前的角色(采购员)无权查看零售价数据。',
'margin': '抱歉,您当前的角色(采购员)无权查看毛利数据。',
'profit': '抱歉,您当前的角色(采购员)无权查看利润数据。',
},
}
class DifyPermissionService:
"""Dify AI 专属权限服务"""
@staticmethod
def get_user_role(token: str = None) -> str:
"""
从 JWT Token 或 g.dify_user_role 解析用户角色
返回标准化的大写角色码(如 'INBOUND', 'SALES', 'SUPER_ADMIN'
"""
user_role = None
# 优先从 g 对象获取(由 dify_auth_required 存入)
if hasattr(g, 'dify_user_role') and g.dify_user_role:
return g.dify_user_role
# 从传入的 token 解码
if token:
try:
claims = decode_token(token)
user_role = claims.get('role')
except Exception as e:
current_app.logger.warning(f"[DifyPermission] token 解码失败: {e}")
return ''
return (user_role or '').upper()
@staticmethod
def is_super_admin(role: str = None) -> bool:
"""判断是否为超级管理员"""
if not role:
role = DifyPermissionService.get_user_role()
return role.upper() == UserRole.SUPER_ADMIN if role else False
@staticmethod
def get_role_permissions(role: str = None) -> dict:
"""
获取指定角色的所有权限代码列表
内部调用 AuthService.get_user_permissions()
"""
if not role:
role = DifyPermissionService.get_user_role()
return AuthService.get_user_permissions(role)
@staticmethod
def get_target_codes(role: str = None) -> list:
"""
获取用户角色在 sys_role_permission 表中的所有 target_code
包括 menu 和 element 两种类型
"""
if not role:
role = DifyPermissionService.get_user_role()
if not role:
return []
# 超级管理员拥有所有权限
if DifyPermissionService.is_super_admin(role):
return ['*']
# 从数据库查询
try:
perms = SysRolePermission.query.filter(
func.upper(SysRolePermission.role_code) == role.upper()
).all()
return [p.target_code for p in perms]
except Exception as e:
current_app.logger.error(f"[DifyPermission] 查询 target_code 失败: {e}")
return []
@staticmethod
def has_permission(permission_code: str, role: str = None) -> bool:
"""
检查用户是否拥有指定权限码
超级管理员永远返回 True
"""
if DifyPermissionService.is_super_admin(role):
return True
if not role:
role = DifyPermissionService.get_user_role()
perms = DifyPermissionService.get_role_permissions(role)
all_perms = perms.get('menus', []) + perms.get('elements', [])
return permission_code in all_perms
@staticmethod
def check_forbidden_query(query_fields: list, role: str = None) -> dict:
"""
检查用户尝试查询的字段是否越权。
参数:
query_fields: 用户查询涉及的字段名列表(可能包含敏感字段)
role: 用户角色(可选,默认从 token/g 解析)
返回:
{
'blocked': bool, # 是否被拦截
'message': str | None, # AI 应返回给用户的错误信息(如果有)
}
"""
if DifyPermissionService.is_super_admin(role):
return {'blocked': False, 'message': None}
if not role:
role = DifyPermissionService.get_user_role()
if not role:
return {
'blocked': True,
'message': '无法识别您的身份,请重新登录后再试。'
}
# 获取该角色的越权拦截配置
forbidden_map = ROLE_FORBIDDEN_QUERIES.get(role.upper(), {})
if not forbidden_map:
return {'blocked': False, 'message': None}
# 规范化字段名(小写比较)
query_fields_lower = {f.lower() for f in query_fields}
# 检查是否命中越权字段
for forbidden_field, msg in forbidden_map.items():
if forbidden_field.lower() in query_fields_lower:
current_app.logger.warning(
f"[DifyPermission] 越权查询拦截: role={role}, field={forbidden_field}"
)
return {'blocked': True, 'message': msg}
return {'blocked': False, 'message': None}
@staticmethod
def filter_sensitive_fields(data: dict, role: str = None) -> dict:
"""
根据用户角色对字典数据进行敏感字段脱敏。
- SUPER_ADMIN不过滤返回原数据
- SALES / INBOUND / PURCHASER按角色黑名单剔除敏感字段
参数:
data: 原始数据字典
role: 用户角色
返回:
脱敏后的数据字典(敏感字段被置为 None
"""
if DifyPermissionService.is_super_admin(role):
return data
if not role:
role = DifyPermissionService.get_user_role()
if not role:
return data
# 获取该角色的敏感字段黑名单
sensitive = ROLE_SENSITIVE_FIELDS.get(role.upper(), frozenset())
# 如果没有敏感字段定义,不过滤
if not sensitive:
return data
# 深拷贝,避免修改原数据
import copy
result = copy.deepcopy(data)
# 将敏感字段置为 None
for field in sensitive:
if field in result:
result[field] = None
# 如果有 children 数组(子件列表),递归脱敏
if isinstance(result.get('children'), list):
result['children'] = [
DifyPermissionService.filter_sensitive_fields(child, role)
for child in result['children']
]
return result
@staticmethod
def filter_sensitive_fields_in_list(data_list: list, role: str = None) -> list:
"""
对列表数据批量应用敏感字段脱敏
"""
if DifyPermissionService.is_super_admin(role):
return data_list
if not role:
role = DifyPermissionService.get_user_role()
return [
DifyPermissionService.filter_sensitive_fields(item, role)
for item in data_list
]