197 lines
7.0 KiB
Python
197 lines
7.0 KiB
Python
from app.extensions import db
|
||
from app.models.bom import BomTable
|
||
from app.models.base import MaterialBase
|
||
from app.models.inbound.buy import StockBuy
|
||
from app.models.inbound.semi import StockSemi
|
||
from app.models.inbound.product import StockProduct
|
||
from sqlalchemy import func
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _get_available_qty(material_type: str, base_id: int) -> float:
|
||
"""
|
||
根据 material_type 查询对应库存表的 available_quantity。
|
||
若查不到记录或无可用量,返回 0.0。
|
||
"""
|
||
qty = 0.0
|
||
if material_type == 'product':
|
||
row = db.session.query(
|
||
func.coalesce(func.sum(StockProduct.available_quantity), 0)
|
||
).filter(StockProduct.base_id == base_id).scalar()
|
||
qty = float(row) if row else 0.0
|
||
elif material_type == 'semi':
|
||
row = db.session.query(
|
||
func.coalesce(func.sum(StockSemi.available_quantity), 0)
|
||
).filter(StockSemi.base_id == base_id).scalar()
|
||
qty = float(row) if row else 0.0
|
||
elif material_type == 'buy':
|
||
row = db.session.query(
|
||
func.coalesce(func.sum(StockBuy.available_quantity), 0)
|
||
).filter(StockBuy.base_id == base_id).scalar()
|
||
qty = float(row) if row else 0.0
|
||
else:
|
||
qty = 0.0
|
||
return qty
|
||
|
||
|
||
def _cascade_allocate(
|
||
db_session,
|
||
material_id: int,
|
||
required_qty: float,
|
||
result_list: list,
|
||
) -> None:
|
||
"""
|
||
递归级联扣减领料核心函数。
|
||
|
||
算法:
|
||
1. 根据 material_id 查出 MaterialBase 获取 material_type
|
||
2. 查对应库存表 available_quantity
|
||
3. 计算 allocated_qty = min(可用库存, required_qty)
|
||
4. shortage_qty = required_qty - allocated_qty
|
||
5. 将结果追加到 result_list
|
||
6. 若 shortage_qty > 0,递归查 BomTable 展开子件,
|
||
对每个子件用 shortage_qty * dosage 继续分配
|
||
|
||
Args:
|
||
db_session: SQLAlchemy db session
|
||
material_id: 当前物料 ID
|
||
required_qty: 当前层级需求数量
|
||
result_list: 收集结果的列表(就地修改)
|
||
"""
|
||
if required_qty <= 0:
|
||
return
|
||
|
||
material = db_session.query(MaterialBase).filter_by(id=material_id).first()
|
||
if not material:
|
||
logger.warning(f"[Cascade] 物料 {material_id} 在 material_base 中不存在,跳过")
|
||
return
|
||
|
||
mat_type = material.material_type or ''
|
||
available = _get_available_qty(mat_type, material_id)
|
||
|
||
allocated_qty = min(available, required_qty)
|
||
shortage_qty = required_qty - allocated_qty
|
||
|
||
result_list.append({
|
||
'material_id': material_id,
|
||
'material_name': material.name,
|
||
'material_type': mat_type,
|
||
'spec_model': material.spec_model or '',
|
||
'unit': material.unit or '',
|
||
'required_qty': required_qty,
|
||
'available_qty': available,
|
||
'allocated_qty': allocated_qty,
|
||
'shortage_qty': shortage_qty,
|
||
})
|
||
|
||
if shortage_qty > 0:
|
||
child_rows = db_session.query(BomTable).filter(
|
||
BomTable.parent_id == material_id,
|
||
BomTable.is_enabled == True,
|
||
).all()
|
||
|
||
for row in child_rows:
|
||
child_id = row.child_id
|
||
dosage = float(row.dosage) if row.dosage else 0.0
|
||
if dosage <= 0:
|
||
logger.warning(
|
||
f"[Cascade] 子件 child_id={child_id} dosage={dosage} 无效,跳过"
|
||
)
|
||
continue
|
||
|
||
child_required_qty = shortage_qty * dosage
|
||
_cascade_allocate(db_session, child_id, child_required_qty, result_list)
|
||
|
||
|
||
class CascadeService:
|
||
|
||
@staticmethod
|
||
def compute_cascade_result(bom_no: str, order_qty: float, version: str = None) -> dict:
|
||
"""
|
||
根据 bom_no 和订单总量执行级联扣减领料,返回完整的分层结果。
|
||
|
||
Args:
|
||
bom_no: BOM 编号
|
||
order_qty: 订单需求总量(顶层成品需求数量)
|
||
version: BOM 版本(可选,默认取最新)
|
||
|
||
Returns:
|
||
{
|
||
"bom_no": str,
|
||
"version": str,
|
||
"parent_id": int,
|
||
"parent_name": str,
|
||
"order_qty": float,
|
||
"total_allocated": float, # 累计已分配量(用于评估满足率)
|
||
"total_shortage": float, # 最终缺口(顶层 shortage_qty)
|
||
"items": [
|
||
{
|
||
"level": int, # 递归层级(0 为顶层)
|
||
"material_id": int,
|
||
"material_name": str,
|
||
"material_type": str,
|
||
"spec_model": str,
|
||
"unit": str,
|
||
"required_qty": float,
|
||
"available_qty": float,
|
||
"allocated_qty": float,
|
||
"shortage_qty": float,
|
||
},
|
||
...
|
||
]
|
||
}
|
||
"""
|
||
from app.services.bom_service import BomService
|
||
|
||
detail = BomService.get_bom_detail(bom_no, version=version)
|
||
if not detail:
|
||
raise ValueError(f"BOM {bom_no} 不存在")
|
||
|
||
parent_id = detail['parent_id']
|
||
parent_name = detail.get('parent_name', '')
|
||
bom_version = detail.get('version', 'V1.0')
|
||
|
||
result_list: list = []
|
||
_cascade_allocate(db.session, parent_id, order_qty, result_list)
|
||
|
||
# 汇总顶层成品allocated和shortage
|
||
parent_item = next(
|
||
(it for it in result_list if it['material_id'] == parent_id), None
|
||
)
|
||
total_allocated = parent_item['allocated_qty'] if parent_item else 0.0
|
||
total_shortage = parent_item['shortage_qty'] if parent_item else order_qty
|
||
|
||
# 按 material_id 合并去重(同一物料被多个父件引用时合并 allocated_qty)
|
||
merged_map: dict = {}
|
||
for it in result_list:
|
||
mid = it['material_id']
|
||
if mid not in merged_map:
|
||
merged_map[mid] = {
|
||
'material_id': it['material_id'],
|
||
'material_name': it['material_name'],
|
||
'material_type': it['material_type'],
|
||
'spec_model': it['spec_model'],
|
||
'unit': it['unit'],
|
||
'required_qty': 0.0,
|
||
'available_qty': it['available_qty'],
|
||
'allocated_qty': 0.0,
|
||
'shortage_qty': 0.0,
|
||
}
|
||
merged_map[mid]['required_qty'] += it['required_qty']
|
||
merged_map[mid]['allocated_qty'] += it['allocated_qty']
|
||
merged_map[mid]['shortage_qty'] += it['shortage_qty']
|
||
|
||
merged_items = list(merged_map.values())
|
||
|
||
return {
|
||
'bom_no': bom_no,
|
||
'version': bom_version,
|
||
'parent_id': parent_id,
|
||
'parent_name': parent_name,
|
||
'order_qty': order_qty,
|
||
'total_allocated': total_allocated,
|
||
'total_shortage': total_shortage,
|
||
'items': merged_items,
|
||
} |