Files
KCGL/inventory-backend/app/services/inbound/base_service.py

946 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 文件路径: app/services/inbound/base_service.py
from app.extensions import db
from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
# from app.models.inbound.service import StockService
from sqlalchemy import or_, and_, func
import traceback
import json
import io
import datetime
# 需要 pip install openpyxl
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, Border, Side, PatternFill
from collections import defaultdict
class MaterialBaseService:
    """
    Service layer for base materials.

    Handles create / read / update / delete and search logic for MaterialBase.
    """
@staticmethod
def search_material(keyword):
    """
    Search enabled base materials by keyword.

    (Called by the /api/v1/inbound/base/search endpoint.)

    The keyword is matched case-insensitively as a substring of the material
    name, common name, spec/model or company name. At most 1000 rows are
    fetched; the fetched rows are then ordered in Python by the part of
    ``spec_model`` that precedes the first '/'.

    Args:
        keyword: Search text. ``None``, empty and whitespace-only values
            short-circuit to an empty result.

    Returns:
        list[dict]: One dict per match with the keys ``id``, ``companyName``,
        ``name``, ``commonName``, ``spec``, ``category``, ``unit``, ``type``
        and ``status``. An empty list is also returned when the query raises
        (the traceback is printed).
    """
    try:
        # Strip first: surrounding blanks would otherwise become part of the
        # LIKE pattern, and a blank-only keyword would search for spaces.
        keyword = str(keyword).strip() if keyword else ''
        if not keyword:
            return []

        pattern = f'%{keyword}%'
        query = MaterialBase.query.filter(
            MaterialBase.is_enabled == True,  # noqa: E712 - SQL expression
            or_(
                MaterialBase.name.ilike(pattern),
                MaterialBase.common_name.ilike(pattern),
                MaterialBase.spec_model.ilike(pattern),
                # Company name is searchable as well.
                MaterialBase.company_name.ilike(pattern),
            ),
        ).limit(1000)  # raised from 20 so the front end gets enough rows

        db_items = query.all()

        # Sort on the text before the first '/' of the spec/model only;
        # a missing spec sorts as the empty string.
        db_items.sort(key=lambda item: (item.spec_model or '').partition('/')[0])

        return [
            {
                # The id must stay in the payload for front-end logic;
                # hiding it visually is the front end's job.
                'id': item.id,
                'companyName': item.company_name,
                'name': item.name,
                'commonName': item.common_name,
                'spec': item.spec_model,
                'category': item.category,
                'unit': item.unit,
                'type': item.material_type,
                'status': '启用',
            }
            for item in db_items
        ]
    except Exception:
        traceback.print_exc()
        return []
