821 lines
36 KiB
Python
821 lines
36 KiB
Python
# 文件路径: app/services/inbound/base_service.py
|
||
|
||
from app.extensions import db
|
||
from app.models.base import MaterialBase
|
||
from app.models.inbound.buy import StockBuy
|
||
from app.models.inbound.semi import StockSemi
|
||
from app.models.inbound.product import StockProduct
|
||
# from app.models.inbound.service import StockService
|
||
from sqlalchemy import or_, and_, func
|
||
import traceback
|
||
import json
|
||
import io
|
||
import datetime
|
||
# 需要 pip install openpyxl
|
||
from openpyxl import Workbook
|
||
from openpyxl.styles import Font, Alignment, Border, Side, PatternFill
|
||
from collections import defaultdict
|
||
|
||
|
||
class MaterialBaseService:
|
||
"""
|
||
基础物料服务层
|
||
负责处理 MaterialBase 的增删改查及搜索逻辑
|
||
"""
|
||
|
||
@staticmethod
|
||
def search_material(keyword):
|
||
"""
|
||
根据关键字搜索已启用的基础物料
|
||
(供 /api/v1/inbound/base/search 接口调用)
|
||
"""
|
||
try:
|
||
if not keyword:
|
||
return []
|
||
|
||
query = MaterialBase.query.filter(
|
||
MaterialBase.is_enabled == True,
|
||
or_(
|
||
MaterialBase.name.ilike(f'%{keyword}%'),
|
||
MaterialBase.common_name.ilike(f'%{keyword}%'),
|
||
MaterialBase.spec_model.ilike(f'%{keyword}%'),
|
||
# 支持搜索公司名
|
||
MaterialBase.company_name.ilike(f'%{keyword}%')
|
||
)
|
||
)
|
||
|
||
# [修改1] 增加返回数量限制
|
||
# 原为 limit(20),现改为 1000,确保前端能获取所有(或足够多)的数据
|
||
query = query.limit(1000)
|
||
|
||
# 获取查询结果对象列表
|
||
db_items = query.all()
|
||
|
||
# [修改2] 规格型号排序逻辑
|
||
# 要求:只考虑 '/' 前面的内容进行排序
|
||
# 使用 Python 的 sort 方法,提取 spec_model 中 '/' 前的部分
|
||
def get_sort_key(item):
|
||
if not item.spec_model:
|
||
return ""
|
||
# 如果包含 '/',取前半部分;否则取整个字符串
|
||
parts = item.spec_model.split('/')
|
||
return parts[0] if len(parts) > 0 else item.spec_model
|
||
|
||
# 执行排序
|
||
db_items.sort(key=get_sort_key)
|
||
|
||
results = []
|
||
for item in db_items:
|
||
results.append({
|
||
'id': item.id, # 必须保留ID供前端逻辑使用,视觉上的隐藏请在前端处理
|
||
'companyName': item.company_name,
|
||
'name': item.name,
|
||
'commonName': item.common_name,
|
||
'spec': item.spec_model,
|
||
'category': item.category,
|
||
'unit': item.unit,
|
||
'type': item.material_type,
|
||
'status': '启用'
|
||
})
|
||
return results
|
||
except Exception as e:
|
||
traceback.print_exc()
|
||
return []
|
||
|
||
@staticmethod
|
||
def _get_stock_counts(stock_query):
|
||
"""
|
||
辅助函数:安全计算库存列表的总数量
|
||
"""
|
||
total_inv = 0
|
||
total_avail = 0
|
||
|
||
try:
|
||
items = list(stock_query) # 触发查询
|
||
except:
|
||
items = []
|
||
|
||
for x in items:
|
||
# 1. 获取库存数 (兼容不同字段名)
|
||
q = getattr(x, 'stock_quantity', getattr(x, 'in_quantity', 0)) # 优先取库存,其次入库
|
||
# 2. 获取可用数
|
||
a = getattr(x, 'available_quantity', q)
|
||
|
||
try:
|
||
total_inv += float(q if q is not None else 0)
|
||
total_avail += float(a if a is not None else 0)
|
||
except:
|
||
pass
|
||
|
||
return total_inv, total_avail
|
||
|
||
@staticmethod
|
||
def get_list(page, limit, filters=None, user_permissions=None):
|
||
"""
|
||
获取基础信息列表 (带分页、高级筛选和全字段排序)
|
||
"""
|
||
try:
|
||
# 构建聚合子查询
|
||
buy_sub = db.session.query(
|
||
StockBuy.base_id,
|
||
func.sum(StockBuy.stock_quantity).label('buy_inv'),
|
||
func.sum(StockBuy.available_quantity).label('buy_avail')
|
||
).group_by(StockBuy.base_id).subquery()
|
||
|
||
semi_sub = db.session.query(
|
||
StockSemi.base_id,
|
||
func.sum(StockSemi.stock_quantity).label('semi_inv'),
|
||
func.sum(StockSemi.available_quantity).label('semi_avail')
|
||
).group_by(StockSemi.base_id).subquery()
|
||
|
||
prod_sub = db.session.query(
|
||
StockProduct.base_id,
|
||
func.sum(StockProduct.stock_quantity).label('prod_inv'),
|
||
func.sum(StockProduct.available_quantity).label('prod_avail')
|
||
).group_by(StockProduct.base_id).subquery()
|
||
|
||
# 总库存和可用数的 SQL 表达式
|
||
total_inv = func.coalesce(buy_sub.c.buy_inv, 0) + \
|
||
func.coalesce(semi_sub.c.semi_inv, 0) + \
|
||
func.coalesce(prod_sub.c.prod_inv, 0)
|
||
total_avail = func.coalesce(buy_sub.c.buy_avail, 0) + \
|
||
func.coalesce(semi_sub.c.semi_avail, 0) + \
|
||
func.coalesce(prod_sub.c.prod_avail, 0)
|
||
|
||
# 主查询,关联聚合子查询
|
||
query = db.session.query(
|
||
MaterialBase,
|
||
total_inv.label('total_inv'),
|
||
total_avail.label('total_avail')
|
||
).outerjoin(buy_sub, MaterialBase.id == buy_sub.c.base_id) \
|
||
.outerjoin(semi_sub, MaterialBase.id == semi_sub.c.base_id) \
|
||
.outerjoin(prod_sub, MaterialBase.id == prod_sub.c.base_id)
|
||
|
||
if filters:
|
||
# 1. 关键词模糊搜索
|
||
if filters.get('keyword'):
|
||
kw = f"%{filters['keyword']}%"
|
||
query = query.filter(or_(
|
||
MaterialBase.name.ilike(kw),
|
||
MaterialBase.common_name.ilike(kw),
|
||
MaterialBase.spec_model.ilike(kw)
|
||
))
|
||
|
||
# 2. 精确筛选
|
||
company = filters.get('company')
|
||
if company is not None and company != '':
|
||
query = query.filter(MaterialBase.company_name.ilike(company.strip()))
|
||
|
||
category = filters.get('category')
|
||
if category is not None and category != '':
|
||
query = query.filter(MaterialBase.category.ilike(category.strip()))
|
||
|
||
type_val = filters.get('type')
|
||
if type_val is not None and type_val != '':
|
||
query = query.filter(MaterialBase.material_type.ilike(type_val.strip()))
|
||
|
||
# 【核心修改】:增强布尔值解析
|
||
if filters.get('isEnabled') is not None:
|
||
val_str = str(filters['isEnabled']).lower()
|
||
is_active = val_str in ['1', 'true', 'yes', 't']
|
||
# 必须使用 filter() 而非 filter_by(),因为 query 是 join 后的复杂查询
|
||
query = query.filter(MaterialBase.is_enabled == is_active)
|
||
|
||
# 3. 高级动态筛选
|
||
advanced_filters = filters.get('advancedFilters', [])
|
||
if advanced_filters:
|
||
allowed_fields = {
|
||
'companyName': 'company_name',
|
||
'name': 'name',
|
||
'commonName': 'common_name',
|
||
'category': 'category',
|
||
'type': 'material_type',
|
||
'spec': 'spec_model',
|
||
'unit': 'unit',
|
||
'inventoryCount': total_inv,
|
||
'availableCount': total_avail
|
||
}
|
||
# 字段到权限码的映射
|
||
field_permission_map = {
|
||
'companyName': 'material_list:companyName',
|
||
'name': 'material_list:name',
|
||
'commonName': 'material_list:commonName',
|
||
'category': 'material_list:category',
|
||
'type': 'material_list:type',
|
||
'spec': 'material_list:spec',
|
||
'unit': 'material_list:unit',
|
||
'inventoryCount': 'material_list:inventoryCount',
|
||
'availableCount': 'material_list:availableCount'
|
||
}
|
||
filter_conditions = []
|
||
for condition in advanced_filters:
|
||
field = condition.get('field')
|
||
operator = condition.get('operator')
|
||
value = condition.get('value')
|
||
if not field or not operator or value is None:
|
||
continue
|
||
db_field = allowed_fields.get(field)
|
||
if not db_field:
|
||
continue
|
||
# 权限校验
|
||
if user_permissions is not None:
|
||
perm_code = field_permission_map.get(field)
|
||
if 'material_list:*' in user_permissions:
|
||
# 超级管理员拥有全部权限
|
||
pass
|
||
elif perm_code and perm_code not in user_permissions:
|
||
# 无权限,跳过该条件
|
||
continue
|
||
# 对于聚合字段 (inventoryCount, availableCount),需要使用子查询别名
|
||
if isinstance(db_field, type(total_inv)):
|
||
column = db_field
|
||
else:
|
||
column = getattr(MaterialBase, db_field, None)
|
||
if column is None:
|
||
continue
|
||
# 处理操作符
|
||
# 对于数值型列(聚合字段)只支持 eq, ne, ge, le
|
||
if isinstance(column, type(total_inv)):
|
||
# 数值型列
|
||
try:
|
||
num_val = float(value)
|
||
except ValueError:
|
||
# 转换失败则跳过该条件
|
||
continue
|
||
if operator == 'eq':
|
||
filter_conditions.append(column == num_val)
|
||
elif operator == 'ne':
|
||
filter_conditions.append(column != num_val)
|
||
elif operator == 'ge':
|
||
filter_conditions.append(column >= num_val)
|
||
elif operator == 'le':
|
||
filter_conditions.append(column <= num_val)
|
||
# 对于 contains 操作符,数值型列不支持,忽略
|
||
else:
|
||
# 字符串型列
|
||
if operator == 'eq':
|
||
filter_conditions.append(column == value)
|
||
elif operator == 'ne':
|
||
filter_conditions.append(column != value)
|
||
elif operator == 'contains':
|
||
filter_conditions.append(column.ilike(f'%{value}%'))
|
||
elif operator == 'ge':
|
||
try:
|
||
num_val = float(value)
|
||
filter_conditions.append(column >= num_val)
|
||
except ValueError:
|
||
continue
|
||
elif operator == 'le':
|
||
try:
|
||
num_val = float(value)
|
||
filter_conditions.append(column <= num_val)
|
||
except ValueError:
|
||
continue
|
||
if filter_conditions:
|
||
query = query.filter(and_(*filter_conditions))
|
||
|
||
# 排序处理(支持全字段)
|
||
order_by_column = filters.get('orderByColumn', '')
|
||
is_asc = filters.get('isAsc', None)
|
||
if order_by_column:
|
||
# 字段映射
|
||
sort_field_map = {
|
||
'companyName': MaterialBase.company_name,
|
||
'name': MaterialBase.name,
|
||
'commonName': MaterialBase.common_name,
|
||
'category': MaterialBase.category,
|
||
'type': MaterialBase.material_type,
|
||
'spec': MaterialBase.spec_model,
|
||
'unit': MaterialBase.unit,
|
||
'inventoryCount': total_inv,
|
||
'availableCount': total_avail
|
||
}
|
||
sort_column = sort_field_map.get(order_by_column)
|
||
if sort_column is not None:
|
||
if is_asc == 'asc':
|
||
query = query.order_by(sort_column.asc())
|
||
elif is_asc == 'desc':
|
||
query = query.order_by(sort_column.desc())
|
||
else:
|
||
# 默认排序:优先按总库存数降序,当库存相同时,再按规格型号升序
|
||
query = query.order_by(total_inv.desc(), MaterialBase.spec_model.asc())
|
||
|
||
# 分页
|
||
pagination = query.paginate(page=page, per_page=limit, error_out=False)
|
||
|
||
items_list = []
|
||
for item, inv, avail in pagination.items:
|
||
item_dict = item.to_dict()
|
||
item_dict['inventoryCount'] = float(inv) if inv is not None else 0.0
|
||
item_dict['availableCount'] = float(avail) if avail is not None else 0.0
|
||
items_list.append(item_dict)
|
||
|
||
return {"total": pagination.total, "items": items_list}
|
||
|
||
except Exception as e:
|
||
traceback.print_exc()
|
||
print(f"查询基础信息列表失败: {e}")
|
||
return {"total": 0, "items": []}
|
||
|
||
@staticmethod
|
||
def get_distinct_options():
|
||
"""
|
||
获取所有已存在的类别、类型、公司 (去重且排序)
|
||
"""
|
||
try:
|
||
# 1. 类别 (获取后在内存或前端做层级处理,这里先按字母序返回扁平列表)
|
||
categories = db.session.query(MaterialBase.category) \
|
||
.filter(MaterialBase.category != None, MaterialBase.category != '') \
|
||
.distinct().all()
|
||
|
||
# 对类别进行排序
|
||
sorted_categories = sorted([c[0] for c in categories])
|
||
|
||
# 2. 类型
|
||
types = db.session.query(MaterialBase.material_type) \
|
||
.filter(MaterialBase.material_type != None, MaterialBase.material_type != '') \
|
||
.distinct().all()
|
||
sorted_types = sorted([t[0] for t in types])
|
||
|
||
# 3. 公司
|
||
companies = db.session.query(MaterialBase.company_name) \
|
||
.filter(MaterialBase.company_name != None, MaterialBase.company_name != '') \
|
||
.distinct().all()
|
||
sorted_companies = sorted([c[0] for c in companies])
|
||
|
||
return {
|
||
"categories": sorted_categories,
|
||
"types": sorted_types,
|
||
"companies": sorted_companies
|
||
}
|
||
except Exception as e:
|
||
traceback.print_exc()
|
||
return {"categories": [], "types": [], "companies": []}
|
||
|
||
@staticmethod
|
||
def create_material(data):
|
||
"""新增基础信息"""
|
||
try:
|
||
if not data.get('name') or not data.get('spec'):
|
||
raise ValueError("名称和规格型号不能为空")
|
||
|
||
exist = MaterialBase.query.filter_by(
|
||
name=data['name'],
|
||
spec_model=data['spec']
|
||
).first()
|
||
if exist:
|
||
raise ValueError(f"已存在相同名称和规格的数据 (ID: {exist.id})")
|
||
|
||
# 【核心修改】:兼容前端传来的布尔值
|
||
raw_enabled = data.get('isEnabled', True)
|
||
is_enabled_val = str(raw_enabled).lower() in ['1', 'true', 'yes', 't'] if raw_enabled is not None else True
|
||
|
||
new_material = MaterialBase(
|
||
company_name=data.get('companyName'),
|
||
name=data['name'],
|
||
common_name=data.get('commonName'),
|
||
spec_model=data['spec'],
|
||
category=data.get('category'),
|
||
material_type=data.get('type'),
|
||
unit=data.get('unit'),
|
||
visibility_level=data.get('visibilityLevel'),
|
||
manual_link=json.dumps(data.get('generalManual', [])),
|
||
product_image=json.dumps(data.get('generalImage', [])),
|
||
is_enabled=is_enabled_val
|
||
)
|
||
|
||
db.session.add(new_material)
|
||
db.session.commit()
|
||
return new_material
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
raise e
|
||
|
||
@staticmethod
|
||
def update_material(m_id, data):
|
||
"""修改基础信息"""
|
||
try:
|
||
material = MaterialBase.query.get(m_id)
|
||
if not material:
|
||
raise ValueError("数据不存在")
|
||
|
||
# 更新字段
|
||
if 'companyName' in data: material.company_name = data['companyName']
|
||
if 'name' in data: material.name = data['name']
|
||
if 'commonName' in data: material.common_name = data['commonName']
|
||
if 'spec' in data: material.spec_model = data['spec']
|
||
if 'category' in data: material.category = data['category']
|
||
if 'type' in data: material.material_type = data['type']
|
||
if 'unit' in data: material.unit = data['unit']
|
||
if 'visibilityLevel' in data: material.visibility_level = data['visibilityLevel']
|
||
|
||
if 'generalManual' in data:
|
||
material.manual_link = json.dumps(data['generalManual'])
|
||
if 'generalImage' in data:
|
||
material.product_image = json.dumps(data['generalImage'])
|
||
|
||
# 【核心修改】:兼容前端传来的布尔值
|
||
if 'isEnabled' in data:
|
||
raw_enabled = data['isEnabled']
|
||
material.is_enabled = str(raw_enabled).lower() in ['1', 'true', 'yes', 't']
|
||
|
||
db.session.commit()
|
||
return material
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
raise e
|
||
|
||
@staticmethod
|
||
def delete_material(m_id):
|
||
"""
|
||
删除基础信息 (带依赖检查)
|
||
"""
|
||
try:
|
||
material = MaterialBase.query.get(m_id)
|
||
if not material:
|
||
raise ValueError("数据不存在")
|
||
|
||
buy_usage_count = StockBuy.query.filter_by(base_id=m_id).count()
|
||
semi_usage_count = StockSemi.query.filter_by(base_id=m_id).count()
|
||
prod_usage_count = StockProduct.query.filter_by(base_id=m_id).count()
|
||
|
||
total_usage = buy_usage_count + semi_usage_count + prod_usage_count
|
||
|
||
if total_usage > 0:
|
||
raise ValueError(
|
||
f"无法删除:该基础物料正被使用中。\n"
|
||
f"- 采购库存记录: {buy_usage_count} 条\n"
|
||
f"- 半成品库存记录: {semi_usage_count} 条\n"
|
||
f"- 成品库存记录: {prod_usage_count} 条\n"
|
||
f"请先清理相关库存或仅‘禁用’此条目。"
|
||
)
|
||
|
||
db.session.delete(material)
|
||
db.session.commit()
|
||
return True
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
print(f"删除基础信息失败: {e}")
|
||
raise e
|
||
|
||
# ==============================================================================
|
||
# [核心修改] 统一资产统计导出(增加最高单价计算逻辑)
|
||
# ==============================================================================
|
||
@staticmethod
|
||
def export_excel(filters=None, user_permissions=None):
|
||
"""
|
||
全口径资产统计报表:
|
||
根据基础信息列表和库存表,计算出每个物料的最高历史单价并进行导出
|
||
"""
|
||
try:
|
||
# 1. 构造基础信息的筛选条件 (用于过滤库存)
|
||
filter_conditions = []
|
||
if filters:
|
||
if filters.get('keyword'):
|
||
kw = f"%{filters['keyword']}%"
|
||
filter_conditions.append(or_(
|
||
MaterialBase.name.ilike(kw),
|
||
MaterialBase.common_name.ilike(kw),
|
||
MaterialBase.spec_model.ilike(kw),
|
||
MaterialBase.company_name.ilike(kw)
|
||
))
|
||
company = filters.get('company')
|
||
if company is not None and company != '':
|
||
filter_conditions.append(MaterialBase.company_name.ilike(company.strip()))
|
||
category = filters.get('category')
|
||
if category is not None and category != '':
|
||
filter_conditions.append(MaterialBase.category.ilike(category.strip()))
|
||
type_val = filters.get('type')
|
||
if type_val is not None and type_val != '':
|
||
filter_conditions.append(MaterialBase.material_type.ilike(type_val.strip()))
|
||
|
||
# 【核心修改】:增强布尔值解析
|
||
if filters.get('isEnabled') is not None:
|
||
val_str = str(filters['isEnabled']).lower()
|
||
is_active = val_str in ['1', 'true', 'yes', 't']
|
||
filter_conditions.append(MaterialBase.is_enabled == is_active)
|
||
|
||
# 2. 分别查询三个库存表,并 Join MaterialBase 进行筛选
|
||
# 2.1 采购库存 (StockBuy)
|
||
query_buy = db.session.query(StockBuy, MaterialBase).join(
|
||
MaterialBase, StockBuy.base_id == MaterialBase.id
|
||
)
|
||
for cond in filter_conditions:
|
||
query_buy = query_buy.filter(cond)
|
||
list_buy = query_buy.all()
|
||
|
||
# 2.2 半成品库存 (StockSemi)
|
||
query_semi = db.session.query(StockSemi, MaterialBase).join(
|
||
MaterialBase, StockSemi.base_id == MaterialBase.id
|
||
)
|
||
for cond in filter_conditions:
|
||
query_semi = query_semi.filter(cond)
|
||
list_semi = query_semi.all()
|
||
|
||
# 2.3 成品库存 (StockProduct)
|
||
query_product = db.session.query(StockProduct, MaterialBase).join(
|
||
MaterialBase, StockProduct.base_id == MaterialBase.id
|
||
)
|
||
for cond in filter_conditions:
|
||
query_product = query_product.filter(cond)
|
||
list_product = query_product.all()
|
||
|
||
# ====================================================
|
||
# [核心新增] 预先计算每个 base_id 的全局最高历史单价
|
||
# 优先级:采购件 > 半成品 > 成品
|
||
# ====================================================
|
||
buy_max_prices = {}
|
||
for stock, base in list_buy:
|
||
price = float(stock.pre_tax_unit_price or 0)
|
||
if price > buy_max_prices.get(base.id, 0):
|
||
buy_max_prices[base.id] = price
|
||
|
||
semi_max_prices = {}
|
||
for stock, base in list_semi:
|
||
# 半成品的单价直接取自 manual_cost 字段(单件总成本)
|
||
price = float(stock.manual_cost or 0)
|
||
|
||
if price > semi_max_prices.get(base.id, 0):
|
||
semi_max_prices[base.id] = price
|
||
|
||
product_max_prices = {}
|
||
for stock, base in list_product:
|
||
# 成品的单价直接取自 manual_cost 字段(单件总成本)
|
||
price = float(stock.manual_cost or 0)
|
||
|
||
if price > product_max_prices.get(base.id, 0):
|
||
product_max_prices[base.id] = price
|
||
|
||
# 构造获取某个物料最高价的闭包函数
|
||
def get_highest_price(base_id):
|
||
if base_id in buy_max_prices and buy_max_prices[base_id] > 0:
|
||
return buy_max_prices[base_id]
|
||
if base_id in semi_max_prices and semi_max_prices[base_id] > 0:
|
||
return semi_max_prices[base_id]
|
||
if base_id in product_max_prices and product_max_prices[base_id] > 0:
|
||
return product_max_prices[base_id]
|
||
return 0.0
|
||
|
||
# 3. 数据整合
|
||
all_rows = []
|
||
|
||
# 处理采购件
|
||
for stock, base in list_buy:
|
||
qty = float(stock.stock_quantity or 0)
|
||
# 使用该物料的全局最高单价作为不含税单价
|
||
highest_excl_price = get_highest_price(base.id)
|
||
tax_rate = float(stock.tax_rate or 0)
|
||
|
||
# 计算含税单价和总额
|
||
highest_incl_price = highest_excl_price * (1 + tax_rate / 100.0)
|
||
total_val_excl = qty * highest_excl_price
|
||
total_val_incl = qty * highest_incl_price
|
||
|
||
ident = stock.batch_number or stock.serial_number or stock.barcode or stock.sku
|
||
|
||
all_rows.append({
|
||
"base": base,
|
||
"type_name": "采购件",
|
||
"ident": ident,
|
||
"loc": stock.warehouse_location,
|
||
"source": stock.supplier_name,
|
||
"date": stock.in_date,
|
||
"qty": qty,
|
||
"avail": float(stock.available_quantity or 0),
|
||
"price_excl": highest_excl_price,
|
||
"total_val_excl": total_val_excl,
|
||
"tax": tax_rate,
|
||
"price_incl": highest_incl_price,
|
||
"total_val": total_val_incl
|
||
})
|
||
|
||
# 处理半成品
|
||
for stock, base in list_semi:
|
||
qty = float(stock.stock_quantity or 0)
|
||
# 半成品的单价直接取自 manual_cost 字段(单件总成本)
|
||
unit_cost = float(stock.manual_cost or 0)
|
||
|
||
total_val_excl = qty * unit_cost
|
||
total_val_incl = qty * unit_cost # 半成品无税
|
||
|
||
ident = stock.batch_number or stock.serial_number or stock.barcode or stock.sku
|
||
|
||
all_rows.append({
|
||
"base": base,
|
||
"type_name": "半成品",
|
||
"ident": ident,
|
||
"loc": stock.warehouse_location,
|
||
"source": stock.production_manager,
|
||
"date": stock.production_date,
|
||
"qty": qty,
|
||
"avail": float(stock.available_quantity or 0),
|
||
"price_excl": unit_cost,
|
||
"total_val_excl": total_val_excl,
|
||
"tax": 0.0,
|
||
"price_incl": unit_cost,
|
||
"total_val": total_val_incl
|
||
})
|
||
|
||
# 处理成品
|
||
for stock, base in list_product:
|
||
qty = float(stock.stock_quantity or 0)
|
||
# 成品的单价直接取自 manual_cost 字段(单件总成本)
|
||
unit_cost = float(stock.manual_cost or 0)
|
||
|
||
total_val_excl = qty * unit_cost
|
||
total_val_incl = qty * unit_cost
|
||
|
||
ident = stock.serial_number or stock.barcode or stock.sku
|
||
|
||
all_rows.append({
|
||
"base": base,
|
||
"type_name": "成品",
|
||
"ident": ident,
|
||
"loc": stock.warehouse_location,
|
||
"source": stock.production_manager,
|
||
"date": stock.production_date,
|
||
"qty": qty,
|
||
"avail": float(stock.available_quantity or 0),
|
||
"price_excl": unit_cost,
|
||
"total_val_excl": total_val_excl,
|
||
"tax": 0.0,
|
||
"price_incl": unit_cost,
|
||
"total_val": total_val_incl
|
||
})
|
||
|
||
# 4. 排序:按公司 -> 规格型号 -> 基础ID -> 批号 排序
|
||
all_rows.sort(key=lambda x: (
|
||
x['base'].company_name or "",
|
||
x['base'].spec_model or "",
|
||
x['base'].id,
|
||
x['ident'] or ""
|
||
))
|
||
|
||
# 5. 生成 Excel
|
||
wb = Workbook()
|
||
ws = wb.active
|
||
ws.title = "库存统计"
|
||
|
||
# 表头 (严格对应你的图 5)
|
||
headers = [
|
||
"所属公司", "资产名称", "规格型号", "物料类型",
|
||
"类别一级", "类别二级", "类别三级", "类别四级", "类别五级",
|
||
"计量单位",
|
||
"库存性质", "唯一标识码 (批号/SN)", "仓库位置",
|
||
"资产来源", "入库/生产日期",
|
||
"库存数量", "可用数量",
|
||
"单价/成本 (不含税)", "资产总额 (不含税)", "税率 (%)", "单价/成本 (含税)", "资产总额 (含税)"
|
||
]
|
||
ws.append(headers)
|
||
|
||
# 确定各字段在表头中的列索引
|
||
col_idx = {}
|
||
for idx, header in enumerate(headers):
|
||
if header == "所属公司":
|
||
col_idx['companyName'] = idx
|
||
elif header == "资产名称":
|
||
col_idx['name'] = idx
|
||
elif header == "规格型号":
|
||
col_idx['spec'] = idx
|
||
elif header == "物料类型":
|
||
col_idx['type'] = idx
|
||
elif header in ("类别一级", "类别二级", "类别三级", "类别四级", "类别五级"):
|
||
col_idx.setdefault('category_cols', []).append(idx)
|
||
elif header == "计量单位":
|
||
col_idx['unit'] = idx
|
||
elif header == "库存数量":
|
||
col_idx['inventoryCount'] = idx
|
||
elif header == "可用数量":
|
||
col_idx['availableCount'] = idx
|
||
elif header == "单价/成本 (不含税)":
|
||
col_idx['price_excl'] = idx
|
||
elif header == "资产总额 (不含税)":
|
||
col_idx['total_val_excl'] = idx
|
||
elif header == "税率 (%)":
|
||
col_idx['tax'] = idx
|
||
elif header == "单价/成本 (含税)":
|
||
col_idx['price_incl'] = idx
|
||
elif header == "资产总额 (含税)":
|
||
col_idx['total_val'] = idx
|
||
|
||
# 样式
|
||
header_fill = PatternFill(start_color="D7E4BC", end_color="D7E4BC", fill_type="solid")
|
||
border_style = Border(left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'),
|
||
bottom=Side(style='thin'))
|
||
|
||
for cell in ws[1]:
|
||
cell.font = Font(bold=True, name='微软雅黑')
|
||
cell.alignment = Alignment(horizontal='center', vertical='center')
|
||
cell.fill = header_fill
|
||
cell.border = border_style
|
||
|
||
# 字段到权限码的映射
|
||
field_to_perm = {
|
||
'companyName': 'material_list:companyName',
|
||
'name': 'material_list:name',
|
||
'spec': 'material_list:spec',
|
||
'type': 'material_list:type',
|
||
'unit': 'material_list:unit',
|
||
'category': 'material_list:category',
|
||
'inventoryCount': 'material_list:inventoryCount',
|
||
'availableCount': 'material_list:availableCount'
|
||
}
|
||
|
||
# 写入数据,并脱敏
|
||
for r in all_rows:
|
||
base = r['base']
|
||
# 类别拆分
|
||
cat_parts = (base.category or "").split('/')
|
||
while len(cat_parts) < 5:
|
||
cat_parts.append("")
|
||
|
||
# 日期格式化
|
||
date_str = r['date'].strftime('%Y-%m-%d') if isinstance(r['date'], datetime.date) else ""
|
||
|
||
row_val = [
|
||
base.company_name,
|
||
base.name,
|
||
base.spec_model,
|
||
base.material_type,
|
||
cat_parts[0], cat_parts[1], cat_parts[2], cat_parts[3], cat_parts[4],
|
||
base.unit,
|
||
r['type_name'],
|
||
r['ident'],
|
||
r['loc'],
|
||
r['source'],
|
||
date_str,
|
||
r['qty'],
|
||
r['avail'],
|
||
r['price_excl'],
|
||
r['total_val_excl'],
|
||
r['tax'],
|
||
r['price_incl'],
|
||
r['total_val']
|
||
]
|
||
|
||
# 根据用户权限脱敏
|
||
if user_permissions is not None:
|
||
for field, perm_code in field_to_perm.items():
|
||
if perm_code not in user_permissions:
|
||
if field == 'category':
|
||
for cat_idx in col_idx.get('category_cols', []):
|
||
row_val[cat_idx] = ''
|
||
elif field in col_idx:
|
||
row_val[col_idx[field]] = ''
|
||
|
||
# 联动脱敏:根据数据来源,校验对应模块的价格/成本权限
|
||
if user_permissions is not None:
|
||
# 超级管理员拥有所有权限,跳过价格脱敏
|
||
if 'material_list:*' in user_permissions:
|
||
# 拥有通配符权限,不隐藏价格列
|
||
pass
|
||
else:
|
||
has_price_perm = True
|
||
row_type = r['type_name']
|
||
|
||
# 根据数据来源检查对应模块的权限
|
||
if row_type == '采购件':
|
||
# 校验采购模块的价格权限
|
||
has_price_perm = any(p in user_permissions for p in
|
||
['inbound_buy:postTaxUnitPrice', 'inbound_buy:preTaxUnitPrice',
|
||
'inbound_buy:totalAmount'])
|
||
elif row_type == '半成品':
|
||
# 校验半成品模块的成本权限
|
||
has_price_perm = any(p in user_permissions for p in
|
||
['inbound_semi:rawMaterialCost', 'inbound_semi:manualCost'])
|
||
elif row_type == '成品':
|
||
# 校验成品模块的成本权限
|
||
has_price_perm = any(p in user_permissions for p in
|
||
['inbound_product:rawMaterialCost', 'inbound_product:manualCost'])
|
||
else:
|
||
# 未知类型,默认隐藏价格列
|
||
has_price_perm = False
|
||
|
||
# 如果没有对应模块的价格查看权限,则清空涉密的5个列
|
||
if not has_price_perm:
|
||
for p_col in ['price_excl', 'total_val_excl', 'tax', 'price_incl', 'total_val']:
|
||
if p_col in col_idx:
|
||
row_val[col_idx[p_col]] = ''
|
||
|
||
ws.append(row_val)
|
||
|
||
# 列宽调整
|
||
dims = {}
|
||
for row in ws.rows:
|
||
for cell in row:
|
||
if cell.value:
|
||
dims[cell.column_letter] = max((dims.get(cell.column_letter, 0), len(str(cell.value))))
|
||
for col, value in dims.items():
|
||
ws.column_dimensions[col].width = min(value + 2, 30)
|
||
|
||
output = io.BytesIO()
|
||
wb.save(output)
|
||
output.seek(0)
|
||
return output
|
||
|
||
except Exception as e:
|
||
traceback.print_exc()
|
||
raise e |