Files
KCGL/inventory-backend/app/services/bom_service.py
dxc 6d5d8a6aad feat: implement BOM versioning with bom_no and new management APIs
Co-authored-by: aider (openai/DeepSeek-V3.2-Thinking) <aider@aider.chat>
2026-02-12 09:37:27 +08:00

217 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import uuid
from datetime import datetime
from decimal import Decimal

from sqlalchemy import func, distinct

from app.extensions import db
from app.models.bom import BomTable
from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy
class BomService:
    """Service helpers for bill-of-materials (BOM) recipes stored in ``BomTable``.

    A recipe is the set of ``BomTable`` rows that share one ``bom_no``; each
    row links the recipe's parent material to one child material together
    with a dosage and a remark.
    """

    # ================= Current BOM logic (keyed by bom_no) =================

    @staticmethod
    def generate_bom_no():
        """Return a new BOM number shaped ``BOM-<YYYYmmddHHMMSS>-<8 hex chars>``."""
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        unique = str(uuid.uuid4())[:8]
        return f'BOM-{timestamp}-{unique}'

    @staticmethod
    def get_bom_list():
        """List every BOM recipe, one entry per (bom_no, parent_id, version).

        Returns:
            A list of dicts with keys ``bom_no``, ``parent_id``,
            ``parent_name``, ``version`` and ``child_count``.  Because the
            parent name comes from an inner join on ``MaterialBase``, groups
            whose parent material row is missing are not listed.
        """
        grouped = db.session.query(
            BomTable.bom_no,
            BomTable.parent_id,
            BomTable.version,
            func.count(BomTable.child_id).label('child_count'),
        ).group_by(
            BomTable.bom_no, BomTable.parent_id, BomTable.version
        ).subquery()

        rows = db.session.query(
            grouped.c.bom_no,
            grouped.c.parent_id,
            grouped.c.version,
            grouped.c.child_count,
            MaterialBase.name.label('parent_name'),
        ).join(MaterialBase, grouped.c.parent_id == MaterialBase.id).all()

        return [
            {
                'bom_no': row.bom_no,
                'parent_id': row.parent_id,
                'parent_name': row.parent_name,
                'version': row.version,
                'child_count': row.child_count,
            }
            for row in rows
        ]

    @staticmethod
    def get_bom_detail(bom_no):
        """Return one recipe (parent info plus child list) by its ``bom_no``.

        Returns:
            ``None`` when no row matches; otherwise a dict with keys
            ``bom_no``, ``parent_id``, ``parent_name``, ``version`` and
            ``children`` (a list of dicts with ``child_id``, ``child_name``,
            ``dosage`` as float and ``remark``).  Parent id and version are
            taken from the first matching row.
        """
        rows = db.session.query(
            BomTable,
            MaterialBase.name.label('child_name'),
        ).join(
            MaterialBase, BomTable.child_id == MaterialBase.id
        ).filter(
            BomTable.bom_no == bom_no
        ).all()
        if not rows:
            return None

        head = rows[0].BomTable
        parent_id = head.parent_id
        # A missing parent material yields an empty name instead of an error.
        parent_name = db.session.query(MaterialBase.name).filter(
            MaterialBase.id == parent_id
        ).scalar() or ''

        children = [
            {
                'child_id': bom.child_id,
                'child_name': child_name,
                'dosage': float(bom.dosage) if bom.dosage else 0.0,
                'remark': bom.remark or '',
            }
            for bom, child_name in rows
        ]
        return {
            'bom_no': bom_no,
            'parent_id': parent_id,
            'parent_name': parent_name,
            'version': head.version,
            'children': children,
        }

    @staticmethod
    def save_bom(data):
        """Create a recipe, or replace every row of an existing one.

        Args:
            data: Mapping shaped like::

                {
                    "bom_no": "optional; empty or missing creates a new BOM",
                    "version": "version label, defaults to 'v1'",
                    "parent_id": <parent material id>,
                    "children": [
                        {"child_id": 1, "dosage": 2.5, "remark": ""},
                        ...
                    ],
                }

        Returns:
            The ``bom_no`` the rows were stored under.

        Raises:
            ValueError: If a child has the same id as the parent.
            KeyError: If ``parent_id``, ``children`` or a child's
                ``child_id`` is missing.
        """
        bom_no = data.get('bom_no')
        version = data.get('version', 'v1')
        parent_id = data['parent_id']
        children = data['children']

        BomService._validate_children(parent_id, children)

        # Only a caller-supplied bom_no can have rows that need replacing.
        replace_existing = bool(bom_no)
        if not bom_no:
            bom_no = BomService.generate_bom_no()

        BomService._write_bom_rows(
            bom_no, version, parent_id, children,
            replace_existing=replace_existing,
        )
        return bom_no

    @staticmethod
    def get_bom_with_stock_by_bom_no(bom_no):
        """Return the recipe detail with stock figures added to every child.

        Each child dict gains ``current_stock`` (summed
        ``StockBuy.available_quantity`` for that material, as float) and
        ``max_producible`` (whole number of parent units that stock covers).

        Returns:
            The enriched detail dict, or ``None`` when the recipe is unknown.
        """
        detail = BomService.get_bom_detail(bom_no)
        if not detail:
            return None

        children = detail['children']
        child_ids = list({child['child_id'] for child in children})

        # One grouped query for all children instead of one query per child.
        stock_rows = db.session.query(
            StockBuy.base_id,
            func.coalesce(func.sum(StockBuy.available_quantity), 0),
        ).filter(
            StockBuy.base_id.in_(child_ids)
        ).group_by(StockBuy.base_id).all()
        stock_by_id = {base_id: qty for base_id, qty in stock_rows}

        for child in children:
            stock_qty = stock_by_id.get(child['child_id'], 0) or 0
            child['current_stock'] = float(stock_qty)
            child['max_producible'] = BomService._max_producible(
                stock_qty, child['dosage']
            )
        return detail

    # ============== Legacy-compatible API (keyed by parent_id) ==============

    @staticmethod
    def get_bom_no_by_parent(parent_id):
        """Return the ``bom_no`` of the parent's highest-version recipe.

        Versions are compared in natural order (``'v10'`` ranks above
        ``'v9'``); a plain string sort would rank them the other way round.

        Returns:
            The ``bom_no``, or ``None`` when the parent has no BOM rows.
        """
        rows = db.session.query(
            BomTable.bom_no, BomTable.version
        ).filter(
            BomTable.parent_id == parent_id
        ).distinct().all()
        if not rows:
            return None
        latest = max(
            rows, key=lambda row: BomService._version_sort_key(row[1])
        )
        return latest[0]

    @staticmethod
    def create_or_update_bom(parent_id, child_list, bom_no=None, version='v1'):
        """Legacy save entry point; keeps the original call signature.

        With a ``bom_no`` the rows under it are replaced.  Without one, the
        parent's highest-version recipe is replaced, or a new ``bom_no`` is
        generated when the parent has no usable one.

        Returns:
            ``True`` once the rows are committed.

        Raises:
            ValueError: If a child has the same id as the parent.
            KeyError: If a child entry lacks ``child_id``.
        """
        BomService._validate_children(parent_id, child_list)

        if not bom_no:
            # Falling back on a falsy lookup result keeps a NULL/empty bom_no
            # from being used as the delete filter below.
            bom_no = (
                BomService.get_bom_no_by_parent(parent_id)
                or BomService.generate_bom_no()
            )

        BomService._write_bom_rows(
            bom_no, version, parent_id, child_list, replace_existing=True
        )
        return True

    @staticmethod
    def get_bom_with_stock(parent_id):
        """Legacy lookup: children (with stock figures) of the parent's BOM.

        Resolves the parent's ``bom_no`` and delegates to
        ``get_bom_with_stock_by_bom_no``.

        Returns:
            The enriched child list, or ``[]`` when nothing is found.
        """
        bom_no = BomService.get_bom_no_by_parent(parent_id)
        if not bom_no:
            return []
        detail = BomService.get_bom_with_stock_by_bom_no(bom_no)
        if not detail:
            return []
        return detail['children']

    # ============================ Private helpers ============================

    @staticmethod
    def _validate_children(parent_id, children):
        """Raise ``ValueError`` if any child is the parent material itself."""
        for child in children:
            if child['child_id'] == parent_id:
                raise ValueError('父件与子件不能是同一物料')

    @staticmethod
    def _write_bom_rows(bom_no, version, parent_id, children, *,
                        replace_existing):
        """Insert one row per child under ``bom_no`` and commit.

        When ``replace_existing`` is true, rows already stored under
        ``bom_no`` are deleted first.  Any failure rolls the session back
        before the exception is re-raised, so a half-applied delete/insert
        is not left pending in the session.
        """
        try:
            if replace_existing:
                BomTable.query.filter_by(bom_no=bom_no).delete()
            for child in children:
                db.session.add(BomTable(
                    bom_no=bom_no,
                    version=version,
                    parent_id=parent_id,
                    child_id=child['child_id'],
                    dosage=child.get('dosage', 0),
                    remark=child.get('remark', ''),
                ))
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise

    @staticmethod
    def _max_producible(stock_qty, dosage):
        """Return how many whole parent units ``stock_qty`` covers.

        Both operands are converted to ``Decimal`` via ``str`` so that a
        ``Decimal`` stock can be divided by a ``float`` dosage (mixing the
        two raises ``TypeError``) and so that binary-float artefacts such as
        ``0.3 // 0.1 == 2.0`` do not undercount.  Non-positive or missing
        stock or dosage yields 0.
        """
        stock = Decimal(str(stock_qty or 0))
        per_unit = Decimal(str(dosage or 0))
        if stock <= 0 or per_unit <= 0:
            return 0
        return int(stock // per_unit)

    @staticmethod
    def _version_sort_key(version):
        """Return a natural-order sort key for a version label.

        The label is split into digit and non-digit runs; digit runs compare
        numerically and text runs case-insensitively.  Each run becomes a
        same-shaped tuple so numbers and text never get compared directly.
        ``None`` is treated as an empty label.
        """
        text = '' if version is None else str(version)
        key = []
        for chunk in re.findall(r'[0-9]+|[^0-9]+', text):
            if '0' <= chunk[0] <= '9':
                key.append((0, int(chunk), ''))
            else:
                key.append((1, 0, chunk.lower()))
        return key