import json
import logging
import uuid
from collections import defaultdict
from datetime import datetime
from decimal import Decimal, InvalidOperation

from sqlalchemy import func, distinct, or_, case

from app.extensions import db
from app.models.bom import BomTable
from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy

logger = logging.getLogger(__name__)

# Redis cache key prefix and TTL (seconds).
BOM_CACHE_PREFIX = 'bom:tree'
BOM_CACHE_TTL = 43200  # 12 hours


def _get_redis():
    """Return the Redis client exposed by ``app.extensions``, or None.

    The import is attempted lazily and any exception it raises is swallowed,
    so a missing or broken ``redis_client`` never interrupts the caller.
    """
    try:
        from app.extensions import redis_client
        return redis_client
    except Exception:
        # Deliberate best-effort: the cache is optional, callers fall back
        # to the database when this returns None.
        return None


def _cache_key(bom_no, version=None):
    """Build the cache key: ``bom:tree:{bom_no}`` or ``bom:tree:{bom_no}:{version}``."""
    key = f"{BOM_CACHE_PREFIX}:{bom_no}"
    if version:
        key += f":{version}"
    return key


def _cache_get(bom_no, version=None):
    """Read a cached BOM from Redis.

    Returns the JSON-decoded value, or None on a miss, when no client is
    available, or when the read/decode fails (the failure is only logged).
    """
    client = _get_redis()
    if not client:
        return None

    key = _cache_key(bom_no, version)
    try:
        raw = client.get(key)
        if raw:
            logger.debug(f"[BOM Cache] HIT key={key}")
            return json.loads(raw)
        logger.debug(f"[BOM Cache] MISS key={key}")
        return None
    except Exception as e:
        logger.warning(f"[BOM Cache] GET 失败,降级查库. key={key}, err={e}")
        return None


def _cache_set(bom_no, version, data):
    """Serialize ``data`` as JSON and store it in Redis with ``BOM_CACHE_TTL``.

    A write failure is logged and otherwise ignored.
    """
    client = _get_redis()
    if not client:
        return

    key = _cache_key(bom_no, version)
    try:
        client.setex(key, BOM_CACHE_TTL, json.dumps(data, ensure_ascii=False))
        logger.debug(f"[BOM Cache] SET key={key} ttl={BOM_CACHE_TTL}s")
    except Exception as e:
        logger.warning(f"[BOM Cache] SET 失败,已忽略. key={key}, err={e}")


def _cache_delete(bom_no, version=None):
    """Delete the cache entries of a BOM.

    Removes the version-specific key (when ``version`` is given) and always
    the unversioned "latest" key. Each deletion is attempted independently;
    failures are logged and ignored.
    """
    client = _get_redis()
    if not client:
        return

    keys = []
    if version:
        keys.append(_cache_key(bom_no, version))
    # The unversioned key holds the "latest" view and must go as well.
    keys.append(_cache_key(bom_no))

    for key in keys:
        try:
            client.delete(key)
            logger.debug(f"[BOM Cache] DEL key={key}")
        except Exception as e:
            logger.warning(f"[BOM Cache] DEL 失败,已忽略. key={key}, err={e}")


def _normalize_dosage(value):
    """Convert a dosage to ``Decimal`` so that dosages can be compared exactly.

    Falsy input (None, 0, '') becomes ``Decimal(0)``. Numerically equal
    values such as ``1.5``, ``'1.50'`` and ``Decimal('1.5000')`` compare and
    hash equal.

    Raises:
        ValueError: if the value is not a finite number.
    """
    if not value:
        return Decimal(0)
    try:
        number = Decimal(str(value))
    except InvalidOperation as exc:
        raise ValueError(f'用量格式不正确: {value!r}') from exc
    if not number.is_finite():
        raise ValueError(f'用量格式不正确: {value!r}')
    return number


