Files
KCGL/inventory-backend/app/services/inbound/base_service.py

952 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 文件路径: app/services/inbound/base_service.py
from app.extensions import db
from app.models.base import MaterialBase, MaterialWarningSetting
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
# from app.models.inbound.service import StockService
from sqlalchemy import or_, and_, func, case, desc, asc, cast, Numeric, text
import traceback
import json
import io
import datetime
# 需要 pip install openpyxl
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, Border, Side, PatternFill
from collections import defaultdict
class MaterialBaseService:
"""
基础物料服务层
负责处理 MaterialBase 的增删改查及搜索逻辑
"""
@staticmethod
def search_material(keyword):
"""
根据关键字搜索已启用的基础物料
(供 /api/v1/inbound/base/search 接口调用)
"""
try:
if not keyword:
return []
query = MaterialBase.query.filter(
MaterialBase.is_enabled == True,
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.common_name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%'),
# 支持搜索公司名
MaterialBase.company_name.ilike(f'%{keyword}%')
)
)
# [修改1] 增加返回数量限制
# 原为 limit(20),现改为 1000确保前端能获取所有或足够多的数据
query = query.limit(1000)
# 获取查询结果对象列表
db_items = query.all()
# [修改2] 规格型号排序逻辑
# 要求:只考虑 '/' 前面的内容进行排序
# 使用 Python 的 sort 方法,提取 spec_model 中 '/' 前的部分
def get_sort_key(item):
if not item.spec_model:
return ""
# 如果包含 '/',取前半部分;否则取整个字符串
parts = item.spec_model.split('/')
return parts[0] if len(parts) > 0 else item.spec_model
# 执行排序
db_items.sort(key=get_sort_key)
results = []
for item in db_items:
results.append({
'id': item.id, # 必须保留ID供前端逻辑使用视觉上的隐藏请在前端处理
'companyName': item.company_name,
'name': item.name,
'commonName': item.common_name,
'spec': item.spec_model,
'category': item.category,
'unit': item.unit,
'type': item.material_type,
'status': '启用',
# 强制质检标记
'isInspectionRequired': bool(item.is_inspection_required)
})
return results
except Exception as e:
traceback.print_exc()
return []
@staticmethod
def _get_stock_counts(stock_query):
"""
辅助函数:安全计算库存列表的总数量
"""
total_inv = 0
total_avail = 0
try:
items = list(stock_query) # 触发查询
except:
items = []
for x in items:
# 1. 获取库存数 (兼容不同字段名)
q = getattr(x, 'stock_quantity', getattr(x, 'in_quantity', 0)) # 优先取库存,其次入库
# 2. 获取可用数
a = getattr(x, 'available_quantity', q)
try:
total_inv += float(q if q is not None else 0)
total_avail += float(a if a is not None else 0)
except:
pass
return total_inv, total_avail
@staticmethod
def get_list(page, limit, filters=None, user_permissions=None):
"""
获取基础信息列表 (带分页、高级筛选和全字段排序)
支持库存预警功能(如果用户有 view_warning 权限)
修复说明:
1. 使用子查询方案确保聚合列(total_inv)能正确参与预警排序计算
2. 高级筛选中针对聚合字段使用 HAVING 而非 WHERE
"""
try:
# 检查用户是否有查看预警的权限
has_warning_permission = False
if user_permissions:
# 超级管理员有所有权限
if '*' in user_permissions or 'material_list:*' in user_permissions:
has_warning_permission = True
elif 'material_list:view_warning' in user_permissions:
has_warning_permission = True
# 构建聚合子查询
buy_sub = db.session.query(
StockBuy.base_id,
func.sum(StockBuy.stock_quantity).label('buy_inv'),
func.sum(StockBuy.available_quantity).label('buy_avail')
).group_by(StockBuy.base_id).subquery()
semi_sub = db.session.query(
StockSemi.base_id,
func.sum(StockSemi.stock_quantity).label('semi_inv'),
func.sum(StockSemi.available_quantity).label('semi_avail')
).group_by(StockSemi.base_id).subquery()
prod_sub = db.session.query(
StockProduct.base_id,
func.sum(StockProduct.stock_quantity).label('prod_inv'),
func.sum(StockProduct.available_quantity).label('prod_avail')
).group_by(StockProduct.base_id).subquery()
# 总库存和可用数的 SQL 表达式(用于后续计算)
total_inv = func.coalesce(buy_sub.c.buy_inv, 0) + \
func.coalesce(semi_sub.c.semi_inv, 0) + \
func.coalesce(prod_sub.c.prod_inv, 0)
total_avail = func.coalesce(buy_sub.c.buy_avail, 0) + \
func.coalesce(semi_sub.c.semi_avail, 0) + \
func.coalesce(prod_sub.c.prod_avail, 0)
# 导入预警设置模型
from app.models.base import MaterialWarningSetting
# ============================================================
# 【核心修复】使用子查询方案:将聚合结果作为子查询
# 这样 total_inv 在外层查询中是一个普通列,可安全参与计算
# ============================================================
# 内层子查询:基础数据 + 聚合库存
inner_sub = db.session.query(
MaterialBase.id.label('base_id'),
MaterialBase,
total_inv.label('total_inv'),
total_avail.label('total_avail')
).outerjoin(buy_sub, MaterialBase.id == buy_sub.c.base_id) \
.outerjoin(semi_sub, MaterialBase.id == semi_sub.c.base_id) \
.outerjoin(prod_sub, MaterialBase.id == prod_sub.c.base_id)
# 将内层子查询具体化
inner_sub = inner_sub.subquery()
# 外层查询:关联预警设置,并在外层处理排序和高级筛选
query = db.session.query(
MaterialBase,
inner_sub.c.total_inv,
inner_sub.c.total_avail,
MaterialWarningSetting.is_enabled.label('warning_enabled'),
MaterialWarningSetting.yellow_threshold.label('warning_yellow'),
MaterialWarningSetting.red_threshold.label('warning_red')
).outerjoin(inner_sub, MaterialBase.id == inner_sub.c.base_id) \
.outerjoin(MaterialWarningSetting, MaterialBase.id == MaterialWarningSetting.base_id)
# 【修复】关键词搜索必须在主查询上过滤,不能在 inner_sub 上(因为使用了 Outer Join
if filters:
search_field = filters.get('searchField', 'all')
keyword = filters.get('keyword')
if keyword:
kw = f"%{keyword}%"
if search_field == 'name':
query = query.filter(MaterialBase.name.ilike(kw))
elif search_field == 'common_name':
query = query.filter(MaterialBase.common_name.ilike(kw))
elif search_field == 'spec':
query = query.filter(MaterialBase.spec_model.ilike(kw))
else:
query = query.filter(or_(
MaterialBase.name.ilike(kw),
MaterialBase.common_name.ilike(kw),
MaterialBase.spec_model.ilike(kw)
))
company = filters.get('company')
if company is not None and company != '':
query = query.filter(MaterialBase.company_name.ilike(company.strip()))
category = filters.get('category')
if category is not None and category != '':
query = query.filter(MaterialBase.category.ilike(category.strip()))
type_val = filters.get('type')
if type_val is not None and type_val != '':
query = query.filter(MaterialBase.material_type.ilike(type_val.strip()))
if filters.get('isEnabled') is not None:
val_str = str(filters['isEnabled']).lower()
is_active = val_str in ['1', 'true', 'yes', 't']
query = query.filter(MaterialBase.is_enabled == is_active)
# 库存状态筛选 (has_stock) - 使用子查询列
has_stock = filters.get('has_stock')
if has_stock and str(has_stock).lower() in ['true', '1', 'yes']:
query = query.filter(inner_sub.c.total_inv > 0)
# ============================================================
# 【修复3】高级筛选统一使用 WHERE (filter)
# 聚合字段同样是子查询列,直接用 filter 过滤即可
# ============================================================
advanced_filters = filters.get('advancedFilters', []) if filters else []
filter_conditions = []
if advanced_filters:
allowed_fields = {
'companyName': 'company_name',
'name': 'name',
'commonName': 'common_name',
'category': 'category',
'type': 'material_type',
'spec': 'spec_model',
'unit': 'unit',
'inventoryCount': 'total_inv',
'availableCount': 'total_avail'
}
# 聚合字段列表
aggregate_fields = {'inventoryCount', 'availableCount'}
field_permission_map = {
'companyName': 'material_list:companyName',
'name': 'material_list:name',
'commonName': 'material_list:commonName',
'category': 'material_list:category',
'type': 'material_list:type',
'spec': 'material_list:spec',
'unit': 'material_list:unit',
'inventoryCount': 'material_list:inventoryCount',
'availableCount': 'material_list:availableCount'
}
for condition in advanced_filters:
field = condition.get('field')
operator = condition.get('operator')
value = condition.get('value')
if not field or not operator or value is None:
continue
db_field = allowed_fields.get(field)
if not db_field:
continue
# 权限校验
if user_permissions is not None:
perm_code = field_permission_map.get(field)
if 'material_list:*' in user_permissions:
pass
elif perm_code and perm_code not in user_permissions:
continue
# 判断是否为聚合字段
is_aggregate = field in aggregate_fields
if is_aggregate:
# 聚合字段:直接使用子查询列,通过 WHERE (filter) 过滤
col = inner_sub.c.total_inv if field == 'inventoryCount' else inner_sub.c.total_avail
try:
num_val = float(value)
except ValueError:
continue
if operator == 'eq':
filter_conditions.append(col == num_val)
elif operator == 'ne':
filter_conditions.append(col != num_val)
elif operator == 'ge':
filter_conditions.append(col >= num_val)
elif operator == 'le':
filter_conditions.append(col <= num_val)
else:
# 非聚合字段:使用 WHERE
column = getattr(MaterialBase, db_field, None)
if column is None:
continue
if operator == 'eq':
filter_conditions.append(column == value)
elif operator == 'ne':
filter_conditions.append(column != value)
elif operator == 'contains':
filter_conditions.append(column.ilike(f'%{value}%'))
elif operator == 'ge':
try:
num_val = float(value)
filter_conditions.append(column >= num_val)
except ValueError:
continue
elif operator == 'le':
try:
num_val = float(value)
filter_conditions.append(column <= num_val)
except ValueError:
continue
# 应用 WHERE 条件(统一使用 filter
if filter_conditions:
query = query.filter(and_(*filter_conditions))
# 排序处理(支持全字段)
order_by_column = filters.get('orderByColumn', '')
is_asc = filters.get('isAsc', None)
# 信任前端传递的预警排序开关(前端已基于权限注入)
enable_warning_sort = filters.get('enableWarningSort', False)
if enable_warning_sort:
print("====== [DEBUG] 成功进入预警强排逻辑 ======")
# 直接在 order_by 中进行计算排序,不污染 select 列
inv_val = inner_sub.c.total_inv
red_val = cast(MaterialWarningSetting.red_threshold, Numeric)
yellow_val = cast(MaterialWarningSetting.yellow_threshold, Numeric)
# 预警等级计算:红=2, 黄=1, 正常=0
warning_level = case(
(and_(MaterialWarningSetting.is_enabled.is_(True), inv_val <= red_val), 2),
(and_(MaterialWarningSetting.is_enabled.is_(True), inv_val <= yellow_val), 1),
else_=0
)
# 红色预警时的缺口
red_shortage = case(
(and_(MaterialWarningSetting.is_enabled.is_(True), inv_val <= red_val), red_val - inv_val),
else_=0
)
# 黄色预警时的缺口
yellow_distance = case(
(and_(MaterialWarningSetting.is_enabled.is_(True), inv_val > red_val, inv_val <= yellow_val), inv_val - red_val),
else_=999999
)
# 直接在 order_by 中使用 case() 表达式
query = query.order_by(
desc(warning_level),
desc(red_shortage),
asc(yellow_distance),
desc(inv_val),
desc(MaterialBase.id)
)
elif order_by_column:
# 字段映射 - 使用子查询列进行排序
sort_field_map = {
'companyName': MaterialBase.company_name,
'name': MaterialBase.name,
'commonName': MaterialBase.common_name,
'category': MaterialBase.category,
'type': MaterialBase.material_type,
'spec': MaterialBase.spec_model,
'unit': MaterialBase.unit,
'inventoryCount': inner_sub.c.total_inv,
'availableCount': inner_sub.c.total_avail
}
sort_column = sort_field_map.get(order_by_column)
if sort_column is not None:
if is_asc == 'asc':
query = query.order_by(sort_column.asc())
elif is_asc == 'desc':
query = query.order_by(sort_column.desc())
else:
# 默认排序:优先按总库存数降序,当库存相同时,再按规格型号升序
query = query.order_by(inner_sub.c.total_inv.desc(), MaterialBase.spec_model.asc())
# 分页
pagination = query.paginate(page=page, per_page=limit, error_out=False)
items_list = []
for row in pagination.items:
# 防弹解包逻辑:直接判断自身是否有 to_dict 方法
if hasattr(row, 'to_dict'):
# 说明查询只返回了 MaterialBase 单一对象
item = row
inv = avail = warning_enabled = warning_yellow = warning_red = None
else:
# 说明返回了 Row 对象 (包含多个字段)
item = row[0]
inv = row[1] if len(row) > 1 else None
avail = row[2] if len(row) > 2 else None
warning_enabled = row[3] if len(row) > 3 else False
warning_yellow = row[4] if len(row) > 4 else 0
warning_red = row[5] if len(row) > 5 else 0
# 安全兜底
if not hasattr(item, 'to_dict'):
continue
item_dict = item.to_dict()
item_dict['inventoryCount'] = float(inv) if inv is not None else 0
item_dict['availableCount'] = float(avail) if avail is not None else 0
# 处理预警信息(仅当用户有权限时)
if has_warning_permission:
item_dict['warningEnabled'] = bool(warning_enabled) if warning_enabled is not None else False
item_dict['warningYellow'] = float(warning_yellow) if warning_yellow is not None else None
item_dict['warningRed'] = float(warning_red) if warning_red is not None else None
# 计算预警状态
if warning_enabled and warning_red is not None:
invQty = item_dict['inventoryCount']
if invQty <= warning_red:
item_dict['warningStatus'] = 2 # 红色
elif warning_yellow is not None and invQty <= warning_yellow:
item_dict['warningStatus'] = 1 # 黄色
else:
item_dict['warningStatus'] = 0 # 正常
else:
item_dict['warningStatus'] = 0 # 未开启或未设置
items_list.append(item_dict)
return {"total": pagination.total, "items": items_list}
except Exception as e:
traceback.print_exc()
print(f"查询基础信息列表失败: {e}")
return {"total": 0, "items": []}
@staticmethod
def get_distinct_options():
"""
获取所有已存在的类别、类型、公司 (去重且排序)
"""
try:
# 1. 类别 (获取后在内存或前端做层级处理,这里先按字母序返回扁平列表)
categories = db.session.query(MaterialBase.category) \
.filter(MaterialBase.category != None, MaterialBase.category != '') \
.distinct().all()
# 对类别进行排序
sorted_categories = sorted([c[0] for c in categories])
# 2. 类型
types = db.session.query(MaterialBase.material_type) \
.filter(MaterialBase.material_type != None, MaterialBase.material_type != '') \
.distinct().all()
sorted_types = sorted([t[0] for t in types])
# 3. 公司
companies = db.session.query(MaterialBase.company_name) \
.filter(MaterialBase.company_name != None, MaterialBase.company_name != '') \
.distinct().all()
sorted_companies = sorted([c[0] for c in companies])
return {
"categories": sorted_categories,
"types": sorted_types,
"companies": sorted_companies
}
except Exception as e:
traceback.print_exc()
return {"categories": [], "types": [], "companies": []}
@staticmethod
def create_material(data):
"""新增基础信息"""
try:
if not data.get('name') or not data.get('spec'):
raise ValueError("名称和规格型号不能为空")
exist = MaterialBase.query.filter_by(
name=data['name'],
spec_model=data['spec']
).first()
if exist:
raise ValueError(f"已存在相同名称和规格的数据 (ID: {exist.id})")
# 【核心修改】:兼容前端传来的布尔值
raw_enabled = data.get('isEnabled', True)
is_enabled_val = str(raw_enabled).lower() in ['1', 'true', 'yes', 't'] if raw_enabled is not None else True
new_material = MaterialBase(
company_name=data.get('companyName'),
name=data['name'],
common_name=data.get('commonName'),
spec_model=data['spec'],
category=data.get('category'),
material_type=data.get('type'),
unit=data.get('unit'),
visibility_level=data.get('visibilityLevel'),
manual_link=json.dumps(data.get('generalManual', [])),
product_image=json.dumps(data.get('generalImage', [])),
is_enabled=is_enabled_val
)
db.session.add(new_material)
db.session.commit()
return new_material
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def update_material(m_id, data):
"""修改基础信息"""
try:
material = MaterialBase.query.get(m_id)
if not material:
raise ValueError("数据不存在")
# 更新字段
if 'companyName' in data: material.company_name = data['companyName']
if 'name' in data: material.name = data['name']
if 'commonName' in data: material.common_name = data['commonName']
if 'spec' in data: material.spec_model = data['spec']
if 'category' in data: material.category = data['category']
if 'type' in data: material.material_type = data['type']
if 'unit' in data: material.unit = data['unit']
if 'visibilityLevel' in data: material.visibility_level = data['visibilityLevel']
if 'generalManual' in data:
material.manual_link = json.dumps(data['generalManual'])
if 'generalImage' in data:
material.product_image = json.dumps(data['generalImage'])
# 【核心修改】:兼容前端传来的布尔值
if 'isEnabled' in data:
raw_enabled = data['isEnabled']
material.is_enabled = str(raw_enabled).lower() in ['1', 'true', 'yes', 't']
db.session.commit()
return material
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def delete_material(m_id):
"""
删除基础信息 (带依赖检查)
"""
try:
material = MaterialBase.query.get(m_id)
if not material:
raise ValueError("数据不存在")
buy_usage_count = StockBuy.query.filter_by(base_id=m_id).count()
semi_usage_count = StockSemi.query.filter_by(base_id=m_id).count()
prod_usage_count = StockProduct.query.filter_by(base_id=m_id).count()
total_usage = buy_usage_count + semi_usage_count + prod_usage_count
if total_usage > 0:
raise ValueError(
f"无法删除:该基础物料正被使用中。\n"
f"- 采购库存记录: {buy_usage_count}\n"
f"- 半成品库存记录: {semi_usage_count}\n"
f"- 成品库存记录: {prod_usage_count}\n"
f"请先清理相关库存或仅‘禁用’此条目。"
)
db.session.delete(material)
db.session.commit()
return True
except Exception as e:
db.session.rollback()
print(f"删除基础信息失败: {e}")
raise e
# ==============================================================================
# [核心修改] 统一资产统计导出(增加最高单价计算逻辑)
# ==============================================================================
@staticmethod
def export_excel(filters=None, user_permissions=None):
"""
全口径资产统计报表:
根据基础信息列表和库存表,计算出每个物料的最高历史单价并进行导出
"""
try:
# 1. 构造基础信息的筛选条件 (用于过滤库存)
filter_conditions = []
if filters:
if filters.get('keyword'):
kw = f"%{filters['keyword']}%"
filter_conditions.append(or_(
MaterialBase.name.ilike(kw),
MaterialBase.common_name.ilike(kw),
MaterialBase.spec_model.ilike(kw),
MaterialBase.company_name.ilike(kw)
))
company = filters.get('company')
if company is not None and company != '':
filter_conditions.append(MaterialBase.company_name.ilike(company.strip()))
category = filters.get('category')
if category is not None and category != '':
filter_conditions.append(MaterialBase.category.ilike(category.strip()))
type_val = filters.get('type')
if type_val is not None and type_val != '':
filter_conditions.append(MaterialBase.material_type.ilike(type_val.strip()))
# 【核心修改】:增强布尔值解析
if filters.get('isEnabled') is not None:
val_str = str(filters['isEnabled']).lower()
is_active = val_str in ['1', 'true', 'yes', 't']
filter_conditions.append(MaterialBase.is_enabled == is_active)
# 2. 分别查询三个库存表,并 Join MaterialBase 进行筛选
# 2.1 采购库存 (StockBuy)
query_buy = db.session.query(StockBuy, MaterialBase).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
)
for cond in filter_conditions:
query_buy = query_buy.filter(cond)
list_buy = query_buy.all()
# 2.2 半成品库存 (StockSemi)
query_semi = db.session.query(StockSemi, MaterialBase).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
)
for cond in filter_conditions:
query_semi = query_semi.filter(cond)
list_semi = query_semi.all()
# 2.3 成品库存 (StockProduct)
query_product = db.session.query(StockProduct, MaterialBase).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
)
for cond in filter_conditions:
query_product = query_product.filter(cond)
list_product = query_product.all()
# ====================================================
# [核心新增] 预先计算每个 base_id 的全局最高历史单价
# 优先级:采购件 > 半成品 > 成品
# ====================================================
buy_max_prices = {}
for stock, base in list_buy:
price = float(stock.pre_tax_unit_price or 0)
if price > buy_max_prices.get(base.id, 0):
buy_max_prices[base.id] = price
semi_max_prices = {}
for stock, base in list_semi:
# 半成品的单价直接取自 manual_cost 字段(单件总成本)
price = float(stock.manual_cost or 0)
if price > semi_max_prices.get(base.id, 0):
semi_max_prices[base.id] = price
product_max_prices = {}
for stock, base in list_product:
# 成品的单价直接取自 manual_cost 字段(单件总成本)
price = float(stock.manual_cost or 0)
if price > product_max_prices.get(base.id, 0):
product_max_prices[base.id] = price
# 构造获取某个物料最高价的闭包函数
def get_highest_price(base_id):
if base_id in buy_max_prices and buy_max_prices[base_id] > 0:
return buy_max_prices[base_id]
if base_id in semi_max_prices and semi_max_prices[base_id] > 0:
return semi_max_prices[base_id]
if base_id in product_max_prices and product_max_prices[base_id] > 0:
return product_max_prices[base_id]
return 0.0
# 3. 数据整合
all_rows = []
# 处理采购件
for stock, base in list_buy:
qty = float(stock.stock_quantity or 0)
# 使用该物料的全局最高单价作为不含税单价
highest_excl_price = get_highest_price(base.id)
tax_rate = float(stock.tax_rate or 0)
# 计算含税单价和总额
highest_incl_price = highest_excl_price * (1 + tax_rate / 100.0)
total_val_excl = qty * highest_excl_price
total_val_incl = qty * highest_incl_price
ident = stock.batch_number or stock.serial_number or stock.barcode or stock.sku
all_rows.append({
"base": base,
"type_name": "采购件",
"ident": ident,
"loc": stock.warehouse_location,
"source": stock.supplier_name,
"date": stock.in_date,
"qty": qty,
"avail": float(stock.available_quantity or 0),
"price_excl": highest_excl_price,
"total_val_excl": total_val_excl,
"tax": tax_rate,
"price_incl": highest_incl_price,
"total_val": total_val_incl
})
# 处理半成品
for stock, base in list_semi:
qty = float(stock.stock_quantity or 0)
# 半成品的单价直接取自 manual_cost 字段(单件总成本)
unit_cost = float(stock.manual_cost or 0)
total_val_excl = qty * unit_cost
total_val_incl = qty * unit_cost # 半成品无税
ident = stock.batch_number or stock.serial_number or stock.barcode or stock.sku
all_rows.append({
"base": base,
"type_name": "半成品",
"ident": ident,
"loc": stock.warehouse_location,
"source": stock.production_manager,
"date": stock.production_date,
"qty": qty,
"avail": float(stock.available_quantity or 0),
"price_excl": unit_cost,
"total_val_excl": total_val_excl,
"tax": 0.0,
"price_incl": unit_cost,
"total_val": total_val_incl
})
# 处理成品
for stock, base in list_product:
qty = float(stock.stock_quantity or 0)
# 成品的单价直接取自 manual_cost 字段(单件总成本)
unit_cost = float(stock.manual_cost or 0)
total_val_excl = qty * unit_cost
total_val_incl = qty * unit_cost
ident = stock.serial_number or stock.barcode or stock.sku
all_rows.append({
"base": base,
"type_name": "成品",
"ident": ident,
"loc": stock.warehouse_location,
"source": stock.production_manager,
"date": stock.production_date,
"qty": qty,
"avail": float(stock.available_quantity or 0),
"price_excl": unit_cost,
"total_val_excl": total_val_excl,
"tax": 0.0,
"price_incl": unit_cost,
"total_val": total_val_incl
})
# 4. 排序:按公司 -> 规格型号 -> 基础ID -> 批号 排序
all_rows.sort(key=lambda x: (
x['base'].company_name or "",
x['base'].spec_model or "",
x['base'].id,
x['ident'] or ""
))
# 5. 生成 Excel
wb = Workbook()
ws = wb.active
ws.title = "库存统计"
# 表头 (严格对应你的图 5)
headers = [
"所属公司", "资产名称", "规格型号", "物料类型",
"类别一级", "类别二级", "类别三级", "类别四级", "类别五级",
"计量单位",
"库存性质", "唯一标识码 (批号/SN)", "仓库位置",
"资产来源", "入库/生产日期",
"库存数量", "可用数量",
"单价/成本 (不含税)", "资产总额 (不含税)", "税率 (%)", "单价/成本 (含税)", "资产总额 (含税)"
]
ws.append(headers)
# 确定各字段在表头中的列索引
col_idx = {}
for idx, header in enumerate(headers):
if header == "所属公司":
col_idx['companyName'] = idx
elif header == "资产名称":
col_idx['name'] = idx
elif header == "规格型号":
col_idx['spec'] = idx
elif header == "物料类型":
col_idx['type'] = idx
elif header in ("类别一级", "类别二级", "类别三级", "类别四级", "类别五级"):
col_idx.setdefault('category_cols', []).append(idx)
elif header == "计量单位":
col_idx['unit'] = idx
elif header == "库存数量":
col_idx['inventoryCount'] = idx
elif header == "可用数量":
col_idx['availableCount'] = idx
elif header == "单价/成本 (不含税)":
col_idx['price_excl'] = idx
elif header == "资产总额 (不含税)":
col_idx['total_val_excl'] = idx
elif header == "税率 (%)":
col_idx['tax'] = idx
elif header == "单价/成本 (含税)":
col_idx['price_incl'] = idx
elif header == "资产总额 (含税)":
col_idx['total_val'] = idx
# 样式
header_fill = PatternFill(start_color="D7E4BC", end_color="D7E4BC", fill_type="solid")
border_style = Border(left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'),
bottom=Side(style='thin'))
for cell in ws[1]:
cell.font = Font(bold=True, name='微软雅黑')
cell.alignment = Alignment(horizontal='center', vertical='center')
cell.fill = header_fill
cell.border = border_style
# 字段到权限码的映射
field_to_perm = {
'companyName': 'material_list:companyName',
'name': 'material_list:name',
'spec': 'material_list:spec',
'type': 'material_list:type',
'unit': 'material_list:unit',
'category': 'material_list:category',
'inventoryCount': 'material_list:inventoryCount',
'availableCount': 'material_list:availableCount'
}
# 写入数据,并脱敏
for r in all_rows:
base = r['base']
# 类别拆分
cat_parts = (base.category or "").split('/')
while len(cat_parts) < 5:
cat_parts.append("")
# 日期格式化
date_str = r['date'].strftime('%Y-%m-%d') if isinstance(r['date'], datetime.date) else ""
row_val = [
base.company_name,
base.name,
base.spec_model,
base.material_type,
cat_parts[0], cat_parts[1], cat_parts[2], cat_parts[3], cat_parts[4],
base.unit,
r['type_name'],
r['ident'],
r['loc'],
r['source'],
date_str,
r['qty'],
r['avail'],
r['price_excl'],
r['total_val_excl'],
r['tax'],
r['price_incl'],
r['total_val']
]
# 根据用户权限脱敏
if user_permissions is not None:
for field, perm_code in field_to_perm.items():
if perm_code not in user_permissions:
if field == 'category':
for cat_idx in col_idx.get('category_cols', []):
row_val[cat_idx] = ''
elif field in col_idx:
row_val[col_idx[field]] = ''
# 联动脱敏:根据数据来源,校验对应模块的价格/成本权限
if user_permissions is not None:
# 超级管理员拥有所有权限,跳过价格脱敏
if 'material_list:*' in user_permissions:
# 拥有通配符权限,不隐藏价格列
pass
else:
has_price_perm = True
row_type = r['type_name']
# 根据数据来源检查对应模块的权限
if row_type == '采购件':
# 校验采购模块的价格权限
has_price_perm = any(p in user_permissions for p in
['inbound_buy:postTaxUnitPrice', 'inbound_buy:preTaxUnitPrice',
'inbound_buy:totalAmount'])
elif row_type == '半成品':
# 校验半成品模块的成本权限
has_price_perm = any(p in user_permissions for p in
['inbound_semi:rawMaterialCost', 'inbound_semi:manualCost'])
elif row_type == '成品':
# 校验成品模块的成本权限
has_price_perm = any(p in user_permissions for p in
['inbound_product:rawMaterialCost', 'inbound_product:manualCost'])
else:
# 未知类型,默认隐藏价格列
has_price_perm = False
# 如果没有对应模块的价格查看权限则清空涉密的5个列
if not has_price_perm:
for p_col in ['price_excl', 'total_val_excl', 'tax', 'price_incl', 'total_val']:
if p_col in col_idx:
row_val[col_idx[p_col]] = ''
ws.append(row_val)
# 列宽调整
dims = {}
for row in ws.rows:
for cell in row:
if cell.value:
dims[cell.column_letter] = max((dims.get(cell.column_letter, 0), len(str(cell.value))))
for col, value in dims.items():
ws.column_dimensions[col].width = min(value + 2, 30)
output = io.BytesIO()
wb.save(output)
output.seek(0)
return output
except Exception as e:
traceback.print_exc()
raise e