@staticmethod
def _get_stock_counts(stock_query):
"""
辅助函数:安全计算库存列表的总数量
"""
total_inv = 0
total_avail = 0
try:
items = list(stock_query) # 触发查询
except:
items = []
for x in items:
# 1. 获取库存数 (兼容不同字段名)
q = getattr(x, 'stock_quantity', getattr(x, 'in_quantity', 0)) # 优先取库存,其次入库
# 2. 获取可用数
a = getattr(x, 'available_quantity', q)
try:
total_inv += float(q if q is not None else 0)
total_avail += float(a if a is not None else 0)
except:
pass
return total_inv, total_avail
@staticmethod
def get_list(page, limit, filters=None, user_permissions=None):
    """
    Return one page of base materials together with aggregated stock figures.

    Stock and available quantities are summed per material over the purchase,
    semi-finished and finished-product stock tables and exposed as
    ``inventoryCount`` / ``availableCount``. Callers holding
    ``material_list:view_warning`` or a wildcard permission (``*`` or
    ``material_list:*``) additionally receive the stock-warning fields and may
    ask for warning-first ordering.

    Args:
        page: Page number, passed to ``paginate``.
        limit: Page size, passed to ``paginate``.
        filters: Optional dict. Recognised keys:
            ``keyword`` / ``searchField`` ('name', 'common_name', 'spec',
            anything else = all three), ``company``, ``category``, ``type``,
            ``isEnabled``, ``has_stock``, ``advancedFilters`` (list of
            ``{'field', 'operator', 'value'}`` dicts; operators 'eq', 'ne',
            'ge', 'le', 'contains'), ``orderByColumn``, ``isAsc``
            ('asc' / 'desc') and ``enableWarningSort``.
            Boolean-like values are parsed from '1' / 'true' / 'yes' / 't'
            (case-insensitive).
        user_permissions: Optional collection of permission codes. ``None``
            disables the per-field permission check on advanced filters.

    Returns:
        dict: ``{"total": <row count>, "items": [<dict>, ...]}``. When anything
        raises, the traceback is printed and ``{"total": 0, "items": []}`` is
        returned.
    """
    try:
        # Sorting below reads from ``filters`` as well, so a missing filter
        # dict must behave like an empty one instead of failing on ``.get``.
        filters = filters or {}
        truthy = ('1', 'true', 'yes', 't')

        def as_bool(raw):
            """Interpret a boolean-like request value."""
            return str(raw).lower() in truthy

        # One definition of "superuser" shared by every permission check here.
        is_superuser = bool(user_permissions) and (
            '*' in user_permissions or 'material_list:*' in user_permissions
        )
        has_warning_permission = is_superuser or (
            bool(user_permissions)
            and 'material_list:view_warning' in user_permissions
        )

        def stock_totals(model, prefix):
            """Per-base_id sums of stock and available quantity for one table."""
            return db.session.query(
                model.base_id,
                func.sum(model.stock_quantity).label(f'{prefix}_inv'),
                func.sum(model.available_quantity).label(f'{prefix}_avail'),
            ).group_by(model.base_id).subquery()

        buy_sub = stock_totals(StockBuy, 'buy')
        semi_sub = stock_totals(StockSemi, 'semi')
        prod_sub = stock_totals(StockProduct, 'prod')

        # SQL expressions for the overall stock / available quantity.
        # Materials without rows in a table contribute 0 for that table.
        total_inv = (
            func.coalesce(buy_sub.c.buy_inv, 0)
            + func.coalesce(semi_sub.c.semi_inv, 0)
            + func.coalesce(prod_sub.c.prod_inv, 0)
        )
        total_avail = (
            func.coalesce(buy_sub.c.buy_avail, 0)
            + func.coalesce(semi_sub.c.semi_avail, 0)
            + func.coalesce(prod_sub.c.prod_avail, 0)
        )

        # Kept function-local as in the original code.
        # NOTE(review): presumably this avoids an import cycle - confirm.
        from app.models.base import MaterialWarningSetting

        # Main query: material + aggregates + warning settings.
        query = (
            db.session.query(
                MaterialBase,
                total_inv.label('total_inv'),
                total_avail.label('total_avail'),
                MaterialWarningSetting.is_enabled.label('warning_enabled'),
                MaterialWarningSetting.yellow_threshold.label('warning_yellow'),
                MaterialWarningSetting.red_threshold.label('warning_red'),
            )
            .outerjoin(buy_sub, MaterialBase.id == buy_sub.c.base_id)
            .outerjoin(semi_sub, MaterialBase.id == semi_sub.c.base_id)
            .outerjoin(prod_sub, MaterialBase.id == prod_sub.c.base_id)
            .outerjoin(
                MaterialWarningSetting,
                MaterialBase.id == MaterialWarningSetting.base_id,
            )
        )

        # 1. Keyword search, optionally restricted to a single field.
        keyword = filters.get('keyword')
        if keyword:
            kw = f"%{keyword}%"
            search_columns = {
                'name': MaterialBase.name,
                'common_name': MaterialBase.common_name,
                'spec': MaterialBase.spec_model,
            }
            search_field = filters.get('searchField', 'all')
            target = (
                search_columns.get(search_field)
                if isinstance(search_field, str) else None
            )
            if target is not None:
                query = query.filter(target.ilike(kw))
            else:  # 'all' (default): match any of the three columns
                query = query.filter(
                    or_(*(col.ilike(kw) for col in search_columns.values()))
                )

        # 2. Exact (case-insensitive) filters.
        for key, column in (
            ('company', MaterialBase.company_name),
            ('category', MaterialBase.category),
            ('type', MaterialBase.material_type),
        ):
            raw = filters.get(key)
            if raw is not None and raw != '':
                query = query.filter(column.ilike(str(raw).strip()))

        if filters.get('isEnabled') is not None:
            # filter() rather than filter_by(): this is a multi-entity query.
            query = query.filter(
                MaterialBase.is_enabled == as_bool(filters['isEnabled'])
            )

        # Only materials that currently have stock.
        has_stock = filters.get('has_stock')
        if has_stock and as_bool(has_stock):
            query = query.filter(total_inv > 0)

        # 3. Advanced dynamic filters.
        advanced_filters = filters.get('advancedFilters') or []
        if advanced_filters:
            text_fields = {
                'companyName': 'company_name',
                'name': 'name',
                'commonName': 'common_name',
                'category': 'category',
                'type': 'material_type',
                'spec': 'spec_model',
                'unit': 'unit',
            }
            numeric_fields = {
                'inventoryCount': total_inv,
                'availableCount': total_avail,
            }
            comparators = {
                'eq': lambda col, val: col == val,
                'ne': lambda col, val: col != val,
                'ge': lambda col, val: col >= val,
                'le': lambda col, val: col <= val,
            }

            filter_conditions = []
            for condition in advanced_filters:
                if not isinstance(condition, dict):
                    continue
                field = condition.get('field')
                operator = condition.get('operator')
                value = condition.get('value')
                if not field or not operator or value is None:
                    continue

                if field in numeric_fields:
                    column, is_numeric = numeric_fields[field], True
                elif field in text_fields:
                    column = getattr(MaterialBase, text_fields[field], None)
                    is_numeric = False
                else:
                    continue  # field is not filterable
                if column is None:
                    continue

                # Permission check: every filterable field is guarded by the
                # code 'material_list:<field>'; superusers bypass it.
                if user_permissions is not None and not is_superuser:
                    if f'material_list:{field}' not in user_permissions:
                        continue

                if operator == 'contains':
                    # Not meaningful for the aggregated numeric columns.
                    if not is_numeric:
                        filter_conditions.append(column.ilike(f'%{value}%'))
                    continue
                if operator not in comparators:
                    continue

                if is_numeric or operator in ('ge', 'le'):
                    # NOTE(review): for text columns 'ge' / 'le' compare the
                    # column with a float, as the original did; whether the
                    # database accepts that depends on the backend - confirm.
                    try:
                        operand = float(value)
                    except (TypeError, ValueError):
                        continue  # unusable value: drop just this condition
                else:
                    operand = value
                filter_conditions.append(comparators[operator](column, operand))

            if filter_conditions:
                query = query.filter(and_(*filter_conditions))

        # Sorting. A final id tie-breaker keeps pagination deterministic.
        order_by_column = filters.get('orderByColumn', '')
        is_asc = filters.get('isAsc', None)
        tie_breaker = MaterialBase.id.asc()
        # Default: total stock descending, then spec/model ascending.
        default_order = (
            total_inv.desc(),
            MaterialBase.spec_model.asc(),
            tie_breaker,
        )

        # Parsed like the other flags so the string "false" does not enable it.
        enable_warning_sort = has_warning_permission and as_bool(
            filters.get('enableWarningSort', False)
        )

        if enable_warning_sort:
            # Warning-first ordering.
            # Status: 2 = red (stock <= red), 1 = yellow (red < stock <= yellow),
            # 0 = normal or warning disabled.
            from sqlalchemy import case

            warning_on = MaterialWarningSetting.is_enabled == True  # noqa: E712
            red_line = MaterialWarningSetting.red_threshold
            yellow_line = MaterialWarningSetting.yellow_threshold
            in_red = warning_on & (total_inv <= red_line)
            in_yellow = (
                warning_on & (total_inv > red_line) & (total_inv <= yellow_line)
            )

            warning_status = case(
                (in_red, 2),
                (in_yellow, 1),
                else_=0,
            ).label('warning_status')
            # Inside the red group: larger shortfall first.
            red_gap = case(
                (in_red, red_line - total_inv),
                else_=0,
            ).label('red_gap')
            # Inside the yellow group: closest to the red line first;
            # everything else gets a large sentinel so it sorts last.
            yellow_gap = case(
                (in_yellow, total_inv - red_line),
                else_=999999,
            ).label('yellow_gap')

            query = query.add_columns(warning_status, red_gap, yellow_gap)
            query = query.order_by(
                warning_status.desc(),
                red_gap.desc(),
                yellow_gap.asc(),
                MaterialBase.spec_model.asc(),
                tie_breaker,
            )
        else:
            sort_field_map = {
                'companyName': MaterialBase.company_name,
                'name': MaterialBase.name,
                'commonName': MaterialBase.common_name,
                'category': MaterialBase.category,
                'type': MaterialBase.material_type,
                'spec': MaterialBase.spec_model,
                'unit': MaterialBase.unit,
                'inventoryCount': total_inv,
                'availableCount': total_avail,
            }
            sort_column = (
                sort_field_map.get(order_by_column)
                if isinstance(order_by_column, str) and order_by_column
                else None
            )
            if sort_column is not None and is_asc in ('asc', 'desc'):
                direction = (
                    sort_column.asc() if is_asc == 'asc' else sort_column.desc()
                )
                query = query.order_by(direction, tie_breaker)
            else:
                # Unknown column or direction: use the default order rather
                # than paginating an unordered query.
                query = query.order_by(*default_order)

        pagination = query.paginate(page=page, per_page=limit, error_out=False)

        items_list = []
        for row in pagination.items:
            # Defensive unpacking: accept a bare model object as well as a
            # multi-column row.
            if hasattr(row, 'to_dict'):
                item, extras = row, ()
            else:
                item, extras = row[0], tuple(row[1:6])
            if not hasattr(item, 'to_dict'):
                continue
            inv, avail, warning_enabled, warning_yellow, warning_red = (
                extras + (None,) * 5
            )[:5]

            item_dict = item.to_dict()
            inventory = float(inv) if inv is not None else 0
            item_dict['inventoryCount'] = inventory
            item_dict['availableCount'] = float(avail) if avail is not None else 0

            # Warning data is only exposed to callers allowed to see it.
            if has_warning_permission:
                # Compare float with float: mixing a float stock figure with a
                # raw threshold can misjudge values that sit exactly on the
                # threshold if the column yields Decimal.
                yellow = float(warning_yellow) if warning_yellow is not None else None
                red = float(warning_red) if warning_red is not None else None
                item_dict['warningEnabled'] = bool(warning_enabled)
                item_dict['warningYellow'] = yellow
                item_dict['warningRed'] = red

                status = 0  # normal, disabled or not configured
                if warning_enabled and red is not None:
                    if inventory <= red:
                        status = 2  # red
                    elif yellow is not None and inventory <= yellow:
                        status = 1  # yellow
                item_dict['warningStatus'] = status

            items_list.append(item_dict)

        return {"total": pagination.total, "items": items_list}
    except Exception as e:
        traceback.print_exc()
        print(f"查询基础信息列表失败: {e}")
        return {"total": 0, "items": []}
