import logging
from typing import Optional

from sqlalchemy import func

from app.extensions import db
from app.models.bom import BomTable
from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct

logger = logging.getLogger(__name__)

# Maps a MaterialBase.material_type value to the stock model whose
# available_quantity column holds the inventory for that type.
_STOCK_MODELS = {
    'product': StockProduct,
    'semi': StockSemi,
    'buy': StockBuy,
}


def _get_available_qty(material_type: str, base_id: int, db_session=None) -> float:
    """Return the summed ``available_quantity`` for a material.

    The stock table is chosen from ``material_type`` ('product', 'semi' or
    'buy'); rows are matched on ``base_id``.

    Args:
        material_type: Material type used to pick the stock table.
        base_id: Value matched against the stock table's ``base_id`` column.
        db_session: Session to query with. Defaults to ``db.session`` so
            existing two-argument callers keep working.

    Returns:
        The summed available quantity as a float, or 0.0 when the type is
        unknown, no rows match, or the sum is zero/NULL.
    """
    stock_model = _STOCK_MODELS.get(material_type)
    if stock_model is None:
        return 0.0

    session = db.session if db_session is None else db_session
    total = session.query(
        func.coalesce(func.sum(stock_model.available_quantity), 0)
    ).filter(stock_model.base_id == base_id).scalar()
    return float(total) if total else 0.0


def _cascade_allocate(
    db_session,
    material_id: int,
    required_qty: float,
    result_list: list,
    level: int = 0,
    _consumed: Optional[dict] = None,
    _ancestors: Optional[set] = None,
) -> None:
    """Recursively allocate stock for a material, cascading shortages to children.

    Algorithm:
        1. Load the MaterialBase row for ``material_id`` to get its type.
        2. Query the matching stock table for the available quantity and
           subtract what this run has already allocated from that material.
        3. ``allocated_qty = min(remaining available, required_qty)``.
        4. ``shortage_qty = required_qty - allocated_qty``.
        5. Append the outcome to ``result_list``.
        6. If ``shortage_qty > 0``, expand the enabled BomTable children and
           recurse on each with ``shortage_qty * dosage``.

    Args:
        db_session: SQLAlchemy session used for every query.
        material_id: ID of the material handled at this level.
        required_qty: Quantity required at this level; values <= 0 are a no-op.
        result_list: List collecting one dict per visited node (mutated in
            place).
        level: Recursion depth recorded on the result entry (0 = top level).
        _consumed: Internal. Maps material_id to the quantity already
            allocated during this run, so a material reachable through
            several parents is never allocated beyond its stock.
        _ancestors: Internal. IDs on the current recursion path, used to
            stop on cyclic BOM definitions instead of recursing forever.
    """
    if required_qty <= 0:
        return

    if _consumed is None:
        _consumed = {}
    if _ancestors is None:
        _ancestors = set()

    if material_id in _ancestors:
        # A BOM that (indirectly) contains itself would otherwise recurse
        # until RecursionError; skip the back-edge.
        logger.warning(f"[Cascade] 检测到 BOM 循环引用 material_id={material_id},跳过")
        return

    material = db_session.query(MaterialBase).filter_by(id=material_id).first()
    if not material:
        logger.warning(f"[Cascade] 物料 {material_id} 在 material_base 中不存在,跳过")
        return

    mat_type = material.material_type or ''
    stock_qty = _get_available_qty(mat_type, material_id, db_session)
    already_allocated = _consumed.get(material_id, 0.0)
    # Clamp at zero so a negative stock balance cannot yield a negative
    # allocation (which would inflate the shortage passed to children).
    available = max(stock_qty - already_allocated, 0.0)
    allocated_qty = min(available, required_qty)
    shortage_qty = required_qty - allocated_qty
    _consumed[material_id] = already_allocated + allocated_qty

    result_list.append({
        'level': level,
        'material_id': material_id,
        'material_name': material.name,
        'material_type': mat_type,
        'spec_model': material.spec_model or '',
        'unit': material.unit or '',
        'required_qty': required_qty,
        'available_qty': available,
        'allocated_qty': allocated_qty,
        'shortage_qty': shortage_qty,
    })

    if shortage_qty <= 0:
        return

    # NOTE(review): children are looked up by parent_id and is_enabled only;
    # no BOM number/version filter is applied here -- confirm that is intended.
    child_rows = db_session.query(BomTable).filter(
        BomTable.parent_id == material_id,
        BomTable.is_enabled == True,  # noqa: E712 - SQLAlchemy column expression
    ).all()

    _ancestors.add(material_id)
    try:
        for row in child_rows:
            child_id = row.child_id
            dosage = float(row.dosage) if row.dosage else 0.0
            if dosage <= 0:
                logger.warning(
                    f"[Cascade] 子件 child_id={child_id} dosage={dosage} 无效,跳过"
                )
                continue
            _cascade_allocate(
                db_session,
                child_id,
                shortage_qty * dosage,
                result_list,
                level + 1,
                _consumed,
                _ancestors,
            )
    finally:
        # Only the current path counts as ancestry; siblings may legitimately
        # reference this material again.
        _ancestors.discard(material_id)


