feat: implement MRP kitting calculator for production simulation and shared component analysis

This commit is contained in:
DXC
2026-03-24 09:10:56 +08:00
parent 5fe645dc0b
commit 706d7e551c
5 changed files with 665 additions and 0 deletions

View File

@ -2,6 +2,8 @@ from app.extensions import db
from app.models.bom import BomTable
from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
from sqlalchemy import func, distinct, or_, case
import uuid
from datetime import datetime
@ -248,6 +250,117 @@ class BomService:
return detail
# ====================== MRP 齐套模拟计算 ======================
@staticmethod
def calculate_kitting(entries: list) -> list:
"""
MRP 齐套模拟计算
算法步骤:
1. 遍历传入的 BOM取每个 BOM 最新版本的子件
2. 按子件 base_id 合并需求量:需求量 = dosage * (1 + loss_rate/100) * target_qty
3. 跨 StockBuy / StockSemi / StockProduct 聚合可用库存
4. 计算缺口shortage = available_quantity - required_quantity
:param entries: [{"bom_no": "BOM-001", "target_qty": 10}, ...]
:return: [{base_id, name, spec, unit, required_qty, available_qty, shortage, children: [...]}, ...]
"""
# Step 1: 展开所有 BOM 的子件,聚合需求量
demand_map = {} # child_id -> {base_id, material_name, spec, unit, required_qty, bom_sources: []}
for entry in entries:
bom_no = entry.get('bom_no')
target_qty = float(entry.get('target_qty', 0) or 0)
if not bom_no or target_qty <= 0:
continue
# 取最新版本
latest_version = db.session.query(
BomTable.version
).filter_by(
bom_no=bom_no
).order_by(
BomTable.version.desc()
).limit(1).scalar()
if not latest_version:
continue
# 查询该 BOM 所有子件
rows = db.session.query(
BomTable, MaterialBase.name, MaterialBase.spec_model, MaterialBase.unit
).join(
MaterialBase, BomTable.child_id == MaterialBase.id
).filter(
BomTable.bom_no == bom_no,
BomTable.version == latest_version,
BomTable.is_enabled == True
).all()
for bom, child_name, child_spec, child_unit in rows:
dosage = float(bom.dosage or 0)
loss_rate = float(bom.loss_rate or 0)
adj_dosage = dosage * (1 + loss_rate / 100.0)
qty_needed = adj_dosage * target_qty
if bom.child_id not in demand_map:
demand_map[bom.child_id] = {
'base_id': bom.child_id,
'material_name': child_name or '',
'spec': child_spec or '',
'unit': child_unit or '',
'required_qty': 0.0,
'bom_sources': []
}
demand_map[bom.child_id]['required_qty'] += qty_needed
demand_map[bom.child_id]['bom_sources'].append({
'bom_no': bom_no,
'dosage': dosage,
'loss_rate': loss_rate,
'target_qty': target_qty
})
# Step 2: 批量查询三张库存表的可用库存
child_ids = list(demand_map.keys())
if not child_ids:
return []
# StockBuy.available_quantity, StockSemi.available_quantity, StockProduct.available_quantity
available_map = {cid: 0.0 for cid in child_ids}
for model_cls in (StockBuy, StockSemi, StockProduct):
if model_cls is None:
continue
rows = db.session.query(
model_cls.base_id,
func.coalesce(model_cls.available_quantity, 0)
).filter(
model_cls.base_id.in_(child_ids)
).all()
for base_id, qty in rows:
if base_id in available_map:
available_map[base_id] += float(qty)
# Step 3: 构造结果,计算缺口
results = []
for base_id, info in demand_map.items():
avail = available_map.get(base_id, 0.0)
shortage = avail - info['required_qty']
results.append({
'base_id': base_id,
'material_name': info['material_name'],
'spec': info['spec'],
'unit': info['unit'],
'required_qty': round(info['required_qty'], 4),
'available_qty': round(avail, 4),
'shortage': round(shortage, 4),
'bom_sources': info['bom_sources']
})
# 按缺件数量降序排列(最缺的排前面)
results.sort(key=lambda x: x['shortage'])
return results
# ====================== 兼容旧接口 ======================
@staticmethod
def get_bom_no_by_parent(parent_id):