@staticmethod
def get_distinct_options():
    """
    Collect the distinct, non-empty categories, material types and company
    names found on MaterialBase, each as an ascending sorted list.

    Returns:
        dict: ``{"categories": [...], "types": [...], "companies": [...]}``.
        If any query raises, the traceback is printed and all three lists
        come back empty.
    """

    def distinct_sorted(column):
        """Distinct non-NULL, non-empty values of ``column``, sorted."""
        rows = (
            db.session.query(column)
            .filter(column != None, column != '')  # noqa: E711 - SQL IS NOT NULL
            .distinct()
            .all()
        )
        return sorted([row[0] for row in rows])

    try:
        # Categories are returned as a flat list; any hierarchy handling is
        # left to the consumer.
        return {
            "categories": distinct_sorted(MaterialBase.category),
            "types": distinct_sorted(MaterialBase.material_type),
            "companies": distinct_sorted(MaterialBase.company_name),
        }
    except Exception:
        traceback.print_exc()
        return {"categories": [], "types": [], "companies": []}
@staticmethod
def create_material(data):
    """
    Create a new base material.

    Args:
        data: Dict with the keys ``name`` and ``spec`` (both required) and the
            optional keys ``companyName``, ``commonName``, ``category``,
            ``type``, ``unit``, ``visibilityLevel``, ``generalManual``,
            ``generalImage`` and ``isEnabled``. ``isEnabled`` accepts booleans
            and the strings '1' / 'true' / 'yes' / 't' (case-insensitive);
            missing or ``None`` means enabled.

    Returns:
        MaterialBase: The committed record.

    Raises:
        ValueError: ``name`` or ``spec`` is empty, or a record with the same
            name and spec already exists.
        Exception: Anything else is re-raised after the session is rolled back.
    """
    try:
        if not data.get('name') or not data.get('spec'):
            raise ValueError("名称和规格型号不能为空")

        exist = MaterialBase.query.filter_by(
            name=data['name'],
            spec_model=data['spec'],
        ).first()
        if exist:
            raise ValueError(f"已存在相同名称和规格的数据 (ID: {exist.id})")

        # Accept the boolean-like values the front end may send.
        raw_enabled = data.get('isEnabled', True)
        if raw_enabled is None:
            is_enabled_val = True
        else:
            is_enabled_val = str(raw_enabled).lower() in ('1', 'true', 'yes', 't')

        new_material = MaterialBase(
            company_name=data.get('companyName'),
            name=data['name'],
            common_name=data.get('commonName'),
            spec_model=data['spec'],
            category=data.get('category'),
            material_type=data.get('type'),
            unit=data.get('unit'),
            visibility_level=data.get('visibilityLevel'),
            # ``or []``: an explicit null from the client must be stored as an
            # empty list, not as the JSON text "null".
            manual_link=json.dumps(data.get('generalManual') or []),
            product_image=json.dumps(data.get('generalImage') or []),
            is_enabled=is_enabled_val,
        )
        db.session.add(new_material)
        db.session.commit()
        return new_material
    except Exception:
        db.session.rollback()
        raise
