feat(backend): apply global cross-company data isolation logic across all inbound, outbound, and stock services

This commit is contained in:
DXC
2026-04-17 09:57:00 +08:00
parent 6c0e13e52d
commit 8291a89898
5 changed files with 118 additions and 21 deletions

View File

@ -213,14 +213,24 @@ class MaterialBaseService:
# ============================================================
# 【行级数据隔离】基于 JWT 中的 company_name 进行过滤
# ============================================================
from flask_jwt_extended import get_jwt
claims = get_jwt()
user_role = claims.get('role', '').upper() if claims.get('role') else ''
user_company = claims.get('company_name', '')
# 获取用户权限列表(用于检查 global:cross_company_op 特权)
from app.api.v1.inbound.base import get_current_user_permissions
user_perms = get_current_user_permissions() or []
normalized_perms = set(p.lower().replace('_', '').replace(':', '') for p in user_perms)
# 检查是否拥有全局特权或超管角色
has_cross_company = 'globalcrosscompanyop' in normalized_perms
# 获取前端传的查询参数
req_company = filters.get('company') if filters else None
if user_role != 'SUPER_ADMIN':
if user_role != 'SUPER_ADMIN' and not has_cross_company:
# 【显式拒绝越权】如果前端传了公司参数且不是当前用户的公司返回403
if req_company and req_company != user_company:
from flask import abort
@ -229,11 +239,11 @@ class MaterialBaseService:
if user_company:
query = query.filter(MaterialBase.company_name == user_company)
# 如果用户没有所属公司字段,则只显示公司为空的记录(或不允许查看)
else:
# 超级管理员:允许跨公司视角
elif user_role == 'SUPER_ADMIN' or has_cross_company:
# 超级管理员或有跨域特权:允许跨公司视角
if req_company:
query = query.filter(MaterialBase.company_name == req_company)
# 超管没选公司则不加过滤,看到全量
# 没选公司则不加过滤,看到全量
category = filters.get('category')
if category is not None and category != '':
@ -643,17 +653,28 @@ class MaterialBaseService:
# ============================================================
# 【行级数据隔离】基于 JWT 中的 company_name 进行过滤(高级筛选)
# ============================================================
from flask_jwt_extended import get_jwt
claims = get_jwt()
user_role = claims.get('role', '').upper() if claims.get('role') else ''
user_company = claims.get('company_name', '')
# 获取用户权限列表(用于检查 global:cross_company_op 特权)
from app.api.v1.inbound.base import get_current_user_permissions
user_perms = get_current_user_permissions() or []
normalized_perms = set(p.lower().replace('_', '').replace(':', '') for p in user_perms)
# 检查是否拥有全局特权或超管角色
has_cross_company = 'globalcrosscompanyop' in normalized_perms
req_company = filters.get('company') if filters else None
if user_role != 'SUPER_ADMIN':
if user_role != 'SUPER_ADMIN' and not has_cross_company:
# 普通用户:强制隔离
if user_company:
filter_conditions.append(MaterialBase.company_name == user_company)
else:
# 超级管理员:允许跨公司视角
elif user_role == 'SUPER_ADMIN' or has_cross_company:
# 超级管理员或有跨域特权:允许跨公司视角
if req_company:
filter_conditions.append(MaterialBase.company_name == req_company)

View File

@ -357,23 +357,23 @@ class BuyInboundService:
user_role = claims.get('role', '').upper() if claims.get('role') else ''
user_company = claims.get('company_name', '')
# 获取用户权限列表(用于检查 global:cross_company 特权)
from app.services.auth_service import AuthService
user_perms = AuthService.get_user_permissions(user_role) if user_role else []
# 合并菜单和元素权限
all_perms = user_perms.get('menus', []) + user_perms.get('elements', [])
has_cross_company = 'global:cross_company' in all_perms or ('inbound_buy:*' in all_perms)
# 获取用户权限列表(用于检查 global:cross_company_op 特权)
from app.api.v1.inbound.base import get_current_user_permissions
user_perms = get_current_user_permissions() or []
normalized_perms = set(p.lower().replace('_', '').replace(':', '') for p in user_perms)
# 检查是否拥有全局特权或超管角色
has_cross_company = 'globalcrosscompanyop' in normalized_perms
if user_role != 'SUPER_ADMIN' and not has_cross_company:
# 【显式拒绝越权】如果前端传了公司参数且不是当前用户的公司返回403
# 无特权:严禁查其他公司,强制绑定本公司
if company and company.strip() and company.strip() != user_company:
from flask import abort
abort(403, description=f'越权访问:您无权查询 {company} 的数据')
# 正常查询本公司数据
if user_company:
query = query.filter(MaterialBase.company_name == user_company)
else:
# 超级管理员或拥有跨域特权:允许跨公司视角
elif user_role == 'SUPER_ADMIN' or has_cross_company:
# 有特权:允许下拉框传过来的 company 参数生效
if company and company.strip():
query = query.filter(MaterialBase.company_name == company.strip())