class CascadeService:
    """Service entry point for cascade stock allocation over a BOM."""

    @staticmethod
    def compute_cascade_result(
        bom_no: str,
        order_qty: float,
        version: Optional[str] = None,
    ) -> dict:
        """Run the cascade allocation for a BOM and return the merged result.

        Args:
            bom_no: BOM number.
            order_qty: Required quantity of the top-level material.
            version: BOM version, passed through to
                ``BomService.get_bom_detail`` (presumably the latest version
                is used when omitted -- confirm against BomService).

        Returns:
            A dict of the form::

                {
                    "bom_no": str,
                    "version": str,
                    "parent_id": int,
                    "parent_name": str,
                    "order_qty": float,
                    "total_allocated": float,  # allocated_qty of the top-level item
                    "total_shortage": float,   # shortage_qty of the top-level item
                    "items": [
                        {
                            "level": int,  # shallowest depth seen (0 = top level)
                            "material_id": int,
                            "material_name": str,
                            "material_type": str,
                            "spec_model": str,
                            "unit": str,
                            "required_qty": float,
                            "available_qty": float,
                            "allocated_qty": float,
                            "shortage_qty": float,
                        },
                        ...
                    ]
                }

            ``items`` holds one entry per distinct material; quantities of a
            material reached through several parents are summed.

        Raises:
            ValueError: If ``BomService.get_bom_detail`` returns nothing for
                ``bom_no``.
        """
        # Imported at call time rather than module import time (presumably to
        # avoid a circular import -- confirm).
        from app.services.bom_service import BomService

        detail = BomService.get_bom_detail(bom_no, version=version)
        if not detail:
            raise ValueError(f"BOM {bom_no} 不存在")

        parent_id = detail['parent_id']
        parent_name = detail.get('parent_name', '')
        bom_version = detail.get('version', 'V1.0')

        result_list: list = []
        _cascade_allocate(db.session, parent_id, order_qty, result_list)

        # Totals come from the top-level material's own entry. When it was
        # skipped (missing material or non-positive order_qty) nothing was
        # allocated and the whole order quantity counts as shortage.
        parent_item = next(
            (it for it in result_list if it['material_id'] == parent_id),
            None,
        )
        total_allocated = parent_item['allocated_qty'] if parent_item else 0.0
        total_shortage = parent_item['shortage_qty'] if parent_item else order_qty

        # Merge entries by material_id: a material referenced by several
        # parents appears once with its quantities summed.
        merged_map: dict = {}
        for it in result_list:
            mid = it['material_id']
            entry = merged_map.get(mid)
            if entry is None:
                entry = merged_map[mid] = {
                    'level': it['level'],
                    'material_id': mid,
                    'material_name': it['material_name'],
                    'material_type': it['material_type'],
                    'spec_model': it['spec_model'],
                    'unit': it['unit'],
                    'required_qty': 0.0,
                    # First visit sees the stock before any allocation in
                    # this run, i.e. the material's total availability.
                    'available_qty': it['available_qty'],
                    'allocated_qty': 0.0,
                    'shortage_qty': 0.0,
                }
            entry['level'] = min(entry['level'], it['level'])
            entry['required_qty'] += it['required_qty']
            entry['allocated_qty'] += it['allocated_qty']
            entry['shortage_qty'] += it['shortage_qty']

        return {
            'bom_no': bom_no,
            'version': bom_version,
            'parent_id': parent_id,
            'parent_name': parent_name,
            'order_qty': order_qty,
            'total_allocated': total_allocated,
            'total_shortage': total_shortage,
            'items': list(merged_map.values()),
        }