class BomService:
    """Service functions for BOM recipes stored in ``BomTable``."""

    # ====================== Current BOM logic (keyed by bom_no) ======================

    @staticmethod
    def generate_bom_no():
        """Generate a BOM number of the form ``BOM-<YYYYmmddHHMMSS>-<8 uuid chars>``."""
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        unique = str(uuid.uuid4())[:8]
        return f'BOM-{timestamp}-{unique}'

    @staticmethod
    def get_bom_list(keyword=None, active_only=False):
        """List BOM recipes, one entry per (bom_no, version), grouped by parent category.

        Args:
            keyword: optional case-insensitive substring matched against the
                BOM number, the parent material name/spec and the child
                material name/spec.
            active_only: when true, only rows with ``is_enabled`` set are
                considered.

        Returns:
            A list of ``{'category', 'count', 'items'}`` dicts sorted by
            category; parents without a category are grouped under '未分类'.
            Items within a group are ordered by (bom_no, version) descending.
        """
        # 1. Find the distinct (bom_no, version) pairs matching the filters.
        pair_query = db.session.query(
            BomTable.bom_no,
            BomTable.version
        ).join(
            MaterialBase, BomTable.parent_id == MaterialBase.id
        )

        if active_only:
            pair_query = pair_query.filter(BomTable.is_enabled == True)  # noqa: E712

        if keyword:
            kw = f'%{keyword}%'
            # Join the material table a second time so child materials can
            # be searched too.
            child_alias = db.aliased(MaterialBase)
            pair_query = pair_query.outerjoin(
                child_alias, BomTable.child_id == child_alias.id
            ).filter(
                or_(
                    BomTable.bom_no.ilike(kw),
                    MaterialBase.name.ilike(kw),
                    MaterialBase.spec_model.ilike(kw),
                    child_alias.name.ilike(kw),
                    child_alias.spec_model.ilike(kw)
                )
            )

        target_pairs = pair_query.distinct().all()
        if not target_pairs:
            return []

        # 2. Fetch a summary row for each pair.
        # NOTE(review): this issues one query per pair; consider a single
        # grouped query if the list grows large.
        results = []
        for bom_no, version in target_pairs:
            summary = db.session.query(
                BomTable.parent_id,
                MaterialBase.name.label('parent_name'),
                MaterialBase.spec_model.label('parent_spec'),
                MaterialBase.category.label('parent_category'),
                BomTable.is_enabled,
                func.count(BomTable.child_id).label('child_count')
            ).join(
                MaterialBase, BomTable.parent_id == MaterialBase.id
            ).filter(
                BomTable.bom_no == bom_no,
                BomTable.version == version
            ).group_by(
                BomTable.parent_id,
                MaterialBase.name,
                MaterialBase.spec_model,
                MaterialBase.category,
                BomTable.is_enabled
            ).first()

            if summary:
                results.append({
                    'bom_no': bom_no,
                    'version': version,
                    'parent_id': summary.parent_id,
                    'parent_name': summary.parent_name,
                    'parent_spec': summary.parent_spec or '',
                    'parent_category': summary.parent_category or '',
                    'is_enabled': summary.is_enabled,
                    'child_count': summary.child_count
                })

        results.sort(key=lambda x: (x['bom_no'], x['version']), reverse=True)

        # No second keyword pass in Python: the SQL filter above is already
        # case-insensitive, and re-checking only the parent fields would
        # discard every BOM that matched through a child name/spec.

        # 3. Group by parent category.
        grouped = defaultdict(list)
        for item in results:
            grouped[item['parent_category'] or '未分类'].append(item)

        return [
            {'category': cat, 'count': len(items), 'items': items}
            for cat, items in sorted(grouped.items(), key=lambda pair: pair[0])
        ]

    @staticmethod
    def get_bom_detail(bom_no, version=None):
        """Return the detail of one BOM version, using Redis as a cache-aside layer.

        Steps:
            1. Look up Redis under the requested key; return on a hit.
            2. On a miss (or Redis failure) query the database. Without a
               ``version`` the highest ``BomTable.version`` of the BOM is used.
            3. Store the result in Redis (TTL ``BOM_CACHE_TTL``).

        A request without ``version`` is cached under the unversioned key
        (so the same request hits next time) and additionally under the
        resolved version's key.

        Returns:
            A dict with ``bom_no``, ``version``, ``parent_id``,
            ``parent_name``, ``parent_spec``, ``is_enabled`` and
            ``children``; or None when nothing matches.
        """
        requested_version = version if version else None

        # ===== Step 1: try the cache =====
        cached = _cache_get(bom_no, requested_version)
        if cached is not None:
            logger.debug(f"[BOM] get_bom_detail bom_no={bom_no} version={version} → 命中缓存")
            return cached

        # ===== Step 2: cache miss, query the database =====
        logger.debug(f"[BOM] get_bom_detail bom_no={bom_no} version={version} → 查询数据库")

        query = db.session.query(
            BomTable,
            MaterialBase.name.label('child_name'),
            MaterialBase.spec_model.label('child_spec')
        ).join(
            MaterialBase, BomTable.child_id == MaterialBase.id
        ).filter(
            BomTable.bom_no == bom_no
        )

        if requested_version:
            resolved_version = requested_version
        else:
            resolved_version = db.session.query(BomTable.version) \
                .filter_by(bom_no=bom_no) \
                .order_by(BomTable.version.desc()).limit(1).scalar()
            if not resolved_version:
                return None
        query = query.filter(BomTable.version == resolved_version)

        rows = query.all()
        if not rows:
            return None

        first = rows[0]
        parent_id = first.BomTable.parent_id
        parent_material = MaterialBase.query.get(parent_id)

        children = [
            {
                'child_id': bom.child_id,
                'child_name': child_name,
                'child_spec': child_spec or '',
                'dosage': float(bom.dosage) if bom.dosage else 0.0,
                'remark': bom.remark or ''
            }
            for bom, child_name, child_spec in rows
        ]

        result = {
            'bom_no': bom_no,
            'version': first.BomTable.version,
            'parent_id': parent_id,
            'parent_name': parent_material.name if parent_material else '',
            'parent_spec': parent_material.spec_model if parent_material else '',
            'is_enabled': first.BomTable.is_enabled,
            'children': children
        }

        # ===== Step 3: populate the cache (failures are only logged) =====
        # Write under the key the caller asked for, otherwise a "latest"
        # request would never hit the cache.
        _cache_set(bom_no, requested_version, result)
        if not requested_version:
            _cache_set(bom_no, resolved_version, result)

        return result

    @staticmethod
    def save_bom(data):
        """Save one BOM version, replacing that version's existing rows.

        Args:
            data: dict with ``bom_no``, ``parent_id``, ``children`` (list of
                dicts with ``child_id`` and optional ``dosage``/``remark``),
                and optional ``version`` (default 'V1.0') and ``is_enabled``
                (default True).

        Returns:
            The ``bom_no`` that was saved.

        Raises:
            KeyError: if ``parent_id`` or ``children`` is missing.
            ValueError: if ``bom_no`` is empty, a child equals the parent, a
                dosage is not a valid number, or another version of the same
                BOM already has exactly the same (child, dosage) set.
        """
        bom_no = data.get('bom_no')
        version = data.get('version', 'V1.0')
        parent_id = data['parent_id']
        children = data['children']
        is_enabled = data.get('is_enabled', True)

        if not bom_no:
            raise ValueError('BOM编号不能为空')

        for child in children:
            if child['child_id'] == parent_id:
                raise ValueError('父件与子件不能是同一物料')

        # ===== Cross-version duplicate check =====
        # Dosages are compared as exact decimals; truncating to int would
        # treat e.g. 1.2 and 1.8 as identical.
        current_children_set = {
            (child['child_id'], _normalize_dosage(child.get('dosage')))
            for child in children
        }

        # Rows of every other version of this BOM.
        existing_versions = db.session.query(
            BomTable.version,
            BomTable.child_id,
            BomTable.dosage
        ).filter(
            BomTable.bom_no == bom_no,
            BomTable.version != version  # skip the version being saved
        ).all()

        version_children = defaultdict(set)
        for ver, child_id, dosage in existing_versions:
            version_children[ver].add((child_id, _normalize_dosage(dosage)))

        for ver, existing_set in version_children.items():
            if current_children_set == existing_set:
                raise ValueError(f'保存失败!当前子件配置与已有版本 {ver} 完全一致,请勿重复保存')

        # ===== Persist =====
        # Object-level deletes (rather than a bulk delete) so that audit
        # events fire for each removed row.
        old_records = BomTable.query.filter_by(bom_no=bom_no, version=version).all()
        try:
            for rec in old_records:
                db.session.delete(rec)

            # Emit the DELETEs now so the following INSERTs do not collide
            # with the old rows on the unique key.
            db.session.flush()

            for child in children:
                db.session.add(BomTable(
                    bom_no=bom_no,
                    version=version,
                    parent_id=parent_id,
                    child_id=child['child_id'],
                    dosage=child.get('dosage', 0),
                    remark=child.get('remark', ''),
                    is_enabled=is_enabled
                ))

            db.session.commit()
        except Exception:
            # Leave the session usable for the caller, then propagate.
            db.session.rollback()
            raise

        # ===== Invalidate the cache after a successful write =====
        _cache_delete(bom_no, version)
        logger.info(f"[BOM Cache] save_bom → 缓存已失效 bom_no={bom_no} version={version}")

        return bom_no

    @staticmethod
    def get_bom_with_stock_by_bom_no(bom_no):
        """Return the latest BOM detail with stock figures added to each child.

        Each child gains ``current_stock`` (sum of positive
        ``available_quantity``), ``warehouse_location`` (distinct locations
        joined with ', ') and ``max_producible`` (``stock // dosage``, or 0
        when the dosage is not positive). Stock is fetched for all children
        with a single grouped query.

        Returns the detail unchanged (possibly None) when the BOM does not
        exist or has no children.
        """
        detail = BomService.get_bom_detail(bom_no)
        if not detail or not detail.get('children'):
            return detail

        # 1. Collect the child material ids.
        child_ids = [child['child_id'] for child in detail['children']]

        # 2. One IN query for the stock and locations of all children.
        stock_stats = db.session.query(
            StockBuy.base_id,
            func.coalesce(func.sum(StockBuy.available_quantity), 0).label('total_qty'),
            func.string_agg(distinct(StockBuy.warehouse_location), ', ').label('locations')
        ).filter(
            StockBuy.base_id.in_(child_ids),
            StockBuy.available_quantity > 0
        ).group_by(
            StockBuy.base_id
        ).all()

        # 3. Index the rows by material id for direct lookup.
        stock_map = {
            stat.base_id: {
                'qty': stat.total_qty,
                'loc': stat.locations if stat.locations else ''
            }
            for stat in stock_stats
        }

        # 4. Merge the stock figures into each child (in memory only).
        for child in detail['children']:
            stat = stock_map.get(child['child_id'], {'qty': 0, 'loc': ''})
            stock_qty = float(stat['qty'])
            dosage = float(child['dosage']) if child.get('dosage') else 0

            child['current_stock'] = stock_qty
            child['warehouse_location'] = stat['loc']
            child['max_producible'] = int(stock_qty // dosage) if dosage > 0 else 0

        return detail

    # ====================== Legacy-compatible interface ======================

    @staticmethod
    def get_bom_no_by_parent(parent_id):
        """Return the ``bom_no`` of the row with the highest version for a parent, or None."""
        # NOTE(review): versions are ordered by the column's own ordering;
        # if they are strings like 'V10.0' this is lexicographic — confirm.
        row = BomTable.query.filter_by(parent_id=parent_id) \
            .order_by(BomTable.version.desc()).first()
        return row.bom_no if row else None

    @staticmethod
    def create_or_update_bom(parent_id, child_list, bom_no=None, version='V1.0'):
        """Replace the rows of one BOM version with ``child_list`` (legacy entry point).

        When ``bom_no`` is not given, the BOM number of an existing row for
        ``parent_id`` is reused, otherwise a new one is generated.

        Returns:
            True after a successful commit. Database errors propagate after
            the session has been rolled back.
        """
        if not bom_no:
            existing = BomTable.query.filter_by(parent_id=parent_id).first()
            bom_no = existing.bom_no if existing else BomService.generate_bom_no()

        # Object-level deletes so that audit events fire for each row.
        old_records = BomTable.query.filter_by(bom_no=bom_no, version=version).all()
        try:
            for rec in old_records:
                db.session.delete(rec)

            # Same as save_bom: emit the DELETEs before the INSERTs so the
            # new rows do not collide with the old ones on the unique key.
            db.session.flush()

            for item in child_list:
                db.session.add(BomTable(
                    bom_no=bom_no,
                    version=version,
                    parent_id=parent_id,
                    child_id=item['child_id'],
                    dosage=item.get('dosage', 0),
                    remark=item.get('remark', '')
                ))

            db.session.commit()
        except Exception:
            db.session.rollback()
            raise

        # ===== Invalidate the cache after a successful write =====
        _cache_delete(bom_no, version)
        logger.info(f"[BOM Cache] create_or_update_bom → 缓存已失效 bom_no={bom_no} version={version}")

        return True

    @staticmethod
    def get_bom_with_stock(parent_id):
        """Return the stock-annotated children of the parent's BOM, or an empty list."""
        bom_no = BomService.get_bom_no_by_parent(parent_id)
        if not bom_no:
            return []
        detail = BomService.get_bom_with_stock_by_bom_no(bom_no)
        return detail['children'] if detail else []