@staticmethod
def update_material(m_id, data):
    """
    Update an existing base material; only keys present in ``data`` are applied.

    Args:
        m_id: Primary key of the record to update.
        data: Dict that may contain ``companyName``, ``name``, ``commonName``,
            ``spec``, ``category``, ``type``, ``unit``, ``visibilityLevel``,
            ``generalManual``, ``generalImage`` and ``isEnabled``.
            ``isEnabled`` accepts booleans and the strings '1' / 'true' /
            'yes' / 't' (case-insensitive); ``None`` leaves the flag unchanged.

    Returns:
        MaterialBase: The committed record.

    Raises:
        ValueError: The record does not exist, ``name`` / ``spec`` is supplied
            but empty, or the change would duplicate another record's
            name + spec pair (the same rules ``create_material`` enforces).
        Exception: Anything else is re-raised after the session is rolled back.
    """
    try:
        material = MaterialBase.query.get(m_id)
        if not material:
            raise ValueError("数据不存在")

        # Validate before touching the object so nothing is flushed half-way.
        if ('name' in data and not data['name']) or \
                ('spec' in data and not data['spec']):
            raise ValueError("名称和规格型号不能为空")

        new_name = data['name'] if 'name' in data else material.name
        new_spec = data['spec'] if 'spec' in data else material.spec_model
        # Only check when the pair really changes, so records that already
        # share a pair can still have their other fields edited.
        if (new_name, new_spec) != (material.name, material.spec_model):
            clash = MaterialBase.query.filter(
                MaterialBase.name == new_name,
                MaterialBase.spec_model == new_spec,
                MaterialBase.id != material.id,
            ).first()
            if clash:
                raise ValueError(f"已存在相同名称和规格的数据 (ID: {clash.id})")

        # Plain pass-through fields: request key -> model attribute.
        plain_fields = (
            ('companyName', 'company_name'),
            ('name', 'name'),
            ('commonName', 'common_name'),
            ('spec', 'spec_model'),
            ('category', 'category'),
            ('type', 'material_type'),
            ('unit', 'unit'),
            ('visibilityLevel', 'visibility_level'),
        )
        for key, attr in plain_fields:
            if key in data:
                setattr(material, attr, data[key])

        # List-valued fields are stored as JSON text; null becomes [].
        for key, attr in (
            ('generalManual', 'manual_link'),
            ('generalImage', 'product_image'),
        ):
            if key in data:
                setattr(material, attr, json.dumps(data[key] or []))

        # Accept the boolean-like values the front end may send.
        if data.get('isEnabled') is not None:
            material.is_enabled = (
                str(data['isEnabled']).lower() in ('1', 'true', 'yes', 't')
            )

        db.session.commit()
        return material
    except Exception:
        db.session.rollback()
        raise
