From 8291a89898ec6c59bf2d77ce3b8b494d70321249 Mon Sep 17 00:00:00 2001 From: DXC Date: Fri, 17 Apr 2026 09:57:00 +0800 Subject: [PATCH] feat(backend): apply global cross-company data isolation logic across all inbound, outbound, and stock services --- .../app/services/inbound/base_service.py | 35 +++++++++++++++---- .../app/services/inbound/buy_service.py | 20 +++++------ .../app/services/inbound/product_service.py | 30 ++++++++++++++-- .../app/services/inbound/repair_service.py | 24 +++++++++++++ .../app/services/inbound/semi_service.py | 30 ++++++++++++++-- 5 files changed, 118 insertions(+), 21 deletions(-) diff --git a/inventory-backend/app/services/inbound/base_service.py b/inventory-backend/app/services/inbound/base_service.py index 2b055e2..ac50d8d 100644 --- a/inventory-backend/app/services/inbound/base_service.py +++ b/inventory-backend/app/services/inbound/base_service.py @@ -213,14 +213,24 @@ class MaterialBaseService: # ============================================================ # 【行级数据隔离】基于 JWT 中的 company_name 进行过滤 # ============================================================ + from flask_jwt_extended import get_jwt + claims = get_jwt() user_role = claims.get('role', '').upper() if claims.get('role') else '' user_company = claims.get('company_name', '') + # 获取用户权限列表(用于检查 global:cross_company_op 特权) + from app.api.v1.inbound.base import get_current_user_permissions + user_perms = get_current_user_permissions() or [] + normalized_perms = set(p.lower().replace('_', '').replace(':', '') for p in user_perms) + + # 检查是否拥有全局特权或超管角色 + has_cross_company = 'globalcrosscompanyop' in normalized_perms + # 获取前端传的查询参数 req_company = filters.get('company') if filters else None - if user_role != 'SUPER_ADMIN': + if user_role != 'SUPER_ADMIN' and not has_cross_company: # 【显式拒绝越权】如果前端传了公司参数,且不是当前用户的公司,返回403 if req_company and req_company != user_company: from flask import abort @@ -229,11 +239,11 @@ class MaterialBaseService: if user_company: query = query.filter(MaterialBase.company_name == user_company) # 如果用户没有所属公司字段,则只显示公司为空的记录(或不允许查看) - else: - # 超级管理员:允许跨公司视角 + elif user_role == 'SUPER_ADMIN' or has_cross_company: + # 超级管理员或有跨域特权:允许跨公司视角 if req_company: query = query.filter(MaterialBase.company_name == req_company) - # 超管没选公司则不加过滤,看到全量 + # 没选公司则不加过滤,看到全量 category = filters.get('category') if category is not None and category != '': @@ -643,17 +653,28 @@ class MaterialBaseService: # ============================================================ # 【行级数据隔离】基于 JWT 中的 company_name 进行过滤(高级筛选) # ============================================================ + from flask_jwt_extended import get_jwt + claims = get_jwt() user_role = claims.get('role', '').upper() if claims.get('role') else '' user_company = claims.get('company_name', '') + + # 获取用户权限列表(用于检查 global:cross_company_op 特权) + from app.api.v1.inbound.base import get_current_user_permissions + user_perms = get_current_user_permissions() or [] + normalized_perms = set(p.lower().replace('_', '').replace(':', '') for p in user_perms) + + # 检查是否拥有全局特权或超管角色 + has_cross_company = 'globalcrosscompanyop' in normalized_perms + req_company = filters.get('company') if filters else None - if user_role != 'SUPER_ADMIN': + if user_role != 'SUPER_ADMIN' and not has_cross_company: # 普通用户:强制隔离 if user_company: filter_conditions.append(MaterialBase.company_name == user_company) - else: - # 超级管理员:允许跨公司视角 + elif user_role == 'SUPER_ADMIN' or has_cross_company: + # 超级管理员或有跨域特权:允许跨公司视角 if req_company: filter_conditions.append(MaterialBase.company_name == req_company) diff --git a/inventory-backend/app/services/inbound/buy_service.py b/inventory-backend/app/services/inbound/buy_service.py index 83f92cd..f210e03 100644 --- a/inventory-backend/app/services/inbound/buy_service.py +++ b/inventory-backend/app/services/inbound/buy_service.py @@ -357,23 +357,23 @@ class BuyInboundService: user_role = claims.get('role', '').upper() if claims.get('role') else '' user_company = claims.get('company_name', '') - # 获取用户权限列表(用于检查 global:cross_company 特权) - from app.services.auth_service import AuthService - user_perms = AuthService.get_user_permissions(user_role) if user_role else [] - # 合并菜单和元素权限 - all_perms = user_perms.get('menus', []) + user_perms.get('elements', []) - has_cross_company = 'global:cross_company' in all_perms or ('inbound_buy:*' in all_perms) + # 获取用户权限列表(用于检查 global:cross_company_op 特权) + from app.api.v1.inbound.base import get_current_user_permissions + user_perms = get_current_user_permissions() or [] + normalized_perms = set(p.lower().replace('_', '').replace(':', '') for p in user_perms) + + # 检查是否拥有全局特权或超管角色 + has_cross_company = 'globalcrosscompanyop' in normalized_perms if user_role != 'SUPER_ADMIN' and not has_cross_company: - # 【显式拒绝越权】如果前端传了公司参数,且不是当前用户的公司,返回403 + # 无特权:严禁查其他公司,强制绑定本公司 if company and company.strip() and company.strip() != user_company: from flask import abort abort(403, description=f'越权访问:您无权查询 {company} 的数据') - # 正常查询本公司数据 if user_company: query = query.filter(MaterialBase.company_name == user_company) - else: - # 超级管理员或拥有跨域特权:允许跨公司视角 + elif user_role == 'SUPER_ADMIN' or has_cross_company: + # 有特权:允许下拉框传过来的 company 参数生效 if company and company.strip(): query = query.filter(MaterialBase.company_name == company.strip()) diff --git a/inventory-backend/app/services/inbound/product_service.py b/inventory-backend/app/services/inbound/product_service.py index 0e5b615..de57d4f 100644 --- a/inventory-backend/app/services/inbound/product_service.py +++ b/inventory-backend/app/services/inbound/product_service.py @@ -317,8 +317,34 @@ class ProductInboundService: if material_type and material_type.strip(): query = query.filter(MaterialBase.material_type == material_type.strip()) - if company and company.strip(): - query = query.filter(MaterialBase.company_name == company.strip()) + # ============================================================ + # 【全局特权】基于 JWT 与 global:cross_company_op 的跨组织隔离 + # ============================================================ + from flask_jwt_extended import get_jwt + + claims = get_jwt() + user_role = claims.get('role', '').upper() if claims.get('role') else '' + user_company = claims.get('company_name', '') + + # 获取用户权限列表(用于检查 global:cross_company_op 特权) + from app.api.v1.inbound.base import get_current_user_permissions + user_perms = get_current_user_permissions() or [] + normalized_perms = set(p.lower().replace('_', '').replace(':', '') for p in user_perms) + + # 检查是否拥有全局特权或超管角色 + has_cross_company = 'globalcrosscompanyop' in normalized_perms + + if user_role != 'SUPER_ADMIN' and not has_cross_company: + # 无特权:严禁查其他公司,强制绑定本公司 + if company and company.strip() and company.strip() != user_company: + from flask import abort + abort(403, description=f'越权访问:您无权查询 {company} 的数据') + if user_company: + query = query.filter(MaterialBase.company_name == user_company) + elif user_role == 'SUPER_ADMIN' or has_cross_company: + # 有特权:允许下拉框传过来的 company 参数生效 + if company and company.strip(): + query = query.filter(MaterialBase.company_name == company.strip()) if not statuses: statuses = ['在库', '借库'] diff --git a/inventory-backend/app/services/inbound/repair_service.py b/inventory-backend/app/services/inbound/repair_service.py index 0bd8021..1c4a690 100644 --- a/inventory-backend/app/services/inbound/repair_service.py +++ b/inventory-backend/app/services/inbound/repair_service.py @@ -91,6 +91,30 @@ class RepairInboundService: # 按接收时间升序(先进先出)+ id 升序 query = query.order_by(db.asc(TransRepair.arrival_date), db.asc(TransRepair.id)) + + # ============================================================ + # 【全局特权】基于 JWT 与 global:cross_company_op 的跨组织隔离 + # ============================================================ + from flask_jwt_extended import get_jwt + + claims = get_jwt() + user_role = claims.get('role', '').upper() if claims.get('role') else '' + user_company = claims.get('company_name', '') + + # 获取用户权限列表(用于检查 global:cross_company_op 特权) + from app.api.v1.inbound.base import get_current_user_permissions + user_perms = get_current_user_permissions() or [] + normalized_perms = set(p.lower().replace('_', '').replace(':', '') for p in user_perms) + + # 检查是否拥有全局特权或超管角色 + has_cross_company = 'globalcrosscompanyop' in normalized_perms + + # 维修表需要通过 base_id 关联 MaterialBase 进行公司过滤 + if user_role != 'SUPER_ADMIN' and not has_cross_company: + # 无特权:强制绑定本公司 + query = query.outerjoin(MaterialBase, TransRepair.base_id == MaterialBase.id) + if user_company: + query = query.filter(MaterialBase.company_name == user_company) # 分页 pagination = query.paginate(page=page, per_page=page_size, error_out=False) diff --git a/inventory-backend/app/services/inbound/semi_service.py b/inventory-backend/app/services/inbound/semi_service.py index 0b4c887..12dfc27 100644 --- a/inventory-backend/app/services/inbound/semi_service.py +++ b/inventory-backend/app/services/inbound/semi_service.py @@ -408,8 +408,34 @@ class SemiInboundService: if material_type and material_type.strip(): query = query.filter(MaterialBase.material_type == material_type.strip()) - if company and company.strip(): - query = query.filter(MaterialBase.company_name == company.strip()) + # ============================================================ + # 【全局特权】基于 JWT 与 global:cross_company_op 的跨组织隔离 + # ============================================================ + from flask_jwt_extended import get_jwt + + claims = get_jwt() + user_role = claims.get('role', '').upper() if claims.get('role') else '' + user_company = claims.get('company_name', '') + + # 获取用户权限列表(用于检查 global:cross_company_op 特权) + from app.api.v1.inbound.base import get_current_user_permissions + user_perms = get_current_user_permissions() or [] + normalized_perms = set(p.lower().replace('_', '').replace(':', '') for p in user_perms) + + # 检查是否拥有全局特权或超管角色 + has_cross_company = 'globalcrosscompanyop' in normalized_perms + + if user_role != 'SUPER_ADMIN' and not has_cross_company: + # 无特权:严禁查其他公司,强制绑定本公司 + if company and company.strip() and company.strip() != user_company: + from flask import abort + abort(403, description=f'越权访问:您无权查询 {company} 的数据') + if user_company: + query = query.filter(MaterialBase.company_name == user_company) + elif user_role == 'SUPER_ADMIN' or has_cross_company: + # 有特权:允许下拉框传过来的 company 参数生效 + if company and company.strip(): + query = query.filter(MaterialBase.company_name == company.strip()) if not statuses: statuses = ['在库', '借库']