from app.extensions import db from app.models.bom import BomTable from app.models.base import MaterialBase from app.models.inbound.buy import StockBuy from app.models.inbound.semi import StockSemi from app.models.inbound.product import StockProduct from sqlalchemy import func, distinct, or_, case import uuid from datetime import datetime class BomService: # ====================== 新版 BOM 逻辑(基于 bom_no) ====================== @staticmethod def generate_bom_no(): """生成唯一的 BOM 编号 (作为默认备选)""" timestamp = datetime.now().strftime('%Y%m%d%H%M%S') unique = str(uuid.uuid4())[:8] return f'BOM-{timestamp}-{unique}' @staticmethod def get_bom_list(keyword=None, active_only=False): """ 获取所有 BOM 配方(按 bom_no + version 分组) 支持模糊搜索:BOM编号、父件名称/规格、子件名称/规格 """ # 1. 关键词过滤:先找出符合条件的 (bom_no, version) 组合 query_base = db.session.query( BomTable.bom_no, BomTable.version ).join( MaterialBase, BomTable.parent_id == MaterialBase.id ) # ★ 过滤禁用状态 if active_only: query_base = query_base.filter(BomTable.is_enabled == True) if keyword: kw = f'%{keyword}%' # 关联子件表以支持子件搜索 child_alias = db.aliased(MaterialBase) query_base = query_base.outerjoin( child_alias, BomTable.child_id == child_alias.id ).filter( or_( BomTable.bom_no.ilike(kw), MaterialBase.name.ilike(kw), MaterialBase.spec_model.ilike(kw), child_alias.name.ilike(kw), child_alias.spec_model.ilike(kw) ) ) # 获取符合条件的唯一组合 target_pairs = query_base.distinct().all() if not target_pairs: return [] # 2. 聚合查询详情 results = [] for bom_no, version in target_pairs: summary = db.session.query( BomTable.parent_id, MaterialBase.name.label('parent_name'), MaterialBase.spec_model.label('parent_spec'), BomTable.is_enabled, func.count(BomTable.child_id).label('child_count') ).join( MaterialBase, BomTable.parent_id == MaterialBase.id ).filter( BomTable.bom_no == bom_no, BomTable.version == version ).group_by( BomTable.parent_id, MaterialBase.name, MaterialBase.spec_model, BomTable.is_enabled ).first() if summary: results.append({ 'bom_no': bom_no, 'version': version, 'parent_id': summary.parent_id, 'parent_name': summary.parent_name, 'parent_spec': summary.parent_spec or '', 'is_enabled': summary.is_enabled, 'child_count': summary.child_count }) results.sort(key=lambda x: (x['bom_no'], x['version']), reverse=True) return results @staticmethod def get_bom_detail(bom_no, version=None): """ 根据 bom_no (和 version) 获取配方详情 """ query = db.session.query( BomTable, MaterialBase.name.label('child_name'), MaterialBase.spec_model.label('child_spec') ).join( MaterialBase, BomTable.child_id == MaterialBase.id ).filter( BomTable.bom_no == bom_no ) if version: query = query.filter(BomTable.version == version) else: latest_ver = db.session.query(BomTable.version).filter_by(bom_no=bom_no) \ .order_by(BomTable.version.desc()).limit(1).scalar() if not latest_ver: return None query = query.filter(BomTable.version == latest_ver) rows = query.all() if not rows: return None first = rows[0] parent_id = first.BomTable.parent_id parent_material = MaterialBase.query.get(parent_id) children = [] for bom, child_name, child_spec in rows: children.append({ 'child_id': bom.child_id, 'child_name': child_name, 'child_spec': child_spec or '', 'dosage': float(bom.dosage) if bom.dosage else 0.0, 'remark': bom.remark or '' }) return { 'bom_no': bom_no, 'version': first.BomTable.version, 'parent_id': parent_id, 'parent_name': parent_material.name if parent_material else '', 'parent_spec': parent_material.spec_model if parent_material else '', 'is_enabled': first.BomTable.is_enabled, 'children': children } @staticmethod def save_bom(data): """保存 BOM (支持多版本),新增跨版本内容查重""" bom_no = data.get('bom_no') version = data.get('version', 'V1.0') parent_id = data['parent_id'] children = data['children'] is_enabled = data.get('is_enabled', True) if not bom_no: raise ValueError('BOM编号不能为空') for child in children: if child['child_id'] == parent_id: raise ValueError('父件与子件不能是同一物料') # ===== 跨版本内容查重 ===== # 将当前提交的 children 转换为可比较的集合 (child_id, dosage) current_children_set = set() for child in children: # 用 (child_id, dosage) 元组表示,dosage 转为整数比较 dosage_val = int(child.get('dosage', 0)) if child.get('dosage') else 0 current_children_set.add((child['child_id'], dosage_val)) # 查询该 bom_no 下所有其他版本的子件配置 existing_versions = db.session.query( BomTable.version, BomTable.child_id, BomTable.dosage ).filter( BomTable.bom_no == bom_no, BomTable.version != version # 排除当前正在保存的版本 ).all() # 按版本分组,构建每个版本的子件集合 version_children = {} for ver, child_id, dosage in existing_versions: if ver not in version_children: version_children[ver] = set() dosage_val = int(dosage) if dosage else 0 version_children[ver].add((child_id, dosage_val)) # 比对每个版本 for ver, existing_set in version_children.items(): if current_children_set == existing_set: raise ValueError(f'保存失败!当前子件配置与已有版本 {ver} 完全一致,请勿重复保存') # ===== 执行保存 ===== # 仅删除当前版本的旧记录 BomTable.query.filter_by(bom_no=bom_no, version=version).delete() for child in children: bom = BomTable( bom_no=bom_no, version=version, parent_id=parent_id, child_id=child['child_id'], dosage=child.get('dosage', 0), remark=child.get('remark', ''), is_enabled=is_enabled ) db.session.add(bom) db.session.commit() return bom_no @staticmethod def get_bom_with_stock_by_bom_no(bom_no): """ 根据 bom_no 获取配方详情,并计算: 1. 总可用库存 2. 最大可生产套数 3. ★ 聚合库位信息 (warehouse_locations) """ detail = BomService.get_bom_detail(bom_no) if not detail: return None for child in detail['children']: # 1. 查询该子件的总库存 stock_qty = db.session.query( func.coalesce(func.sum(StockBuy.available_quantity), 0) ).filter( StockBuy.base_id == child['child_id'] ).scalar() or 0 # 2. ★ 查询该子件涉及的所有库位,并去重拼接 (PostgreSQL 使用 string_agg) # 注意:这里假设主要是 stock_buy 表,如果是成品或半成品也需要做类似 Union 查询 # 为简化,这里演示只查 stock_buy 的库位 locations = db.session.query( # 去除空值和重复值 func.string_agg(distinct(StockBuy.warehouse_location), ', ') ).filter( StockBuy.base_id == child['child_id'], StockBuy.available_quantity > 0, # 只看有货的库位 StockBuy.warehouse_location != None, StockBuy.warehouse_location != '' ).scalar() child['current_stock'] = float(stock_qty) child['warehouse_location'] = locations or '' # 返回给前端 dosage = child['dosage'] child['max_producible'] = int(stock_qty // dosage) if dosage > 0 else 0 return detail # ====================== MRP 齐套模拟计算 ====================== @staticmethod def calculate_kitting(entries: list) -> dict: """ MRP 齐套模拟计算 算法步骤: 1. 遍历传入的 BOM,取每个 BOM 最新版本的子件 2. 按子件 base_id 合并需求量:需求量 = dosage * (1 + loss_rate/100) * target_qty 3. 跨 StockBuy / StockSemi / StockProduct 聚合可用库存 4. 计算缺口:shortage = available_quantity - required_quantity 5. 计算每个 BOM 的"当前库存可生产最大套数"(木桶效应) :param entries: [{"bom_no": "BOM-001", "target_qty": 10}, ...] :return: { "bom_summary": [{bom_no, parent_name, max_producible}], "materials": [{base_id, material_name, spec, unit, required_qty, available_qty, shortage, bom_sources}] } """ # Step 1: 展开所有 BOM 的子件,聚合需求量 demand_map = {} # child_id -> {base_id, material_name, spec, unit, required_qty, bom_sources: []} # 记录每个 entry 的元信息(用于后续 per-BOM 产量计算) entry_meta = {} # bom_no -> {parent_name, version, children: {child_id: {dosage, loss_rate}}} for entry in entries: bom_no = entry.get('bom_no') target_qty = float(entry.get('target_qty', 0) or 0) if not bom_no or target_qty <= 0: continue # 取最新版本 latest_version = db.session.query( BomTable.version ).filter_by( bom_no=bom_no ).order_by( BomTable.version.desc() ).limit(1).scalar() if not latest_version: continue # 获取父件名称 parent_row = db.session.query( BomTable.parent_id, MaterialBase.name ).join( MaterialBase, BomTable.parent_id == MaterialBase.id ).filter( BomTable.bom_no == bom_no, BomTable.version == latest_version ).first() parent_name = parent_row.name if parent_row else '' # 查询该 BOM 所有子件 rows = db.session.query( BomTable, MaterialBase.name, MaterialBase.spec_model, MaterialBase.unit ).join( MaterialBase, BomTable.child_id == MaterialBase.id ).filter( BomTable.bom_no == bom_no, BomTable.version == latest_version, BomTable.is_enabled == True ).all() entry_meta[bom_no] = { 'parent_name': parent_name, 'version': latest_version, 'children': {} } for bom, child_name, child_spec, child_unit in rows: dosage = float(bom.dosage or 0) loss_rate = float(bom.loss_rate or 0) adj_dosage = dosage * (1 + loss_rate / 100.0) qty_needed = adj_dosage * target_qty # 记录 per-unit 用量(用于 max_producible 计算) entry_meta[bom_no]['children'][bom.child_id] = { 'dosage': dosage, 'loss_rate': loss_rate, 'adj_dosage': adj_dosage, 'per_unit': adj_dosage # 每生产1套该BOM所需的此子件数量 } if bom.child_id not in demand_map: demand_map[bom.child_id] = { 'base_id': bom.child_id, 'material_name': child_name or '', 'spec': child_spec or '', 'unit': child_unit or '', 'required_qty': 0.0, 'bom_sources': [] } demand_map[bom.child_id]['required_qty'] += qty_needed demand_map[bom.child_id]['bom_sources'].append({ 'bom_no': bom_no, 'dosage': dosage, 'loss_rate': loss_rate, 'target_qty': target_qty }) # Step 2: 批量查询三张库存表的可用库存 child_ids = list(demand_map.keys()) if not child_ids: return {'bom_summary': [], 'materials': []} available_map = {cid: 0.0 for cid in child_ids} for model_cls in (StockBuy, StockSemi, StockProduct): rows = db.session.query( model_cls.base_id, func.coalesce(model_cls.available_quantity, 0) ).filter( model_cls.base_id.in_(child_ids) ).all() for base_id, qty in rows: if base_id in available_map: available_map[base_id] += float(qty) # Step 3: 构造物料结果,计算缺口 materials = [] for base_id, info in demand_map.items(): avail = available_map.get(base_id, 0.0) shortage = avail - info['required_qty'] materials.append({ 'base_id': base_id, 'material_name': info['material_name'], 'spec': info['spec'], 'unit': info['unit'], 'required_qty': round(info['required_qty'], 4), 'available_qty': round(avail, 4), 'shortage': round(shortage, 4), 'bom_sources': info['bom_sources'] }) # 按缺件数量升序(最缺的排前面) materials.sort(key=lambda x: x['shortage']) # Step 4: 计算每个 BOM 的"当前库存可生产最大套数"(木桶效应) # 算法:对每个 BOM 的所有子件,计算 Floor(available_qty / per_unit_demand), # 取最小值 = 该 BOM 的最大可生产套数 bom_summary = [] for bom_no, meta in entry_meta.items(): min_producible = float('inf') for child_id, child_info in meta['children'].items(): avail = available_map.get(child_id, 0.0) per_unit = child_info['adj_dosage'] if per_unit > 0: producible = int(avail // per_unit) if producible < min_producible: min_producible = producible max_prod = int(min_producible) if min_producible != float('inf') else 0 bom_summary.append({ 'bom_no': bom_no, 'parent_name': meta['parent_name'], 'version': meta['version'], 'max_producible': max_prod }) return { 'bom_summary': bom_summary, 'materials': materials } # ====================== 兼容旧接口 ====================== @staticmethod def get_bom_no_by_parent(parent_id): row = BomTable.query.filter_by(parent_id=parent_id).order_by(BomTable.version.desc()).first() return row.bom_no if row else None @staticmethod def create_or_update_bom(parent_id, child_list, bom_no=None, version='V1.0'): if not bom_no: existing = BomTable.query.filter_by(parent_id=parent_id).first() bom_no = existing.bom_no if existing else BomService.generate_bom_no() BomTable.query.filter_by(bom_no=bom_no, version=version).delete() for item in child_list: bom = BomTable( bom_no=bom_no, version=version, parent_id=parent_id, child_id=item['child_id'], dosage=item.get('dosage', 0), remark=item.get('remark', '') ) db.session.add(bom) db.session.commit() return True @staticmethod def get_bom_with_stock(parent_id): bom_no = BomService.get_bom_no_by_parent(parent_id) if not bom_no: return [] detail = BomService.get_bom_with_stock_by_bom_no(bom_no) return detail['children'] if detail else []