Files
KCGL/inventory-backend/app/services/bom_service.py

246 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import uuid
from datetime import datetime
from decimal import Decimal

from sqlalchemy import func, distinct, or_, case

from app.extensions import db
from app.models.bom import BomTable
from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy


class BomService:
    """Service-layer helpers for reading and writing BOM (bill of materials) rows.

    A BOM is identified by ``bom_no`` plus ``version``; each ``BomTable`` row
    links one parent material to one child material with a dosage.
    """

    # ====================== New BOM logic (keyed by bom_no) ======================

    @staticmethod
    def generate_bom_no():
        """Generate a unique BOM number (used as the default fallback).

        Returns:
            A string of the form ``BOM-<YYYYmmddHHMMSS>-<8 chars of a uuid4>``.
        """
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        unique = str(uuid.uuid4())[:8]
        return f'BOM-{timestamp}-{unique}'

    @staticmethod
    def get_bom_list(keyword=None, active_only=False):
        """List all BOM recipes, grouped by ``bom_no`` + ``version``.

        Args:
            keyword: Optional fuzzy-search term matched (case-insensitively)
                against the BOM number, the parent name/spec and the child
                name/spec.
            active_only: When true, only rows with ``is_enabled`` set are
                considered when selecting BOMs.

        Returns:
            A list of summary dicts (``bom_no``, ``version``, ``parent_id``,
            ``parent_name``, ``parent_spec``, ``is_enabled``, ``child_count``)
            sorted by ``(bom_no, version)`` in descending order. Empty list
            when nothing matches.
        """
        # Step 1: find the distinct (bom_no, version) pairs that match the filters.
        pair_query = db.session.query(
            BomTable.bom_no,
            BomTable.version,
        ).join(
            MaterialBase, BomTable.parent_id == MaterialBase.id
        )

        # Exclude disabled rows when requested.
        if active_only:
            pair_query = pair_query.filter(BomTable.is_enabled == True)  # noqa: E712

        if keyword:
            pattern = f'%{keyword}%'
            # Join the material table a second time so child name/spec are searchable.
            child_material = db.aliased(MaterialBase)
            pair_query = pair_query.outerjoin(
                child_material, BomTable.child_id == child_material.id
            ).filter(
                or_(
                    BomTable.bom_no.ilike(pattern),
                    MaterialBase.name.ilike(pattern),
                    MaterialBase.spec_model.ilike(pattern),
                    child_material.name.ilike(pattern),
                    child_material.spec_model.ilike(pattern),
                )
            )

        target_pairs = pair_query.distinct().all()
        if not target_pairs:
            return []

        # Step 2: aggregate the summary per pair. This runs without the keyword
        # filter so child_count covers every child of the BOM, not only the
        # children that matched the search term.
        results = []
        for bom_no, version in target_pairs:
            summary = db.session.query(
                BomTable.parent_id,
                MaterialBase.name.label('parent_name'),
                MaterialBase.spec_model.label('parent_spec'),
                BomTable.is_enabled,
                func.count(BomTable.child_id).label('child_count'),
            ).join(
                MaterialBase, BomTable.parent_id == MaterialBase.id
            ).filter(
                BomTable.bom_no == bom_no,
                BomTable.version == version,
            ).group_by(
                BomTable.parent_id,
                MaterialBase.name,
                MaterialBase.spec_model,
                BomTable.is_enabled,
            ).first()
            if summary:
                results.append({
                    'bom_no': bom_no,
                    'version': version,
                    'parent_id': summary.parent_id,
                    'parent_name': summary.parent_name,
                    'parent_spec': summary.parent_spec or '',
                    'is_enabled': summary.is_enabled,
                    'child_count': summary.child_count,
                })

        # Coerce None to '' in the key so a missing version cannot make the
        # tuple comparison raise TypeError.
        results.sort(
            key=lambda item: (item['bom_no'] or '', item['version'] or ''),
            reverse=True,
        )
        return results

    @staticmethod
    def get_bom_detail(bom_no, version=None):
        """Fetch the recipe detail for ``bom_no`` (and optionally ``version``).

        Args:
            bom_no: BOM number to look up.
            version: Version to fetch. When falsy, the highest version
                (by descending column order) for ``bom_no`` is used.

        Returns:
            A dict with the parent info, ``is_enabled`` and a ``children``
            list, or ``None`` when no matching rows exist.
        """
        query = db.session.query(
            BomTable,
            MaterialBase.name.label('child_name'),
            MaterialBase.spec_model.label('child_spec'),
        ).join(
            MaterialBase, BomTable.child_id == MaterialBase.id
        ).filter(
            BomTable.bom_no == bom_no
        )

        if version:
            query = query.filter(BomTable.version == version)
        else:
            # NOTE(review): if `version` is a text column this ordering is
            # lexicographic ('V9.0' sorts above 'V10.0') - confirm.
            latest_ver = (
                db.session.query(BomTable.version)
                .filter_by(bom_no=bom_no)
                .order_by(BomTable.version.desc())
                .limit(1)
                .scalar()
            )
            if not latest_ver:
                return None
            query = query.filter(BomTable.version == latest_ver)

        rows = query.all()
        if not rows:
            return None

        # Every row of one (bom_no, version) is read as sharing the same
        # parent/version/flag, so the first row supplies the header fields.
        first = rows[0]
        parent_id = first.BomTable.parent_id
        parent_material = MaterialBase.query.get(parent_id)

        children = [
            {
                'child_id': bom.child_id,
                'child_name': child_name,
                'child_spec': child_spec or '',
                'dosage': float(bom.dosage) if bom.dosage else 0.0,
                'remark': bom.remark or '',
            }
            for bom, child_name, child_spec in rows
        ]

        return {
            'bom_no': bom_no,
            'version': first.BomTable.version,
            'parent_id': parent_id,
            'parent_name': parent_material.name if parent_material else '',
            # Normalise a NULL spec to '' to match get_bom_list and child_spec.
            'parent_spec': (parent_material.spec_model or '') if parent_material else '',
            'is_enabled': first.BomTable.is_enabled,
            'children': children,
        }

    @staticmethod
    def save_bom(data):
        """Save a BOM (multiple versions supported).

        Replaces all rows of the given ``bom_no`` + ``version`` with the
        supplied children; other versions are left untouched.

        Args:
            data: Mapping with required keys ``parent_id`` and ``children``
                (list of dicts carrying ``child_id`` and optional ``dosage`` /
                ``remark``), plus optional ``bom_no``, ``version`` (default
                ``'V1.0'``) and ``is_enabled`` (default ``True``).

        Returns:
            The ``bom_no`` that was saved.

        Raises:
            KeyError: If ``parent_id`` or ``children`` is missing.
            ValueError: If ``bom_no`` is empty or a child equals the parent.
        """
        bom_no = data.get('bom_no')
        version = data.get('version', 'V1.0')
        parent_id = data['parent_id']
        children = data['children']
        is_enabled = data.get('is_enabled', True)

        if not bom_no:
            raise ValueError('BOM编号不能为空')
        for child in children:
            if child['child_id'] == parent_id:
                raise ValueError('父件与子件不能是同一物料')

        try:
            # Only delete the old rows of the current version.
            BomTable.query.filter_by(bom_no=bom_no, version=version).delete()
            for child in children:
                db.session.add(BomTable(
                    bom_no=bom_no,
                    version=version,
                    parent_id=parent_id,
                    child_id=child['child_id'],
                    dosage=child.get('dosage', 0),
                    remark=child.get('remark', ''),
                    is_enabled=is_enabled,
                ))
            db.session.commit()
        except Exception:
            # Undo the delete/inserts so the shared session is not left in a
            # failed transaction, then let the caller see the original error.
            db.session.rollback()
            raise
        return bom_no

    @staticmethod
    def _max_producible(stock_qty, dosage):
        """Return how many whole units ``stock_qty`` covers at ``dosage`` each.

        The division is done in ``Decimal`` so that a ``Decimal`` stock value
        can be combined with a ``float`` dosage (plain ``//`` raises
        ``TypeError`` for that mix) and so that binary float rounding does not
        undercount (``0.3 // 0.1`` is ``2.0``).

        Returns 0 when ``dosage`` or ``stock_qty`` is not positive.
        """
        if not dosage > 0:
            return 0
        stock = Decimal(str(stock_qty))
        if stock <= 0:
            return 0
        return int(stock // Decimal(str(dosage)))

    @staticmethod
    def get_bom_with_stock_by_bom_no(bom_no):
        """Fetch the recipe detail for ``bom_no`` enriched with stock data.

        Each child dict gains:
          * ``current_stock`` - total available quantity in ``StockBuy``;
          * ``warehouse_location`` - distinct non-empty locations holding
            stock, joined with ``', '``;
          * ``max_producible`` - whole units producible from that child alone.

        Returns:
            The detail dict from ``get_bom_detail`` (latest version), or
            ``None`` when the BOM does not exist.
        """
        detail = BomService.get_bom_detail(bom_no)
        if not detail:
            return None

        for child in detail['children']:
            child_id = child['child_id']

            # 1. Total available stock for this child.
            stock_qty = db.session.query(
                func.coalesce(func.sum(StockBuy.available_quantity), 0)
            ).filter(
                StockBuy.base_id == child_id
            ).scalar() or 0

            # 2. All distinct locations for this child, concatenated
            #    (string_agg is PostgreSQL-specific). Only the stock_buy table
            #    is consulted here; finished / semi-finished goods would need a
            #    similar UNION query.
            locations = db.session.query(
                func.string_agg(distinct(StockBuy.warehouse_location), ', ')
            ).filter(
                StockBuy.base_id == child_id,
                StockBuy.available_quantity > 0,  # only locations that hold stock
                StockBuy.warehouse_location.isnot(None),
                StockBuy.warehouse_location != '',
            ).scalar()

            child['current_stock'] = float(stock_qty)
            child['warehouse_location'] = locations or ''  # returned to the front end
            child['max_producible'] = BomService._max_producible(
                stock_qty, child['dosage']
            )
        return detail

    # ====================== Legacy-compatible interface ======================

    @staticmethod
    def get_bom_no_by_parent(parent_id):
        """Return the ``bom_no`` of the highest-version row for ``parent_id``, or ``None``."""
        row = (
            BomTable.query
            .filter_by(parent_id=parent_id)
            .order_by(BomTable.version.desc())
            .first()
        )
        return row.bom_no if row else None

    @staticmethod
    def create_or_update_bom(parent_id, child_list, bom_no=None, version='V1.0'):
        """Replace the children of a BOM version (legacy entry point).

        Args:
            parent_id: Parent material id.
            child_list: List of dicts with ``child_id`` and optional
                ``dosage`` / ``remark``.
            bom_no: BOM number; when falsy, an existing BOM number of the
                parent is reused, otherwise a new one is generated.
            version: Version whose rows are replaced.

        Returns:
            ``True`` once the changes are committed.
        """
        if not bom_no:
            existing = BomTable.query.filter_by(parent_id=parent_id).first()
            bom_no = existing.bom_no if existing else BomService.generate_bom_no()

        try:
            BomTable.query.filter_by(bom_no=bom_no, version=version).delete()
            for item in child_list:
                db.session.add(BomTable(
                    bom_no=bom_no,
                    version=version,
                    parent_id=parent_id,
                    child_id=item['child_id'],
                    dosage=item.get('dosage', 0),
                    remark=item.get('remark', ''),
                ))
            db.session.commit()
        except Exception:
            # Keep the session usable after a failed write; re-raise unchanged.
            db.session.rollback()
            raise
        return True

    @staticmethod
    def get_bom_with_stock(parent_id):
        """Return the stock-enriched children list for a parent (legacy entry point).

        Returns an empty list when the parent has no BOM.
        """
        bom_no = BomService.get_bom_no_by_parent(parent_id)
        if not bom_no:
            return []
        detail = BomService.get_bom_with_stock_by_bom_no(bom_no)
        return detail['children'] if detail else []