Files
KCGL/inventory-backend/app/services/inbound/base_service.py
dxc d78ef22251 fix: prevent price data leak in inventory export
Co-authored-by: aider (openai/DeepSeek-V3.2-Thinking) <aider@aider.chat>
2026-02-28 11:32:21 +08:00

665 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 文件路径: app/services/inbound/base_service.py
from app.extensions import db
from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
# from app.models.inbound.service import StockService
from sqlalchemy import or_, and_
import traceback
import json
import io
import datetime
# 需要 pip install openpyxl
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, Border, Side, PatternFill
class MaterialBaseService:
"""
基础物料服务层
负责处理 MaterialBase 的增删改查及搜索逻辑
"""
@staticmethod
def search_material(keyword):
"""
根据关键字搜索已启用的基础物料
(供 /api/v1/inbound/base/search 接口调用)
"""
try:
if not keyword:
return []
query = MaterialBase.query.filter(
MaterialBase.is_enabled == True,
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.common_name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%'),
# 支持搜索公司名
MaterialBase.company_name.ilike(f'%{keyword}%')
)
)
# [修改1] 增加返回数量限制
# 原为 limit(20),现改为 1000确保前端能获取所有或足够多的数据
query = query.limit(1000)
# 获取查询结果对象列表
db_items = query.all()
# [修改2] 规格型号排序逻辑
# 要求:只考虑 '/' 前面的内容进行排序
# 使用 Python 的 sort 方法,提取 spec_model 中 '/' 前的部分
def get_sort_key(item):
if not item.spec_model:
return ""
# 如果包含 '/',取前半部分;否则取整个字符串
parts = item.spec_model.split('/')
return parts[0] if len(parts) > 0 else item.spec_model
# 执行排序
db_items.sort(key=get_sort_key)
results = []
for item in db_items:
results.append({
'id': item.id, # 必须保留ID供前端逻辑使用视觉上的隐藏请在前端处理
'companyName': item.company_name,
'name': item.name,
'commonName': item.common_name,
'spec': item.spec_model,
'category': item.category,
'unit': item.unit,
'type': item.material_type,
'status': '启用'
})
return results
except Exception as e:
traceback.print_exc()
return []
@staticmethod
def _get_stock_counts(stock_query):
"""
辅助函数:安全计算库存列表的总数量
"""
total_inv = 0
total_avail = 0
try:
items = list(stock_query) # 触发查询
except:
items = []
for x in items:
# 1. 获取库存数 (兼容不同字段名)
q = getattr(x, 'stock_quantity', getattr(x, 'in_quantity', 0)) # 优先取库存,其次入库
# 2. 获取可用数
a = getattr(x, 'available_quantity', q)
try:
total_inv += float(q if q is not None else 0)
total_avail += float(a if a is not None else 0)
except:
pass
return total_inv, total_avail
@staticmethod
def get_list(page, limit, filters=None):
"""
获取基础信息列表 (带分页和筛选)
"""
from sqlalchemy import func
try:
# 构建聚合子查询
buy_sub = db.session.query(
StockBuy.base_id,
func.sum(StockBuy.stock_quantity).label('buy_inv'),
func.sum(StockBuy.available_quantity).label('buy_avail')
).group_by(StockBuy.base_id).subquery()
semi_sub = db.session.query(
StockSemi.base_id,
func.sum(StockSemi.stock_quantity).label('semi_inv'),
func.sum(StockSemi.available_quantity).label('semi_avail')
).group_by(StockSemi.base_id).subquery()
prod_sub = db.session.query(
StockProduct.base_id,
func.sum(StockProduct.stock_quantity).label('prod_inv'),
func.sum(StockProduct.available_quantity).label('prod_avail')
).group_by(StockProduct.base_id).subquery()
# 总库存和可用数的 SQL 表达式
total_inv = func.coalesce(buy_sub.c.buy_inv, 0) + \
func.coalesce(semi_sub.c.semi_inv, 0) + \
func.coalesce(prod_sub.c.prod_inv, 0)
total_avail = func.coalesce(buy_sub.c.buy_avail, 0) + \
func.coalesce(semi_sub.c.semi_avail, 0) + \
func.coalesce(prod_sub.c.prod_avail, 0)
# 主查询,关联聚合子查询
query = db.session.query(
MaterialBase,
total_inv.label('total_inv'),
total_avail.label('total_avail')
).outerjoin(buy_sub, MaterialBase.id == buy_sub.c.base_id)\
.outerjoin(semi_sub, MaterialBase.id == semi_sub.c.base_id)\
.outerjoin(prod_sub, MaterialBase.id == prod_sub.c.base_id)
if filters:
# 1. 关键词模糊搜索
if filters.get('keyword'):
kw = f"%{filters['keyword']}%"
query = query.filter(or_(
MaterialBase.name.ilike(kw),
MaterialBase.common_name.ilike(kw),
MaterialBase.spec_model.ilike(kw)
))
# 2. 精确筛选
if filters.get('company'):
query = query.filter_by(company_name=filters['company'])
if filters.get('category'):
query = query.filter_by(category=filters['category'])
if filters.get('type'):
query = query.filter_by(material_type=filters['type'])
if filters.get('isEnabled') is not None:
is_active = bool(int(filters['isEnabled']))
query = query.filter_by(is_enabled=is_active)
# 排序处理
order_by_column = filters.get('orderByColumn', '')
is_asc = filters.get('isAsc', None)
if order_by_column == 'inventoryCount':
if is_asc == 'asc':
query = query.order_by(total_inv.asc())
else:
query = query.order_by(total_inv.desc())
elif order_by_column == 'availableCount':
if is_asc == 'asc':
query = query.order_by(total_avail.asc())
else:
query = query.order_by(total_avail.desc())
else:
# 默认排序:优先按总库存数降序,当库存相同时,再按规格型号升序
query = query.order_by(total_inv.desc(), MaterialBase.spec_model.asc())
# 分页
pagination = query.paginate(page=page, per_page=limit, error_out=False)
items_list = []
for item, inv, avail in pagination.items:
item_dict = item.to_dict()
item_dict['inventoryCount'] = float(inv) if inv is not None else 0.0
item_dict['availableCount'] = float(avail) if avail is not None else 0.0
items_list.append(item_dict)
return {"total": pagination.total, "items": items_list}
except Exception as e:
traceback.print_exc()
print(f"查询基础信息列表失败: {e}")
return {"total": 0, "items": []}
@staticmethod
def get_distinct_options():
"""
获取所有已存在的类别、类型、公司 (去重且排序)
"""
try:
# 1. 类别 (获取后在内存或前端做层级处理,这里先按字母序返回扁平列表)
categories = db.session.query(MaterialBase.category) \
.filter(MaterialBase.category != None, MaterialBase.category != '') \
.distinct().all()
# 对类别进行排序
sorted_categories = sorted([c[0] for c in categories])
# 2. 类型
types = db.session.query(MaterialBase.material_type) \
.filter(MaterialBase.material_type != None, MaterialBase.material_type != '') \
.distinct().all()
sorted_types = sorted([t[0] for t in types])
# 3. 公司
companies = db.session.query(MaterialBase.company_name) \
.filter(MaterialBase.company_name != None, MaterialBase.company_name != '') \
.distinct().all()
sorted_companies = sorted([c[0] for c in companies])
return {
"categories": sorted_categories,
"types": sorted_types,
"companies": sorted_companies
}
except Exception as e:
traceback.print_exc()
return {"categories": [], "types": [], "companies": []}
@staticmethod
def create_material(data):
"""新增基础信息"""
try:
if not data.get('name') or not data.get('spec'):
raise ValueError("名称和规格型号不能为空")
exist = MaterialBase.query.filter_by(
name=data['name'],
spec_model=data['spec']
).first()
if exist:
raise ValueError(f"已存在相同名称和规格的数据 (ID: {exist.id})")
new_material = MaterialBase(
# [修改] 移除了 'IRIS' 默认值
company_name=data.get('companyName'),
name=data['name'],
common_name=data.get('commonName'),
spec_model=data['spec'],
category=data.get('category'),
material_type=data.get('type'),
unit=data.get('unit'),
visibility_level=data.get('visibilityLevel'),
manual_link=json.dumps(data.get('generalManual', [])),
product_image=json.dumps(data.get('generalImage', [])),
is_enabled=True if data.get('isEnabled', 1) == 1 else False
)
db.session.add(new_material)
db.session.commit()
return new_material
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def update_material(m_id, data):
"""修改基础信息"""
try:
material = MaterialBase.query.get(m_id)
if not material:
raise ValueError("数据不存在")
# 更新字段
if 'companyName' in data: material.company_name = data['companyName']
if 'name' in data: material.name = data['name']
if 'commonName' in data: material.common_name = data['commonName']
if 'spec' in data: material.spec_model = data['spec']
if 'category' in data: material.category = data['category']
if 'type' in data: material.material_type = data['type']
if 'unit' in data: material.unit = data['unit']
if 'visibilityLevel' in data: material.visibility_level = data['visibilityLevel']
if 'generalManual' in data:
material.manual_link = json.dumps(data['generalManual'])
if 'generalImage' in data:
material.product_image = json.dumps(data['generalImage'])
if 'isEnabled' in data:
material.is_enabled = bool(int(data['isEnabled']))
db.session.commit()
return material
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def delete_material(m_id):
"""
删除基础信息 (带依赖检查)
"""
try:
material = MaterialBase.query.get(m_id)
if not material:
raise ValueError("数据不存在")
buy_usage_count = StockBuy.query.filter_by(base_id=m_id).count()
semi_usage_count = StockSemi.query.filter_by(base_id=m_id).count()
prod_usage_count = StockProduct.query.filter_by(base_id=m_id).count()
total_usage = buy_usage_count + semi_usage_count + prod_usage_count
if total_usage > 0:
raise ValueError(
f"无法删除:该基础物料正被使用中。\n"
f"- 采购库存记录: {buy_usage_count}\n"
f"- 半成品库存记录: {semi_usage_count}\n"
f"- 成品库存记录: {prod_usage_count}\n"
f"请先清理相关库存或仅‘禁用’此条目。"
)
db.session.delete(material)
db.session.commit()
return True
except Exception as e:
db.session.rollback()
print(f"删除基础信息失败: {e}")
raise e
# ==============================================================================
# [核心修改] 统一资产统计导出(增加用户权限脱敏)
# ==============================================================================
@staticmethod
def export_excel(filters=None, user_permissions=None):
"""
全口径资产统计报表:
逻辑:先查库存表 (Buy, Semi, Product),关联基础信息,只导出实际存在的库存记录。
并根据用户权限对字段进行脱敏。
"""
try:
# 1. 构造基础信息的筛选条件 (用于过滤库存)
filter_conditions = []
if filters:
if filters.get('keyword'):
kw = f"%{filters['keyword']}%"
filter_conditions.append(or_(
MaterialBase.name.ilike(kw),
MaterialBase.common_name.ilike(kw),
MaterialBase.spec_model.ilike(kw),
MaterialBase.company_name.ilike(kw)
))
if filters.get('company'):
filter_conditions.append(MaterialBase.company_name == filters['company'])
if filters.get('category'):
filter_conditions.append(MaterialBase.category == filters['category'])
if filters.get('type'):
filter_conditions.append(MaterialBase.material_type == filters['type'])
if filters.get('isEnabled') is not None:
is_active = bool(int(filters['isEnabled']))
filter_conditions.append(MaterialBase.is_enabled == is_active)
# 2. 分别查询三个库存表,并 Join MaterialBase 进行筛选
# 2.1 采购库存 (StockBuy)
query_buy = db.session.query(StockBuy, MaterialBase).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
)
for cond in filter_conditions:
query_buy = query_buy.filter(cond)
list_buy = query_buy.all()
# 2.2 半成品库存 (StockSemi)
query_semi = db.session.query(StockSemi, MaterialBase).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
)
for cond in filter_conditions:
query_semi = query_semi.filter(cond)
list_semi = query_semi.all()
# 2.3 成品库存 (StockProduct)
query_product = db.session.query(StockProduct, MaterialBase).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
)
for cond in filter_conditions:
query_product = query_product.filter(cond)
list_product = query_product.all()
# 3. 数据整合
all_rows = []
# 处理采购件
for stock, base in list_buy:
# 价格计算
unit_price = float(stock.pre_tax_unit_price or 0)
tax_rate = float(stock.tax_rate or 0)
price_incl = float(stock.post_tax_unit_price or (unit_price * (1 + tax_rate / 100.0)))
qty = float(stock.stock_quantity or 0)
# 计算不含税总价 = 数量 * 不含税单价
total_val_excl = qty * unit_price
# 计算含税总价 = 数量 * 含税单价
total_val_incl = qty * price_incl
ident = stock.batch_number or stock.serial_number or stock.barcode or stock.sku
all_rows.append({
"base": base,
"type_name": "采购件",
"ident": ident,
"loc": stock.warehouse_location,
"source": stock.supplier_name,
"date": stock.in_date,
"qty": qty,
"avail": float(stock.available_quantity or 0),
"price_excl": unit_price,
"total_val_excl": total_val_excl,
"tax": tax_rate,
"price_incl": price_incl,
"total_val": total_val_incl
})
# 处理半成品
for stock, base in list_semi:
cost = float(stock.raw_material_cost or 0) + float(stock.manual_cost or 0)
qty = float(stock.stock_quantity or 0)
# 半成品不含税总价 = 数量 * 成本
total_val_excl = qty * cost
# 含税总价同上 (税率0)
total_val_incl = qty * cost
ident = stock.batch_number or stock.serial_number or stock.barcode or stock.sku
all_rows.append({
"base": base,
"type_name": "半成品",
"ident": ident,
"loc": stock.warehouse_location,
"source": stock.production_manager,
"date": stock.production_date,
"qty": qty,
"avail": float(stock.available_quantity or 0),
"price_excl": cost,
"total_val_excl": total_val_excl,
"tax": 0.0,
"price_incl": cost,
"total_val": total_val_incl
})
# 处理成品
for stock, base in list_product:
cost = float(stock.raw_material_cost or 0) + float(stock.manual_cost or 0)
qty = float(stock.stock_quantity or 0)
total_val_excl = qty * cost
total_val_incl = qty * cost
ident = stock.serial_number or stock.barcode or stock.sku
all_rows.append({
"base": base,
"type_name": "成品",
"ident": ident,
"loc": stock.warehouse_location,
"source": stock.production_manager,
"date": stock.production_date,
"qty": qty,
"avail": float(stock.available_quantity or 0),
"price_excl": cost,
"total_val_excl": total_val_excl,
"tax": 0.0,
"price_incl": cost,
"total_val": total_val_incl
})
# 4. 排序:按公司 -> 规格型号 -> 基础ID -> 批号 排序
all_rows.sort(key=lambda x: (
x['base'].company_name or "",
x['base'].spec_model or "",
x['base'].id,
x['ident'] or ""
))
# 5. 生成 Excel
wb = Workbook()
ws = wb.active
ws.title = "库存统计"
# 表头
headers = [
"所属公司", "资产名称", "规格型号", "物料类型",
"类别一级", "类别二级", "类别三级", "类别四级", "类别五级",
"计量单位",
"库存性质", "唯一标识码 (批号/SN)", "仓库位置",
"资产来源", "入库/生产日期",
"库存数量", "可用数量",
"单价/成本 (不含税)", "资产总额 (不含税)", "税率 (%)", "单价/成本 (含税)", "资产总额 (含税)"
]
ws.append(headers)
# 确定各字段在表头中的列索引
col_idx = {}
for idx, header in enumerate(headers):
if header == "所属公司":
col_idx['companyName'] = idx
elif header == "资产名称":
col_idx['name'] = idx
elif header == "规格型号":
col_idx['spec'] = idx
elif header == "物料类型":
col_idx['type'] = idx
elif header in ("类别一级","类别二级","类别三级","类别四级","类别五级"):
col_idx.setdefault('category_cols', []).append(idx)
elif header == "计量单位":
col_idx['unit'] = idx
elif header == "库存数量":
col_idx['inventoryCount'] = idx
elif header == "可用数量":
col_idx['availableCount'] = idx
elif header == "单价/成本 (不含税)":
col_idx['price_excl'] = idx
elif header == "资产总额 (不含税)":
col_idx['total_val_excl'] = idx
elif header == "税率 (%)":
col_idx['tax'] = idx
elif header == "单价/成本 (含税)":
col_idx['price_incl'] = idx
elif header == "资产总额 (含税)":
col_idx['total_val'] = idx
# 样式
header_fill = PatternFill(start_color="D7E4BC", end_color="D7E4BC", fill_type="solid")
border_style = Border(left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'),
bottom=Side(style='thin'))
for cell in ws[1]:
cell.font = Font(bold=True, name='微软雅黑')
cell.alignment = Alignment(horizontal='center', vertical='center')
cell.fill = header_fill
cell.border = border_style
# 字段到权限码的映射
field_to_perm = {
'companyName': 'material_list:companyName',
'name': 'material_list:name',
'spec': 'material_list:spec',
'type': 'material_list:type',
'unit': 'material_list:unit',
'category': 'material_list:category',
'inventoryCount': 'material_list:inventoryCount',
'availableCount': 'material_list:availableCount'
}
# 写入数据,并脱敏
for r in all_rows:
base = r['base']
# 类别拆分
cat_parts = (base.category or "").split('/')
while len(cat_parts) < 5:
cat_parts.append("")
# 日期格式化
date_str = r['date'].strftime('%Y-%m-%d') if isinstance(r['date'], datetime.date) else ""
row_val = [
base.company_name,
base.name,
base.spec_model,
base.material_type,
cat_parts[0], cat_parts[1], cat_parts[2], cat_parts[3], cat_parts[4],
base.unit,
r['type_name'],
r['ident'],
r['loc'],
r['source'],
date_str,
r['qty'],
r['avail'],
r['price_excl'],
r['total_val_excl'],
r['tax'],
r['price_incl'],
r['total_val']
]
# 根据用户权限脱敏
if user_permissions is not None:
for field, perm_code in field_to_perm.items():
if perm_code not in user_permissions:
if field == 'category':
for cat_idx in col_idx.get('category_cols', []):
row_val[cat_idx] = ''
elif field in col_idx:
row_val[col_idx[field]] = ''
# 联动脱敏:根据数据来源,校验对应模块的价格/成本权限
if user_permissions is not None:
# 超级管理员拥有所有权限,跳过价格脱敏
if 'material_list:*' in user_permissions:
# 拥有通配符权限,不隐藏价格列
pass
else:
has_price_perm = True
row_type = r['type_name']
# 根据数据来源检查对应模块的权限
if row_type == '采购件':
# 校验采购模块的价格权限
has_price_perm = any(p in user_permissions for p in ['inbound_buy:postTaxUnitPrice', 'inbound_buy:preTaxUnitPrice', 'inbound_buy:totalAmount'])
elif row_type == '半成品':
# 校验半成品模块的成本权限
has_price_perm = any(p in user_permissions for p in ['inbound_semi:rawMaterialCost', 'inbound_semi:manualCost'])
elif row_type == '成品':
# 校验成品模块的成本权限
has_price_perm = any(p in user_permissions for p in ['inbound_product:rawMaterialCost', 'inbound_product:manualCost'])
else:
# 未知类型,默认隐藏价格列
has_price_perm = False
# 如果没有对应模块的价格查看权限则清空涉密的5个列
if not has_price_perm:
for p_col in ['price_excl', 'total_val_excl', 'tax', 'price_incl', 'total_val']:
if p_col in col_idx:
row_val[col_idx[p_col]] = ''
ws.append(row_val)
# 列宽调整
dims = {}
for row in ws.rows:
for cell in row:
if cell.value:
dims[cell.column_letter] = max((dims.get(cell.column_letter, 0), len(str(cell.value))))
for col, value in dims.items():
ws.column_dimensions[col].width = min(value + 2, 30)
output = io.BytesIO()
wb.save(output)
output.seek(0)
return output
except Exception as e:
traceback.print_exc()
raise e