Files
KCGL/inventory-backend/app/services/bom_service.py

446 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import uuid
from datetime import datetime

from sqlalchemy import func, distinct, or_, case

from app.extensions import db
from app.models.bom import BomTable
from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
class BomService:
    """Service layer for bill-of-materials (BOM) recipes stored in ``BomTable``.

    A recipe is addressed by ``bom_no`` plus ``version``; each ``BomTable``
    row links one parent material to one child material with a dosage.
    All methods are static and work on the shared ``db.session``.
    """

    # ====================== Current BOM logic (keyed by bom_no) ======================

    @staticmethod
    def generate_bom_no():
        """Generate a unique BOM number (used as the default fallback).

        Returns:
            str: ``BOM-<YYYYmmddHHMMSS>-<first 8 hex digits of a uuid4>``.
        """
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        unique = uuid.uuid4().hex[:8]
        return f'BOM-{timestamp}-{unique}'

    @staticmethod
    def _version_sort_key(version):
        """Return a natural-order sort key for a version label.

        Digit runs compare numerically, so ``'V1.10'`` ranks above ``'V1.9'``
        (plain string comparison ranks it below). ``None`` is treated as an
        empty label.
        """
        # re.split with a capture group alternates text / digits, so every odd
        # index is a digit run; equal positions therefore always hold the same
        # type and the tuples stay comparable.
        tokens = re.split(r'(\d+)', str(version or ''))
        return tuple(int(tok) if idx % 2 else tok for idx, tok in enumerate(tokens))

    @staticmethod
    def _latest_version(bom_no):
        """Return the highest non-empty version stored for ``bom_no``, or None."""
        rows = db.session.query(BomTable.version).filter(
            BomTable.bom_no == bom_no
        ).distinct().all()
        versions = [ver for (ver,) in rows if ver]
        if not versions:
            return None
        return max(versions, key=BomService._version_sort_key)

    @staticmethod
    def _dosage_key(dosage):
        """Normalise a dosage for equality checks between BOM versions.

        Falsy values (None, '', 0) map to 0.0. Rounding to 6 decimals absorbs
        float/Decimal representation noise without discarding the fractional
        part. NOTE(review): 6 decimals is an assumption; confirm it is at
        least as fine as the scale of the ``dosage`` column.
        """
        return round(float(dosage), 6) if dosage else 0.0

    @staticmethod
    def _sum_available(model_cls, base_ids):
        """Sum ``available_quantity`` per ``base_id`` for one stock model.

        Returns:
            dict: ``{base_id: float total}`` for ids that have at least one row.
        """
        if not base_ids:
            return {}
        rows = db.session.query(
            model_cls.base_id,
            func.coalesce(func.sum(model_cls.available_quantity), 0)
        ).filter(
            model_cls.base_id.in_(base_ids)
        ).group_by(
            model_cls.base_id
        ).all()
        return {base_id: float(qty or 0) for base_id, qty in rows}

    @staticmethod
    def get_bom_list(keyword=None, active_only=False):
        """List all BOM recipes, one entry per (bom_no, version).

        Args:
            keyword: optional fuzzy filter matched (case-insensitively) against
                the BOM number and the name/spec of the parent or any child.
            active_only: when true, only rows with ``is_enabled`` set are
                considered.

        Returns:
            list[dict]: entries with ``bom_no``, ``version``, ``parent_id``,
            ``parent_name``, ``parent_spec``, ``is_enabled`` and
            ``child_count``, sorted descending by BOM number, then version.
        """
        # 1. Find the (bom_no, version) pairs that satisfy the filters.
        pair_query = db.session.query(
            BomTable.bom_no,
            BomTable.version
        ).join(
            MaterialBase, BomTable.parent_id == MaterialBase.id
        )
        if active_only:
            pair_query = pair_query.filter(BomTable.is_enabled.is_(True))
        if keyword:
            pattern = f'%{keyword}%'
            # Join the material table a second time so children are searchable.
            child_alias = db.aliased(MaterialBase)
            pair_query = pair_query.outerjoin(
                child_alias, BomTable.child_id == child_alias.id
            ).filter(
                or_(
                    BomTable.bom_no.ilike(pattern),
                    MaterialBase.name.ilike(pattern),
                    MaterialBase.spec_model.ilike(pattern),
                    child_alias.name.ilike(pattern),
                    child_alias.spec_model.ilike(pattern)
                )
            )
        target_pairs = pair_query.distinct().all()
        if not target_pairs:
            return []

        # 2. Aggregate the header information for every pair.
        results = []
        for bom_no, version in target_pairs:
            summary = db.session.query(
                BomTable.parent_id,
                MaterialBase.name.label('parent_name'),
                MaterialBase.spec_model.label('parent_spec'),
                BomTable.is_enabled,
                func.count(BomTable.child_id).label('child_count')
            ).join(
                MaterialBase, BomTable.parent_id == MaterialBase.id
            ).filter(
                BomTable.bom_no == bom_no,
                BomTable.version == version
            ).group_by(
                BomTable.parent_id, MaterialBase.name, MaterialBase.spec_model, BomTable.is_enabled
            ).first()
            if not summary:
                continue
            results.append({
                'bom_no': bom_no,
                'version': version,
                'parent_id': summary.parent_id,
                'parent_name': summary.parent_name,
                'parent_spec': summary.parent_spec or '',
                'is_enabled': summary.is_enabled,
                'child_count': summary.child_count
            })

        # Natural version order ('V1.10' after 'V1.9'); also safe when a
        # version or BOM number is NULL.
        results.sort(
            key=lambda item: (item['bom_no'] or '', BomService._version_sort_key(item['version'])),
            reverse=True
        )
        return results

    @staticmethod
    def get_bom_detail(bom_no, version=None):
        """Return one recipe with its children.

        Args:
            bom_no: BOM number to look up.
            version: version label; when falsy the latest stored version
                (natural ordering) is used.

        Returns:
            dict | None: ``bom_no``, ``version``, ``parent_id``,
            ``parent_name``, ``parent_spec``, ``is_enabled`` and ``children``
            (list of ``child_id``/``child_name``/``child_spec``/``dosage``/
            ``remark``); None when nothing matches.
        """
        if not version:
            version = BomService._latest_version(bom_no)
            if not version:
                return None

        rows = db.session.query(
            BomTable,
            MaterialBase.name.label('child_name'),
            MaterialBase.spec_model.label('child_spec')
        ).join(
            MaterialBase, BomTable.child_id == MaterialBase.id
        ).filter(
            BomTable.bom_no == bom_no,
            BomTable.version == version
        ).all()
        if not rows:
            return None

        # Header fields are taken from the first row of the version.
        head = rows[0].BomTable
        parent_material = MaterialBase.query.get(head.parent_id)
        children = [
            {
                'child_id': bom.child_id,
                'child_name': child_name,
                'child_spec': child_spec or '',
                'dosage': float(bom.dosage) if bom.dosage else 0.0,
                'remark': bom.remark or ''
            }
            for bom, child_name, child_spec in rows
        ]
        return {
            'bom_no': bom_no,
            'version': head.version,
            'parent_id': head.parent_id,
            'parent_name': parent_material.name if parent_material else '',
            # Coalesce to '' like every other spec field returned by this service.
            'parent_spec': (parent_material.spec_model or '') if parent_material else '',
            'is_enabled': head.is_enabled,
            'children': children
        }

    @staticmethod
    def save_bom(data):
        """Save one BOM version, rejecting content that duplicates another version.

        Args:
            data: dict with ``parent_id`` and ``children`` (required),
                ``bom_no`` (must be non-empty), and optional ``version``
                (default ``'V1.0'``) and ``is_enabled`` (default True). Each
                child is a dict with ``child_id`` and optional ``dosage`` /
                ``remark``.

        Returns:
            The saved ``bom_no``.

        Raises:
            KeyError: ``parent_id`` or ``children`` is missing.
            ValueError: empty BOM number, a child equal to the parent, or the
                child set is identical to another version of the same BOM.
        """
        bom_no = data.get('bom_no')
        version = data.get('version', 'V1.0')
        parent_id = data['parent_id']
        children = data['children']
        is_enabled = data.get('is_enabled', True)

        if not bom_no:
            raise ValueError('BOM编号不能为空')
        if any(child['child_id'] == parent_id for child in children):
            raise ValueError('父件与子件不能是同一物料')

        # ===== Cross-version duplicate check =====
        # Each version is reduced to a set of (child_id, normalised dosage).
        # Dosage keeps its fractional part so that 1.2 and 1.8 are different.
        submitted = {
            (child['child_id'], BomService._dosage_key(child.get('dosage')))
            for child in children
        }
        other_rows = db.session.query(
            BomTable.version,
            BomTable.child_id,
            BomTable.dosage
        ).filter(
            BomTable.bom_no == bom_no,
            BomTable.version != version  # the version being saved is replaced, not compared
        ).all()
        by_version = {}
        for ver, child_id, dosage in other_rows:
            by_version.setdefault(ver, set()).add((child_id, BomService._dosage_key(dosage)))
        for ver, existing in by_version.items():
            if submitted == existing:
                raise ValueError(f'保存失败!当前子件配置与已有版本 {ver} 完全一致,请勿重复保存')

        # ===== Persist =====
        # Only rows of this version are replaced; roll back on any failure so
        # the session is not left with a half-applied delete/insert.
        try:
            BomTable.query.filter_by(bom_no=bom_no, version=version).delete()
            for child in children:
                db.session.add(BomTable(
                    bom_no=bom_no,
                    version=version,
                    parent_id=parent_id,
                    child_id=child['child_id'],
                    dosage=child.get('dosage', 0),
                    remark=child.get('remark', ''),
                    is_enabled=is_enabled
                ))
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise
        return bom_no

    @staticmethod
    def get_bom_with_stock_by_bom_no(bom_no):
        """Return the latest recipe detail enriched with stock figures.

        Every child gains ``current_stock`` (total available quantity),
        ``warehouse_location`` (distinct non-empty locations holding stock,
        joined with ``', '``) and ``max_producible`` (floor of stock / dosage,
        0 when dosage is not positive).

        Only ``StockBuy`` is consulted here; semi-finished and finished stock
        would need the same queries against their own tables.

        Returns:
            dict | None: the detail dict, or None when the BOM is unknown.
        """
        detail = BomService.get_bom_detail(bom_no)
        if not detail:
            return None

        children = detail['children']
        child_ids = [child['child_id'] for child in children]

        # One grouped query for totals and one for locations, instead of two
        # queries per child.
        stock_by_id = BomService._sum_available(StockBuy, child_ids)
        locations_by_id = {}
        if child_ids:
            location_rows = db.session.query(
                StockBuy.base_id,
                # string_agg is PostgreSQL-specific; DISTINCT removes repeated locations.
                func.string_agg(distinct(StockBuy.warehouse_location), ', ')
            ).filter(
                StockBuy.base_id.in_(child_ids),
                StockBuy.available_quantity > 0,  # only locations that still hold stock
                StockBuy.warehouse_location.isnot(None),
                StockBuy.warehouse_location != ''
            ).group_by(
                StockBuy.base_id
            ).all()
            locations_by_id = dict(location_rows)

        for child in children:
            child_id = child['child_id']
            # Totals are already floats: a Decimal from SUM() cannot be
            # floor-divided by the float dosage.
            stock = stock_by_id.get(child_id, 0.0)
            dosage = child['dosage']
            child['current_stock'] = stock
            child['warehouse_location'] = locations_by_id.get(child_id) or ''
            child['max_producible'] = int(stock // dosage) if dosage > 0 else 0
        return detail

    # ====================== MRP kitting simulation ======================

    @staticmethod
    def calculate_kitting(entries: list) -> dict:
        """Simulate MRP kitting for a set of production targets.

        Steps:
            1. For each requested BOM take the enabled children of its latest
               version.
            2. Merge demand per child:
               ``dosage * (1 + loss_rate / 100) * target_qty``.
            3. Sum available stock across StockBuy / StockSemi / StockProduct.
            4. Compute ``shortage = available - required`` (a negative value
               means material is missing).
            5. Compute, per BOM, the maximum number of sets current stock can
               build (limited by the scarcest child).

        Args:
            entries: ``[{"bom_no": "BOM-001", "target_qty": 10}, ...]``.
                Entries without a BOM number, with a non-positive quantity, or
                naming an unknown BOM are skipped.

        Returns:
            dict: ``{"bom_summary": [{bom_no, parent_name, version,
            max_producible}], "materials": [{base_id, material_name, spec,
            unit, required_qty, available_qty, shortage, bom_sources}]}``,
            with materials sorted by ``shortage`` ascending.
        """
        # Step 1: expand every BOM into its children and merge the demand.
        demand_map = {}  # child_id -> material info + accumulated required_qty
        entry_meta = {}  # bom_no -> {parent_name, version, children: {child_id: usage}}
        for entry in entries or []:
            bom_no = entry.get('bom_no')
            target_qty = float(entry.get('target_qty', 0) or 0)
            if not bom_no or target_qty <= 0:
                continue

            latest_version = BomService._latest_version(bom_no)
            if not latest_version:
                continue

            parent_row = db.session.query(
                BomTable.parent_id, MaterialBase.name
            ).join(
                MaterialBase, BomTable.parent_id == MaterialBase.id
            ).filter(
                BomTable.bom_no == bom_no,
                BomTable.version == latest_version
            ).first()

            child_rows = db.session.query(
                BomTable, MaterialBase.name, MaterialBase.spec_model, MaterialBase.unit
            ).join(
                MaterialBase, BomTable.child_id == MaterialBase.id
            ).filter(
                BomTable.bom_no == bom_no,
                BomTable.version == latest_version,
                BomTable.is_enabled.is_(True)
            ).all()

            meta_children = {}
            entry_meta[bom_no] = {
                'parent_name': parent_row.name if parent_row else '',
                'version': latest_version,
                'children': meta_children
            }
            for bom, child_name, child_spec, child_unit in child_rows:
                dosage = float(bom.dosage or 0)
                loss_rate = float(bom.loss_rate or 0)
                adj_dosage = dosage * (1 + loss_rate / 100.0)

                # Per-set usage, needed later for max_producible.
                meta_children[bom.child_id] = {
                    'dosage': dosage,
                    'loss_rate': loss_rate,
                    'adj_dosage': adj_dosage,
                    'per_unit': adj_dosage  # quantity of this child per set of the BOM
                }

                demand = demand_map.setdefault(bom.child_id, {
                    'base_id': bom.child_id,
                    'material_name': child_name or '',
                    'spec': child_spec or '',
                    'unit': child_unit or '',
                    'required_qty': 0.0,
                    'bom_sources': []
                })
                demand['required_qty'] += adj_dosage * target_qty
                demand['bom_sources'].append({
                    'bom_no': bom_no,
                    'dosage': dosage,
                    'loss_rate': loss_rate,
                    'target_qty': target_qty
                })

        # Step 2: available stock across the three stock tables, summed in SQL.
        child_ids = list(demand_map)
        if not child_ids:
            return {'bom_summary': [], 'materials': []}
        available_map = dict.fromkeys(child_ids, 0.0)
        for model_cls in (StockBuy, StockSemi, StockProduct):
            for base_id, qty in BomService._sum_available(model_cls, child_ids).items():
                if base_id in available_map:
                    available_map[base_id] += qty

        # Step 3: per-material result with the stock balance.
        materials = []
        for base_id, info in demand_map.items():
            avail = available_map.get(base_id, 0.0)
            materials.append({
                'base_id': base_id,
                'material_name': info['material_name'],
                'spec': info['spec'],
                'unit': info['unit'],
                'required_qty': round(info['required_qty'], 4),
                'available_qty': round(avail, 4),
                'shortage': round(avail - info['required_qty'], 4),
                'bom_sources': info['bom_sources']
            })
        # Most negative balance (largest deficit) first.
        materials.sort(key=lambda item: item['shortage'])

        # Step 4: per BOM, floor(available / per-set usage) for each child;
        # the minimum is the number of sets that can be built.
        bom_summary = []
        for bom_no, meta in entry_meta.items():
            buildable = [
                int(available_map.get(child_id, 0.0) // usage['adj_dosage'])
                for child_id, usage in meta['children'].items()
                if usage['adj_dosage'] > 0
            ]
            bom_summary.append({
                'bom_no': bom_no,
                'parent_name': meta['parent_name'],
                'version': meta['version'],
                'max_producible': min(buildable) if buildable else 0
            })

        return {
            'bom_summary': bom_summary,
            'materials': materials
        }

    # ====================== Legacy-compatible interface ======================

    @staticmethod
    def get_bom_no_by_parent(parent_id):
        """Return the BOM number of the highest-versioned row for ``parent_id``.

        Versions are compared in natural order. Returns None when the parent
        has no BOM rows.
        """
        rows = db.session.query(
            BomTable.bom_no, BomTable.version
        ).filter(
            BomTable.parent_id == parent_id
        ).distinct().all()
        if not rows:
            return None
        best = max(rows, key=lambda row: BomService._version_sort_key(row[1]))
        return best[0]

    @staticmethod
    def create_or_update_bom(parent_id, child_list, bom_no=None, version='V1.0'):
        """Replace the rows of one BOM version with ``child_list`` (legacy API).

        When ``bom_no`` is not given, the number of any existing BOM of the
        parent is reused, otherwise a new one is generated.

        Args:
            parent_id: id of the parent material.
            child_list: dicts with ``child_id`` and optional ``dosage`` /
                ``remark``.
            bom_no: BOM number to write to, or None to resolve it as above.
            version: version label to replace.

        Returns:
            bool: always True on success.
        """
        if not bom_no:
            existing = BomTable.query.filter_by(parent_id=parent_id).first()
            bom_no = existing.bom_no if existing else BomService.generate_bom_no()
        # Roll back on failure so a partial delete/insert never lingers in the session.
        try:
            BomTable.query.filter_by(bom_no=bom_no, version=version).delete()
            for item in child_list:
                db.session.add(BomTable(
                    bom_no=bom_no, version=version, parent_id=parent_id,
                    child_id=item['child_id'], dosage=item.get('dosage', 0), remark=item.get('remark', '')
                ))
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise
        return True

    @staticmethod
    def get_bom_with_stock(parent_id):
        """Return the stock-enriched children of the parent's BOM (legacy API).

        Returns an empty list when the parent has no BOM.
        """
        bom_no = BomService.get_bom_no_by_parent(parent_id)
        if not bom_no:
            return []
        detail = BomService.get_bom_with_stock_by_bom_no(bom_no)
        return detail['children'] if detail else []