@staticmethod
def delete_material(m_id):
    """
    Delete a base material unless stock records still reference it.

    Args:
        m_id: Primary key of the record to delete.

    Returns:
        bool: ``True`` once the deletion has been committed.

    Raises:
        ValueError: The record does not exist, or purchase / semi-finished /
            finished-product stock rows still point at it (the message lists
            the three counts).
        Exception: Any failure is printed and re-raised after the session is
            rolled back.
    """
    try:
        target = MaterialBase.query.get(m_id)
        if not target:
            raise ValueError("数据不存在")

        # Count referencing rows per stock table, in a fixed order.
        buy_refs, semi_refs, prod_refs = (
            model.query.filter_by(base_id=m_id).count()
            for model in (StockBuy, StockSemi, StockProduct)
        )

        if buy_refs + semi_refs + prod_refs > 0:
            message = (
                f"无法删除:该基础物料正被使用中。\n"
                f"- 采购库存记录: {buy_refs}\n"
                f"- 半成品库存记录: {semi_refs}\n"
                f"- 成品库存记录: {prod_refs}\n"
                f"请先清理相关库存或仅‘禁用’此条目。"
            )
            raise ValueError(message)

        db.session.delete(target)
        db.session.commit()
        return True
    except Exception as e:
        db.session.rollback()
        print(f"删除基础信息失败: {e}")
        raise e