View File

@ -317,6 +317,32 @@ class ProductInboundService:
if material_type and material_type.strip():
query = query.filter(MaterialBase.material_type == material_type.strip())
# ============================================================
# 【全局特权】基于 JWT 与 global:cross_company_op 的跨组织隔离
# ============================================================
from flask_jwt_extended import get_jwt
claims = get_jwt()
user_role = claims.get('role', '').upper() if claims.get('role') else ''
user_company = claims.get('company_name', '')
# 获取用户权限列表(用于检查 global:cross_company_op 特权)
from app.api.v1.inbound.base import get_current_user_permissions
user_perms = get_current_user_permissions() or []
normalized_perms = set(p.lower().replace('_', '').replace(':', '') for p in user_perms)
# 检查是否拥有全局特权或超管角色
has_cross_company = 'globalcrosscompanyop' in normalized_perms
if user_role != 'SUPER_ADMIN' and not has_cross_company:
# 无特权:严禁查其他公司,强制绑定本公司
if company and company.strip() and company.strip() != user_company:
from flask import abort
abort(403, description=f'越权访问:您无权查询 {company} 的数据')
if user_company:
query = query.filter(MaterialBase.company_name == user_company)
elif user_role == 'SUPER_ADMIN' or has_cross_company:
# 有特权:允许下拉框传过来的 company 参数生效
if company and company.strip():
query = query.filter(MaterialBase.company_name == company.strip())

View File

@ -92,6 +92,30 @@ class RepairInboundService:
# 按接收时间升序(先进先出)+ id 升序
query = query.order_by(db.asc(TransRepair.arrival_date), db.asc(TransRepair.id))
# ============================================================
# 【全局特权】基于 JWT 与 global:cross_company_op 的跨组织隔离
# ============================================================
from flask_jwt_extended import get_jwt
claims = get_jwt()
user_role = claims.get('role', '').upper() if claims.get('role') else ''
user_company = claims.get('company_name', '')
# 获取用户权限列表(用于检查 global:cross_company_op 特权)
from app.api.v1.inbound.base import get_current_user_permissions
user_perms = get_current_user_permissions() or []
normalized_perms = set(p.lower().replace('_', '').replace(':', '') for p in user_perms)
# 检查是否拥有全局特权或超管角色
has_cross_company = 'globalcrosscompanyop' in normalized_perms
# 维修表需要通过 base_id 关联 MaterialBase 进行公司过滤
if user_role != 'SUPER_ADMIN' and not has_cross_company:
# 无特权:强制绑定本公司
query = query.outerjoin(MaterialBase, TransRepair.base_id == MaterialBase.id)
if user_company:
query = query.filter(MaterialBase.company_name == user_company)
# 分页
pagination = query.paginate(page=page, per_page=page_size, error_out=False)

View File

@ -408,6 +408,32 @@ class SemiInboundService:
if material_type and material_type.strip():
query = query.filter(MaterialBase.material_type == material_type.strip())
# ============================================================
# 【全局特权】基于 JWT 与 global:cross_company_op 的跨组织隔离
# ============================================================
from flask_jwt_extended import get_jwt
claims = get_jwt()
user_role = claims.get('role', '').upper() if claims.get('role') else ''
user_company = claims.get('company_name', '')
# 获取用户权限列表(用于检查 global:cross_company_op 特权)
from app.api.v1.inbound.base import get_current_user_permissions
user_perms = get_current_user_permissions() or []
normalized_perms = set(p.lower().replace('_', '').replace(':', '') for p in user_perms)
# 检查是否拥有全局特权或超管角色
has_cross_company = 'globalcrosscompanyop' in normalized_perms
if user_role != 'SUPER_ADMIN' and not has_cross_company:
# 无特权:严禁查其他公司,强制绑定本公司
if company and company.strip() and company.strip() != user_company:
from flask import abort
abort(403, description=f'越权访问:您无权查询 {company} 的数据')
if user_company:
query = query.filter(MaterialBase.company_name == user_company)
elif user_role == 'SUPER_ADMIN' or has_cross_company:
# 有特权:允许下拉框传过来的 company 参数生效
if company and company.strip():
query = query.filter(MaterialBase.company_name == company.strip())