Files
KCGL/inventory-backend/app/services/bom_service.py

322 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import uuid
from collections import defaultdict
from datetime import datetime
from decimal import Decimal, InvalidOperation

from sqlalchemy import func, distinct, or_, case

from app.extensions import db
from app.models.base import MaterialBase
from app.models.bom import BomTable
from app.models.inbound.buy import StockBuy
class BomService:
    """Service-layer helpers for reading and writing BOM (bill of materials) recipes.

    A recipe is the set of ``BomTable`` rows sharing one ``(bom_no, version)``
    pair; each row carries a ``parent_id``, a ``child_id`` and a ``dosage``.
    """

    # ====================== Current BOM logic (keyed by bom_no) ======================

    @staticmethod
    def generate_bom_no():
        """Return a fallback BOM number: ``BOM-<YYYYmmddHHMMSS>-<8 hex chars>``."""
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        return f'BOM-{timestamp}-{uuid.uuid4().hex[:8]}'

    @staticmethod
    def _version_sort_key(version):
        """Build a natural-order sort key so that ``'V1.10'`` ranks above ``'V1.9'``.

        The version is split into digit and non-digit runs; digit runs compare
        numerically, other runs compare case-insensitively. ``None`` and empty
        values yield an empty key, which sorts below everything else.
        """
        key = []
        for digits, text in re.findall(r'(\d+)|(\D+)', str(version or '')):
            # Every element has the shape (int, int, str) so mixed runs stay comparable.
            key.append((1, int(digits), '') if digits else (0, 0, text.lower()))
        return key

    @staticmethod
    def _latest_version(versions):
        """Return the highest version in *versions* by natural order, or ``None`` if there is none."""
        candidates = [v for v in versions if v]
        if not candidates:
            return None
        return max(candidates, key=BomService._version_sort_key)

    @staticmethod
    def _normalize_dosage(value):
        """Convert a dosage (number, numeric string, ``Decimal`` or ``None``) to an exact ``Decimal``.

        Falsy values map to zero. Equal quantities compare and hash equal
        regardless of representation (``'1.50'`` vs ``1.5``).

        Raises:
            ValueError: If the value is not a finite number.
        """
        if not value:
            return Decimal(0)
        try:
            # Go through str() so floats keep their printed value instead of
            # their binary expansion.
            number = Decimal(str(value))
        except InvalidOperation:
            raise ValueError(f'子件用量格式不正确: {value}') from None
        if not number.is_finite():
            raise ValueError(f'子件用量格式不正确: {value}')
        return number

    @staticmethod
    def _replace_version_rows(bom_no, version, row_kwargs):
        """Replace all rows of ``(bom_no, version)`` with new rows and commit.

        Args:
            bom_no: BOM number of the recipe being replaced.
            version: Version of the recipe being replaced.
            row_kwargs: One dict of extra ``BomTable`` constructor arguments per new row.

        The session is rolled back and the error re-raised if anything fails.
        """
        try:
            # Delete object by object (not a bulk delete) so audit events fire.
            for stale in BomTable.query.filter_by(bom_no=bom_no, version=version).all():
                db.session.delete(stale)
            # Emit the DELETEs now: within a single flush the unit of work
            # issues INSERTs first, which would collide on the unique key.
            db.session.flush()
            for kwargs in row_kwargs:
                db.session.add(BomTable(bom_no=bom_no, version=version, **kwargs))
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise

    @staticmethod
    def get_bom_list(keyword=None, active_only=False):
        """List BOM recipes (one entry per ``bom_no`` + ``version``) grouped by parent category.

        Args:
            keyword: Optional substring matched case-insensitively against the
                BOM number, the parent name/spec and the child name/spec.
            active_only: When true, only rows with ``is_enabled`` set are considered.

        Returns:
            A list of ``{'category', 'count', 'items'}`` dicts ordered by
            category name. Items are ordered by BOM number, then version, both
            descending. Returns ``[]`` when nothing matches.
        """
        # 1. Find the (bom_no, version) pairs that satisfy the filters.
        pairs_query = db.session.query(
            BomTable.bom_no,
            BomTable.version,
        ).join(
            MaterialBase, BomTable.parent_id == MaterialBase.id
        )
        if active_only:
            pairs_query = pairs_query.filter(BomTable.is_enabled == True)  # noqa: E712
        if keyword:
            pattern = f'%{keyword}%'
            # Second alias of the material table so child materials are searchable too.
            child_material = db.aliased(MaterialBase)
            pairs_query = pairs_query.outerjoin(
                child_material, BomTable.child_id == child_material.id
            ).filter(
                or_(
                    BomTable.bom_no.ilike(pattern),
                    MaterialBase.name.ilike(pattern),
                    MaterialBase.spec_model.ilike(pattern),
                    child_material.name.ilike(pattern),
                    child_material.spec_model.ilike(pattern),
                )
            )
        target_pairs = pairs_query.distinct().all()
        if not target_pairs:
            return []

        # 2. Load one summary row per pair. child_count covers every row of the
        #    recipe, not only the rows that matched the keyword.
        entries = []
        for bom_no, version in target_pairs:
            summary = db.session.query(
                BomTable.parent_id,
                MaterialBase.name.label('parent_name'),
                MaterialBase.spec_model.label('parent_spec'),
                MaterialBase.category.label('parent_category'),
                BomTable.is_enabled,
                func.count(BomTable.child_id).label('child_count'),
            ).join(
                MaterialBase, BomTable.parent_id == MaterialBase.id
            ).filter(
                BomTable.bom_no == bom_no,
                BomTable.version == version,
            ).group_by(
                BomTable.parent_id,
                MaterialBase.name,
                MaterialBase.spec_model,
                MaterialBase.category,
                BomTable.is_enabled,
            ).first()
            if summary:
                entries.append({
                    'bom_no': bom_no,
                    'version': version,
                    'parent_id': summary.parent_id,
                    'parent_name': summary.parent_name,
                    'parent_spec': summary.parent_spec or '',
                    'parent_category': summary.parent_category or '',
                    'is_enabled': summary.is_enabled,
                    'child_count': summary.child_count,
                })

        # Natural version order, so V1.10 is listed before V1.9.
        entries.sort(
            key=lambda e: (e['bom_no'] or '', BomService._version_sort_key(e['version'])),
            reverse=True,
        )
        # No Python-side keyword re-filter: the SQL filter is already
        # case-insensitive, and re-checking only the parent fields would drop
        # recipes that matched through a child material.

        # 3. Group by parent category.
        by_category = defaultdict(list)
        for entry in entries:
            by_category[entry.get('parent_category') or '未分类'].append(entry)
        return [
            {'category': category, 'count': len(items), 'items': items}
            for category, items in sorted(by_category.items(), key=lambda kv: kv[0])
        ]

    @staticmethod
    def get_bom_detail(bom_no, version=None):
        """Return one recipe with its child rows.

        Args:
            bom_no: BOM number to look up.
            version: Version to load. When falsy, the highest version of
                ``bom_no`` (natural order, ``V1.10`` > ``V1.9``) is used.

        Returns:
            A dict with ``bom_no``, ``version``, ``parent_id``, ``parent_name``,
            ``parent_spec``, ``is_enabled`` and ``children``, or ``None`` when
            no matching rows exist.
        """
        if not version:
            version_rows = db.session.query(BomTable.version).filter(
                BomTable.bom_no == bom_no
            ).distinct().all()
            # Pick the maximum in Python: a string ORDER BY would rank
            # 'V1.9' above 'V1.10'.
            version = BomService._latest_version(row[0] for row in version_rows)
            if not version:
                return None

        rows = db.session.query(
            BomTable,
            MaterialBase.name.label('child_name'),
            MaterialBase.spec_model.label('child_spec'),
        ).join(
            MaterialBase, BomTable.child_id == MaterialBase.id
        ).filter(
            BomTable.bom_no == bom_no,
            BomTable.version == version,
        ).all()
        if not rows:
            return None

        head = rows[0].BomTable
        parent_material = MaterialBase.query.get(head.parent_id)
        children = [
            {
                'child_id': bom.child_id,
                'child_name': child_name,
                'child_spec': child_spec or '',
                'dosage': float(bom.dosage) if bom.dosage else 0.0,
                'remark': bom.remark or '',
            }
            for bom, child_name, child_spec in rows
        ]
        return {
            'bom_no': bom_no,
            'version': head.version,
            'parent_id': head.parent_id,
            'parent_name': parent_material.name if parent_material else '',
            # Normalise a missing spec to '' as get_bom_list does.
            'parent_spec': (parent_material.spec_model or '') if parent_material else '',
            'is_enabled': head.is_enabled,
            'children': children,
        }

    @staticmethod
    def save_bom(data):
        """Save one version of a recipe, replacing any existing rows of that version.

        Args:
            data: Dict with required keys ``parent_id`` and ``children`` (a list
                of dicts with ``child_id`` and optional ``dosage`` / ``remark``),
                plus ``bom_no``, and optional ``version`` (default ``'V1.0'``)
                and ``is_enabled`` (default ``True``).

        Returns:
            The ``bom_no`` that was saved.

        Raises:
            KeyError: If ``parent_id``, ``children`` or a ``child_id`` is missing.
            ValueError: If ``bom_no`` is empty, a child equals the parent, a
                dosage is not numeric, or another version of the same
                ``bom_no`` already has an identical child configuration.
        """
        bom_no = data.get('bom_no')
        version = data.get('version', 'V1.0')
        parent_id = data['parent_id']
        children = data['children']
        is_enabled = data.get('is_enabled', True)

        if not bom_no:
            raise ValueError('BOM编号不能为空')
        if any(child['child_id'] == parent_id for child in children):
            raise ValueError('父件与子件不能是同一物料')

        # ----- Cross-version duplicate check -----
        # Compare exact (child_id, dosage) pairs. Dosages are normalised to
        # Decimal; truncating to int would make 1.2 and 1.5 look identical.
        submitted = {
            (child['child_id'], BomService._normalize_dosage(child.get('dosage')))
            for child in children
        }
        other_rows = db.session.query(
            BomTable.version,
            BomTable.child_id,
            BomTable.dosage,
        ).filter(
            BomTable.bom_no == bom_no,
            BomTable.version != version,  # skip the version being saved
        ).all()
        by_version = defaultdict(set)
        for ver, child_id, dosage in other_rows:
            by_version[ver].add((child_id, BomService._normalize_dosage(dosage)))
        for ver, existing in by_version.items():
            if existing == submitted:
                raise ValueError(f'保存失败!当前子件配置与已有版本 {ver} 完全一致,请勿重复保存')

        # ----- Persist -----
        BomService._replace_version_rows(bom_no, version, [
            {
                'parent_id': parent_id,
                'child_id': child['child_id'],
                'dosage': child.get('dosage', 0),
                'remark': child.get('remark', ''),
                'is_enabled': is_enabled,
            }
            for child in children
        ])
        return bom_no

    @staticmethod
    def get_bom_with_stock_by_bom_no(bom_no):
        """Return the latest recipe of ``bom_no`` with stock figures added to each child.

        Each child dict gains ``current_stock`` (sum of positive
        ``available_quantity``), ``warehouse_location`` (distinct locations
        joined by ``', '``) and ``max_producible`` (``current_stock // dosage``,
        or 0 when the dosage is not positive). Stock for all children is
        fetched with a single grouped query.

        Returns:
            The enriched detail dict, or whatever ``get_bom_detail`` returned
            when there is no recipe or it has no children.
        """
        detail = BomService.get_bom_detail(bom_no)
        if not detail or not detail.get('children'):
            return detail

        child_ids = [child['child_id'] for child in detail['children']]
        stock_rows = db.session.query(
            StockBuy.base_id,
            func.coalesce(func.sum(StockBuy.available_quantity), 0).label('total_qty'),
            func.string_agg(distinct(StockBuy.warehouse_location), ', ').label('locations'),
        ).filter(
            StockBuy.base_id.in_(child_ids),
            StockBuy.available_quantity > 0,
        ).group_by(
            StockBuy.base_id
        ).all()

        # material id -> (total quantity, joined locations)
        stock_by_material = {
            row.base_id: (row.total_qty, row.locations or '')
            for row in stock_rows
        }
        for child in detail['children']:
            total_qty, locations = stock_by_material.get(child['child_id'], (0, ''))
            stock_qty = float(total_qty)
            dosage = float(child['dosage']) if child.get('dosage') else 0
            child['current_stock'] = stock_qty
            child['warehouse_location'] = locations
            child['max_producible'] = int(stock_qty // dosage) if dosage > 0 else 0
        return detail

    # ====================== Legacy-compatible interface ======================

    @staticmethod
    def get_bom_no_by_parent(parent_id):
        """Return the ``bom_no`` of the parent's highest-versioned recipe, or ``None`` if it has none."""
        rows = db.session.query(BomTable.bom_no, BomTable.version).filter(
            BomTable.parent_id == parent_id
        ).distinct().all()
        if not rows:
            return None
        # Natural version order; a string ORDER BY would rank 'V1.9' above 'V1.10'.
        return max(rows, key=lambda row: BomService._version_sort_key(row[1]))[0]

    @staticmethod
    def create_or_update_bom(parent_id, child_list, bom_no=None, version='V1.0'):
        """Replace the rows of one recipe version (legacy entry point).

        Args:
            parent_id: Parent material id.
            child_list: Dicts with ``child_id`` and optional ``dosage`` / ``remark``.
            bom_no: BOM number. When falsy, an existing BOM number of the
                parent is reused, otherwise a new one is generated.
            version: Version to replace.

        Returns:
            ``True`` once the rows are committed.
        """
        if not bom_no:
            existing = BomTable.query.filter_by(parent_id=parent_id).first()
            bom_no = existing.bom_no if existing else BomService.generate_bom_no()
        BomService._replace_version_rows(bom_no, version, [
            {
                'parent_id': parent_id,
                'child_id': item['child_id'],
                'dosage': item.get('dosage', 0),
                'remark': item.get('remark', ''),
            }
            for item in child_list
        ])
        return True

    @staticmethod
    def get_bom_with_stock(parent_id):
        """Return the stock-enriched child list of the parent's recipe, or ``[]`` if it has none."""
        bom_no = BomService.get_bom_no_by_parent(parent_id)
        if not bom_no:
            return []
        detail = BomService.get_bom_with_stock_by_bom_no(bom_no)
        return detail['children'] if detail else []