perf: 消除出库列表和还库操作的 N+1 查询,改用批量 IN + joinedload

This commit is contained in:
DXC
2026-05-19 09:49:30 +08:00
parent d60e1c5188
commit 48651ffd01
2 changed files with 111 additions and 51 deletions

View File

@ -1,6 +1,7 @@
import uuid # .material -> .base refactor checked
from datetime import datetime, timezone, timedelta
from sqlalchemy import or_, func, desc, and_
from sqlalchemy.orm import joinedload
from app.extensions import db
from app.models.outbound import TransOutbound, OutboundApproval
@ -475,6 +476,37 @@ class OutboundService:
'stock_product': StockProduct
}
# ==========================================
# ★ 优化步骤 1第一遍循环单纯收集所有的 stock_id
# ==========================================
stock_ids_by_table = {'stock_buy': set(), 'stock_semi': set(), 'stock_product': set()}
for d in details:
if d.source_table in stock_ids_by_table and d.stock_id:
stock_ids_by_table[d.source_table].add(d.stock_id)
# ==========================================
# ★ 优化步骤 2发起批量查询并强制 JOIN 基础物料表
# ==========================================
# 格式: { ('stock_buy', 101): stock_obj, ... }
preloaded_stocks = {}
for table_name, ids in stock_ids_by_table.items():
if not ids:
continue
ModelClass = model_map[table_name]
# 魔法在这里in_() 一次性查出所有库存joinedload 顺便把 base 表的数据一起拉回来
items = ModelClass.query.options(
joinedload(ModelClass.base)
).filter(ModelClass.id.in_(ids)).all()
for item in items:
preloaded_stocks[(table_name, item.id)] = item
# ==========================================
# ★ 优化步骤 3第二遍循环纯内存拼装极速
# ==========================================
for d in details:
ono = d.outbound_no
if ono not in grouped_map:
@ -490,34 +522,20 @@ class OutboundService:
'items': []
}
# --- 查询物品详细信息 (名称, 规格, 类型, 类别, 批号/SN) ---
item_name = "未知物品"
item_spec = ""
item_cat = ""
item_type = ""
batch_sn = "-"
# --- 直接从内存字典中获取O(1) 复杂度,绝对不触发 SQL ---
item_name, item_spec, item_cat, item_type, batch_sn = "未知物品", "", "", "", "-"
ModelClass = model_map.get(d.source_table)
if ModelClass and d.stock_id:
try:
stock_item = ModelClass.query.get(d.stock_id)
if stock_item:
# 获取批号/序列号用于追溯
batch_sn = getattr(stock_item, 'batch_number', None) or getattr(stock_item, 'serial_number', None) or '-'
if stock_item.base:
item_name = stock_item.base.name
item_spec = stock_item.base.spec_model
item_cat = stock_item.base.category
item_type = stock_item.base.material_type
elif stock_item and hasattr(stock_item, 'base_id') and stock_item.base_id:
base_info = MaterialBase.query.get(stock_item.base_id)
if base_info:
item_name = base_info.name
item_spec = base_info.spec_model
item_cat = base_info.category
item_type = base_info.material_type
except Exception as e:
print(f"Error fetching detail for stock_id {d.stock_id}: {e}")
stock_item = preloaded_stocks.get((d.source_table, d.stock_id))
if stock_item:
batch_sn = getattr(stock_item, 'batch_number', None) or getattr(stock_item, 'serial_number', None) or '-'
# 因为前面用了 joinedload这里调用 .base 瞬间返回,不会去查数据库
if stock_item.base:
item_name = stock_item.base.name
item_spec = stock_item.base.spec_model
item_cat = stock_item.base.category
item_type = stock_item.base.material_type
# 计算金额
price = float(d.unit_price) if d.unit_price else 0