feat(security): implement strict row-level data isolation based on user company

This commit is contained in:
DXC
2026-04-14 08:38:50 +08:00
parent 81bfb29b50
commit 0e8ddd0851
3 changed files with 60 additions and 11 deletions

View File

@ -104,6 +104,8 @@ class AuthService:
user_id = user.id user_id = user.id
user_info = user.to_dict() user_info = user.to_dict()
user_info['role'] = user_role user_info['role'] = user_role
# 获取用户所属公司(存于 department 字段)
user_company = user.department or ''
# 3. 生成 Token # 3. 生成 Token
# Token 中 identity 存数据库IDclaims 存登录账号ID # Token 中 identity 存数据库IDclaims 存登录账号ID
@ -115,7 +117,8 @@ class AuthService:
additional_claims={ additional_claims={
'role': user_role, 'role': user_role,
'username': account_id, # 存纯账号ID 'username': account_id, # 存纯账号ID
'display_name': user_info.get('username') # 存显示名 'display_name': user_info.get('username'), # 存显示名
'company_name': user_company # 存所属公司
} }
) )
@ -125,7 +128,8 @@ class AuthService:
additional_claims={ additional_claims={
'role': user_role, 'role': user_role,
'username': account_id, 'username': account_id,
'display_name': user_info.get('display_name', account_id) 'display_name': user_info.get('display_name', account_id),
'company_name': user_company
} }
) )

View File

@ -1,5 +1,6 @@
# 文件路径: app/services/inbound/base_service.py # 文件路径: app/services/inbound/base_service.py
from flask_jwt_extended import get_jwt
from app.extensions import db from app.extensions import db
from app.models.base import MaterialBase, MaterialWarningSetting from app.models.base import MaterialBase, MaterialWarningSetting
from app.models.inbound.buy import StockBuy from app.models.inbound.buy import StockBuy
@ -209,9 +210,26 @@ class MaterialBaseService:
MaterialBase.spec_model.ilike(kw) MaterialBase.spec_model.ilike(kw)
)) ))
company = filters.get('company') # ============================================================
if company is not None and company != '': # 【行级数据隔离】基于 JWT 中的 company_name 进行过滤
query = query.filter(MaterialBase.company_name.ilike(company.strip())) # ============================================================
claims = get_jwt()
user_role = claims.get('role', '').upper() if claims.get('role') else ''
user_company = claims.get('company_name', '')
# 获取前端传的查询参数
req_company = filters.get('company') if filters else None
if user_role != 'SUPER_ADMIN':
# 普通用户:强制隔离!无视前端传的 company 参数
if user_company:
query = query.filter(MaterialBase.company_name == user_company)
# 如果用户没有所属公司字段,则只显示公司为空的记录(或不允许查看)
else:
# 超级管理员:允许跨公司视角
if req_company:
query = query.filter(MaterialBase.company_name == req_company)
# 超管没选公司则不加过滤,看到全量
category = filters.get('category') category = filters.get('category')
if category is not None and category != '': if category is not None and category != '':
@ -618,9 +636,23 @@ class MaterialBaseService:
MaterialBase.spec_model.ilike(kw), MaterialBase.spec_model.ilike(kw),
MaterialBase.company_name.ilike(kw) MaterialBase.company_name.ilike(kw)
)) ))
company = filters.get('company') # ============================================================
if company is not None and company != '': # 【行级数据隔离】基于 JWT 中的 company_name 进行过滤(高级筛选)
filter_conditions.append(MaterialBase.company_name.ilike(company.strip())) # ============================================================
claims = get_jwt()
user_role = claims.get('role', '').upper() if claims.get('role') else ''
user_company = claims.get('company_name', '')
req_company = filters.get('company') if filters else None
if user_role != 'SUPER_ADMIN':
# 普通用户:强制隔离
if user_company:
filter_conditions.append(MaterialBase.company_name == user_company)
else:
# 超级管理员:允许跨公司视角
if req_company:
filter_conditions.append(MaterialBase.company_name == req_company)
category = filters.get('category') category = filters.get('category')
if category is not None and category != '': if category is not None and category != '':
filter_conditions.append(MaterialBase.category.ilike(category.strip())) filter_conditions.append(MaterialBase.category.ilike(category.strip()))

View File

@ -1,4 +1,5 @@
# inventory-backend/app/services/inbound/buy_service.py # inventory-backend/app/services/inbound/buy_service.py
from flask_jwt_extended import get_jwt
from app.extensions import db from app.extensions import db
from app.models.inbound.buy import StockBuy from app.models.inbound.buy import StockBuy
from app.models.inbound.product import StockProduct from app.models.inbound.product import StockProduct
@ -347,9 +348,21 @@ class BuyInboundService:
if material_type and material_type.strip(): if material_type and material_type.strip():
query = query.filter(MaterialBase.material_type == material_type.strip()) query = query.filter(MaterialBase.material_type == material_type.strip())
# 3.1 公司独立搜索 [新增] # ============================================================
if company and company.strip(): # 【行级数据隔离】基于 JWT 中的 company_name 进行过滤
query = query.filter(MaterialBase.company_name == company.strip()) # ============================================================
claims = get_jwt()
user_role = claims.get('role', '').upper() if claims.get('role') else ''
user_company = claims.get('company_name', '')
if user_role != 'SUPER_ADMIN':
# 普通用户:强制隔离!无视前端传的 company 参数
if user_company:
query = query.filter(MaterialBase.company_name == user_company)
else:
# 超级管理员:允许跨公司视角
if company and company.strip():
query = query.filter(MaterialBase.company_name == company.strip())
# 4. 状态筛选 # 4. 状态筛选
if not statuses: statuses = ['在库', '借库'] if not statuses: statuses = ['在库', '借库']