# ==============================================================================
# [核心修改] 统一资产统计导出(增加最高单价计算逻辑)
# ==============================================================================
@staticmethod
def export_excel(filters=None, user_permissions=None):
    """
    Build the asset/stock statistics workbook and return it as an in-memory
    .xlsx stream.

    One sheet row is written per stock record of the purchase, semi-finished
    and finished-product tables whose base material matches ``filters``.

    Pricing rules, as implemented below:
      * Purchased rows use, as their pre-tax unit price, the highest positive
        ``pre_tax_unit_price`` among the exported purchase rows of the same
        material; if there is none, the highest ``manual_cost`` of its
        semi-finished rows, then of its finished-product rows, else 0. The
        tax-inclusive price applies the row's own ``tax_rate`` (percent).
      * Semi-finished and finished-product rows use their own ``manual_cost``
        with a tax rate of 0.

    Args:
        filters: Optional dict with ``keyword``, ``company``, ``category``,
            ``type`` and ``isEnabled`` (boolean-like: '1' / 'true' / 'yes' /
            't').
        user_permissions: Optional collection of permission codes. ``None``
            disables masking. Otherwise a column whose ``material_list:<field>``
            code is missing is blanked, and the five price/amount columns are
            blanked per row unless the caller holds one of the price/cost
            permissions of that row's module. ``*`` and ``material_list:*``
            bypass all masking.

    Returns:
        io.BytesIO: The saved workbook, positioned at offset 0.

    Raises:
        Exception: Any failure is re-raised after its traceback is printed.
    """
    try:
        filters = filters or {}

        # 1. Conditions on MaterialBase used to restrict the stock rows.
        conditions = []
        keyword = filters.get('keyword')
        if keyword:
            kw = f"%{keyword}%"
            conditions.append(or_(
                MaterialBase.name.ilike(kw),
                MaterialBase.common_name.ilike(kw),
                MaterialBase.spec_model.ilike(kw),
                MaterialBase.company_name.ilike(kw),
            ))
        for key, column in (
            ('company', MaterialBase.company_name),
            ('category', MaterialBase.category),
            ('type', MaterialBase.material_type),
        ):
            raw = filters.get(key)
            if raw is not None and raw != '':
                conditions.append(column.ilike(str(raw).strip()))
        if filters.get('isEnabled') is not None:
            is_active = str(filters['isEnabled']).lower() in ('1', 'true', 'yes', 't')
            conditions.append(MaterialBase.is_enabled == is_active)

        # 2. Load each stock table joined to its base material.
        def fetch_stock(model):
            """Return (stock, base) pairs of ``model`` matching the filters."""
            query = db.session.query(model, MaterialBase).join(
                MaterialBase, model.base_id == MaterialBase.id
            )
            for cond in conditions:
                query = query.filter(cond)
            return query.all()

        list_buy = fetch_stock(StockBuy)
        list_semi = fetch_stock(StockSemi)
        list_product = fetch_stock(StockProduct)

        # Highest positive price per base id and table.
        def max_price_by_base(pairs, price_attr):
            """Map base id -> largest positive ``price_attr`` among ``pairs``."""
            best = {}
            for stock, base in pairs:
                price = float(getattr(stock, price_attr) or 0)
                if price > best.get(base.id, 0):
                    best[base.id] = price
            return best

        # Lookup priority: purchase > semi-finished > finished product.
        price_tables = (
            max_price_by_base(list_buy, 'pre_tax_unit_price'),
            max_price_by_base(list_semi, 'manual_cost'),
            max_price_by_base(list_product, 'manual_cost'),
        )

        def get_highest_price(base_id):
            """First positive price for ``base_id`` in priority order, else 0.0."""
            for table in price_tables:
                price = table.get(base_id, 0)
                if price > 0:
                    return price
            return 0.0

        def first_identifier(stock, attrs):
            """First truthy attribute in ``attrs`` (the last one if none is)."""
            value = None
            for attr in attrs:
                value = getattr(stock, attr)
                if value:
                    break
            return value

        def make_row(base, stock, type_name, ident, source, date,
                     price_excl, tax, price_incl):
            """Assemble one export record; totals are quantity x unit price."""
            qty = float(stock.stock_quantity or 0)
            return {
                "base": base,
                "type_name": type_name,
                "ident": ident,
                "loc": stock.warehouse_location,
                "source": source,
                "date": date,
                "qty": qty,
                "avail": float(stock.available_quantity or 0),
                "price_excl": price_excl,
                "total_val_excl": qty * price_excl,
                "tax": tax,
                "price_incl": price_incl,
                "total_val": qty * price_incl,
            }

        # 3. Merge the three sources.
        all_rows = []
        batch_first = ('batch_number', 'serial_number', 'barcode', 'sku')

        for stock, base in list_buy:
            price_excl = get_highest_price(base.id)
            tax_rate = float(stock.tax_rate or 0)
            all_rows.append(make_row(
                base, stock, "采购件",
                first_identifier(stock, batch_first),
                stock.supplier_name, stock.in_date,
                price_excl, tax_rate,
                price_excl * (1 + tax_rate / 100.0),
            ))

        for stock, base in list_semi:
            # manual_cost is used as the per-unit cost; no tax applies.
            unit_cost = float(stock.manual_cost or 0)
            all_rows.append(make_row(
                base, stock, "半成品",
                first_identifier(stock, batch_first),
                stock.production_manager, stock.production_date,
                unit_cost, 0.0, unit_cost,
            ))

        for stock, base in list_product:
            unit_cost = float(stock.manual_cost or 0)
            all_rows.append(make_row(
                base, stock, "成品",
                first_identifier(stock, ('serial_number', 'barcode', 'sku')),
                stock.production_manager, stock.production_date,
                unit_cost, 0.0, unit_cost,
            ))

        # 4. Order: company -> spec/model -> base id -> identifier.
        all_rows.sort(key=lambda x: (
            x['base'].company_name or "",
            x['base'].spec_model or "",
            x['base'].id,
            x['ident'] or "",
        ))

        # 5. Workbook.
        wb = Workbook()
        ws = wb.active
        ws.title = "库存统计"

        headers = [
            "所属公司", "资产名称", "规格型号", "物料类型",
            "类别一级", "类别二级", "类别三级", "类别四级", "类别五级",
            "计量单位",
            "库存性质", "唯一标识码 (批号/SN)", "仓库位置",
            "资产来源", "入库/生产日期",
            "库存数量", "可用数量",
            "单价/成本 (不含税)", "资产总额 (不含税)", "税率 (%)", "单价/成本 (含税)", "资产总额 (含税)"
        ]
        ws.append(headers)

        # Header text -> logical field key, for the columns that can be masked.
        header_keys = {
            "所属公司": 'companyName',
            "资产名称": 'name',
            "规格型号": 'spec',
            "物料类型": 'type',
            "计量单位": 'unit',
            "库存数量": 'inventoryCount',
            "可用数量": 'availableCount',
            "单价/成本 (不含税)": 'price_excl',
            "资产总额 (不含税)": 'total_val_excl',
            "税率 (%)": 'tax',
            "单价/成本 (含税)": 'price_incl',
            "资产总额 (含税)": 'total_val',
        }
        category_headers = ("类别一级", "类别二级", "类别三级", "类别四级", "类别五级")
        col_idx = {
            header_keys[header]: idx
            for idx, header in enumerate(headers)
            if header in header_keys
        }
        category_cols = [
            idx for idx, header in enumerate(headers)
            if header in category_headers
        ]

        # Header styling.
        header_fill = PatternFill(start_color="D7E4BC", end_color="D7E4BC", fill_type="solid")
        thin = Side(style='thin')
        border_style = Border(left=thin, right=thin, top=thin, bottom=thin)
        for cell in ws[1]:
            cell.font = Font(bold=True, name='微软雅黑')
            cell.alignment = Alignment(horizontal='center', vertical='center')
            cell.fill = header_fill
            cell.border = border_style

        # Masking rules depend only on the caller, so resolve them once.
        restrict = user_permissions is not None and not (
            '*' in user_permissions or 'material_list:*' in user_permissions
        )
        masked_cols = set()
        visible_price_types = set()
        if restrict:
            # Field-level masking: each field is guarded by 'material_list:<field>'.
            for field in ('companyName', 'name', 'spec', 'type', 'unit',
                          'category', 'inventoryCount', 'availableCount'):
                if f'material_list:{field}' in user_permissions:
                    continue
                if field == 'category':
                    masked_cols.update(category_cols)
                elif field in col_idx:
                    masked_cols.add(col_idx[field])

            # Price masking is tied to the module the row came from.
            price_perms = {
                '采购件': ('inbound_buy:postTaxUnitPrice',
                           'inbound_buy:preTaxUnitPrice',
                           'inbound_buy:totalAmount'),
                '半成品': ('inbound_semi:rawMaterialCost',
                           'inbound_semi:manualCost'),
                '成品': ('inbound_product:rawMaterialCost',
                         'inbound_product:manualCost'),
            }
            visible_price_types = {
                type_name
                for type_name, perms in price_perms.items()
                if any(p in user_permissions for p in perms)
            }
        price_cols = [
            col_idx[key]
            for key in ('price_excl', 'total_val_excl', 'tax', 'price_incl', 'total_val')
            if key in col_idx
        ]

        # Data rows.
        for r in all_rows:
            base = r['base']
            # Split the category path into exactly five level columns.
            cat_parts = ((base.category or "").split('/') + [""] * 5)[:5]
            date_str = (
                r['date'].strftime('%Y-%m-%d')
                if isinstance(r['date'], datetime.date) else ""
            )
            row_val = [
                base.company_name,
                base.name,
                base.spec_model,
                base.material_type,
                *cat_parts,
                base.unit,
                r['type_name'],
                r['ident'],
                r['loc'],
                r['source'],
                date_str,
                r['qty'],
                r['avail'],
                r['price_excl'],
                r['total_val_excl'],
                r['tax'],
                r['price_incl'],
                r['total_val'],
            ]
            for idx in masked_cols:
                row_val[idx] = ''
            # Rows of an unknown type are treated as not visible.
            if restrict and r['type_name'] not in visible_price_types:
                for idx in price_cols:
                    row_val[idx] = ''
            ws.append(row_val)

        # Column widths: longest rendered value + 2, capped at 30.
        widths = {}
        for row in ws.rows:
            for cell in row:
                if cell.value:
                    letter = cell.column_letter
                    widths[letter] = max(widths.get(letter, 0), len(str(cell.value)))
        for letter, width in widths.items():
            ws.column_dimensions[letter].width = min(width + 2, 30)

        output = io.BytesIO()
        wb.save(output)
        output.seek(0)
        return output
    except Exception:
        traceback.print_exc()
        raise