Files
KCGL/inventory-backend/app/services/bom_service.py

273 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from app.extensions import db
from app.models.bom import BomTable
from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy
from sqlalchemy import func, distinct, or_
import uuid
from datetime import datetime
class BomService:
# ====================== 新版 BOM 逻辑(基于 bom_no ======================
@staticmethod
def generate_bom_no():
"""生成唯一的 BOM 编号 (作为默认备选)"""
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
unique = str(uuid.uuid4())[:8]
return f'BOM-{timestamp}-{unique}'
@staticmethod
def get_bom_list(keyword=None):
"""
获取所有 BOM 配方(按 bom_no 分组)
支持模糊搜索BOM编号、父件名称/规格、子件名称/规格
"""
# 1. 如果有搜索关键词,先筛选出符合条件的 bom_no 集合
filtered_bom_nos = None
if keyword:
kw = f'%{keyword}%'
# 条件A: 匹配 BOM编号 或 父件信息
# 需要 join 父件表
q1 = db.session.query(BomTable.bom_no).join(
MaterialBase, BomTable.parent_id == MaterialBase.id
).filter(
or_(
BomTable.bom_no.ilike(kw),
MaterialBase.name.ilike(kw),
MaterialBase.spec_model.ilike(kw)
)
)
# 条件B: 匹配 子件信息
# 需要 join 子件表
q2 = db.session.query(BomTable.bom_no).join(
MaterialBase, BomTable.child_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(kw),
MaterialBase.spec_model.ilike(kw)
)
)
# 取并集 (Union)
filtered_bom_nos = q1.union(q2).distinct().all()
filtered_bom_nos = [row[0] for row in filtered_bom_nos]
# 如果搜不到任何结果,直接返回空
if not filtered_bom_nos:
return []
# 2. 原有的分组聚合查询
subq_query = db.session.query(
BomTable.bom_no,
BomTable.parent_id,
BomTable.version,
func.count(BomTable.child_id).label('child_count')
)
# 应用筛选
if filtered_bom_nos is not None:
subq_query = subq_query.filter(BomTable.bom_no.in_(filtered_bom_nos))
subq = subq_query.group_by(BomTable.bom_no, BomTable.parent_id, BomTable.version).subquery()
query = db.session.query(
subq.c.bom_no,
subq.c.parent_id,
subq.c.version,
subq.c.child_count,
MaterialBase.name.label('parent_name'),
MaterialBase.spec_model.label('parent_spec')
).join(MaterialBase, subq.c.parent_id == MaterialBase.id)
# 按 bom_no 倒序排列
query = query.order_by(subq.c.bom_no.desc())
results = query.all()
return [{
'bom_no': row.bom_no,
'parent_id': row.parent_id,
'parent_name': row.parent_name,
'parent_spec': row.parent_spec or '',
'version': row.version,
'child_count': row.child_count
} for row in results]
@staticmethod
def get_bom_detail(bom_no):
"""
根据 bom_no 获取配方详情
返回包含父件信息和子件列表的对象
"""
rows = db.session.query(
BomTable,
MaterialBase.name.label('child_name'),
MaterialBase.spec_model.label('child_spec')
).join(
MaterialBase, BomTable.child_id == MaterialBase.id
).filter(
BomTable.bom_no == bom_no
).all()
if not rows:
return None
first = rows[0]
parent_id = first.BomTable.parent_id
# 获取父件的名称和规格
parent_material = MaterialBase.query.filter(MaterialBase.id == parent_id).first()
if parent_material:
parent_name = parent_material.name
parent_spec = parent_material.spec_model or ''
else:
parent_name = ''
parent_spec = ''
children = []
for bom, child_name, child_spec in rows:
children.append({
'child_id': bom.child_id,
'child_name': child_name,
'child_spec': child_spec or '',
'dosage': float(bom.dosage) if bom.dosage else 0.0,
'remark': bom.remark or ''
})
return {
'bom_no': bom_no,
'parent_id': parent_id,
'parent_name': parent_name,
'parent_spec': parent_spec,
'version': first.BomTable.version,
'children': children
}
@staticmethod
def save_bom(data):
"""
保存或更新一个 BOM 配方
data 结构:
{
"bom_no": "用户输入或自动生成",
"version": "版本号默认v1",
"parent_id": 父件ID,
"children": [...]
}
"""
bom_no = data.get('bom_no')
version = data.get('version', 'v1')
parent_id = data['parent_id']
children = data['children']
# 校验父件不能与子件相同
for child in children:
if child['child_id'] == parent_id:
raise ValueError('父件与子件不能是同一物料')
# 如果未提供 bom_no则生成一个新的 (兼容旧逻辑,但现在前端会传值)
if not bom_no or not bom_no.strip():
# 如果前端没传,抛出异常要求用户填写,或者自动生成
# 这里选择自动生成作为兜底,但推荐前端校验必填
bom_no = BomService.generate_bom_no()
# 删除该 bom_no 下所有现有记录 (全量更新模式)
BomTable.query.filter_by(bom_no=bom_no).delete()
# 插入新记录
for child in children:
bom = BomTable(
bom_no=bom_no,
version=version,
parent_id=parent_id,
child_id=child['child_id'],
dosage=child.get('dosage', 0),
remark=child.get('remark', '')
)
db.session.add(bom)
db.session.commit()
return bom_no
@staticmethod
def get_bom_with_stock_by_bom_no(bom_no):
"""
根据 bom_no 获取配方详情,并计算每个子件的库存和最大可生产数量
"""
detail = BomService.get_bom_detail(bom_no)
if not detail:
return None
for child in detail['children']:
# 查询该子件在 StockBuy 中的可用库存总量
stock_qty = db.session.query(
func.coalesce(func.sum(StockBuy.available_quantity), 0)
).filter(
StockBuy.base_id == child['child_id']
).scalar() or 0
child['current_stock'] = float(stock_qty)
dosage = child['dosage']
child['max_producible'] = int(stock_qty // dosage) if dosage > 0 else 0
return detail
# ====================== 兼容旧接口(基于 parent_id ======================
@staticmethod
def get_bom_no_by_parent(parent_id):
"""
根据父件 ID 获取其最新的 BOM 编号(用于兼容旧接口)
"""
row = BomTable.query.filter_by(parent_id=parent_id) \
.order_by(BomTable.version.desc()).first()
return row.bom_no if row else None
@staticmethod
def create_or_update_bom(parent_id, child_list, bom_no=None, version='v1'):
"""
兼容旧接口的保存方法(保留原有调用方式)
如果提供了 bom_no则更新该 bom_no否则为父件创建新 BOM。
"""
# 校验父件不能与子件相同
for item in child_list:
if item['child_id'] == parent_id:
raise ValueError('父件与子件不能是同一物料')
# 如果未提供 bom_no尝试查找现有的否则新建
if not bom_no:
existing = BomTable.query.filter_by(parent_id=parent_id).first()
bom_no = existing.bom_no if existing else BomService.generate_bom_no()
# 删除该 bom_no 下所有记录
BomTable.query.filter_by(bom_no=bom_no).delete()
# 插入新的
for item in child_list:
bom = BomTable(
bom_no=bom_no,
version=version,
parent_id=parent_id,
child_id=item['child_id'],
dosage=item.get('dosage', 0),
remark=item.get('remark', '')
)
db.session.add(bom)
db.session.commit()
return True
@staticmethod
def get_bom_with_stock(parent_id):
"""
兼容旧接口:根据父件 ID 获取 BOM 及库存信息
(实际会找到对应的 bom_no 再调用新方法)
"""
bom_no = BomService.get_bom_no_by_parent(parent_id)
if not bom_no:
return []
detail = BomService.get_bom_with_stock_by_bom_no(bom_no)
if not detail:
return []
return detail['children']