2 Commits

Author SHA1 Message Date
dxc
8ba1ff37f0 V3.49 2026-06-16 14:54:03 +08:00
DXC
1450e6c1de fix(借还记录列表): 按 borrow_no 单号维度分页 + 修 SQLAlchemy Row 适配错误
- 分页基准从明细行改为单号:21 项单号不再被拆到 3 页

- 步骤 1a 构造 GROUP BY borrow_no 的 subquery(sort_key + status 聚合)

- 步骤 2 主查询 SELECT order_subq.c.borrow_no 一列,避免触发 PG GROUP BY 严格模式 (f405)

- 步骤 3 用 page_borrow_nos 拉明细,保留前端 groupMap 期望的 items 形态

- pagination.items 用 isinstance + hasattr(_mapping) 兜底提取纯字符串(修 psycopg2 can't adapt type 'Row')

- service 加 try-except,路由层识别 500 透传 traceback

- status 过滤改为单号聚合(borrowed=至少一条未还,returned=全部归还)
2026-06-16 14:53:42 +08:00
3 changed files with 328 additions and 217 deletions

View File

@ -131,6 +131,15 @@ def get_records():
search_type = request.args.get('search_type', 'all')
res = TransService.get_records(page=page, limit=10, status=status, keyword=keyword, search_type=search_type)
# ★ service 层异常时code==500 的字典(带 traceback需要直通到前端便于排查
if isinstance(res, dict) and res.get('code') == 500:
return jsonify({
'code': 500,
'msg': res.get('msg', '服务内部错误'),
'trace': res.get('trace', '')
}), 500
# 字段级脱敏
user_permissions = get_current_user_permissions()
if res.get('items'):

View File

@ -6,7 +6,7 @@ from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
from app.models.base import MaterialBase
from sqlalchemy import desc, func, nullslast, asc, or_, and_
from sqlalchemy import desc, func, nullslast, asc, or_, and_, case
from sqlalchemy.orm import joinedload
@ -325,241 +325,343 @@ class TransService:
@staticmethod
def get_records(page=1, limit=10, status='all', keyword=None, search_type='all'):
q = TransBorrow.query
"""
获取借还记录列表(按单号 borrow_no 维度分页,避免明细撑爆 pageSize
# 如果有关键词,需要联表搜索物料名称和规格型号
if keyword:
# 根据 search_type 构建不同的搜索条件
if search_type == 'all':
# 原有逻辑or_ 联表全局模糊搜索
# 查询 stock_buy 路径匹配的名称/规格
buy_match = db.session.query(TransBorrow.id).join(
StockBuy, and_(
TransBorrow.stock_id == StockBuy.id,
TransBorrow.source_table == 'stock_buy'
)
).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
实现思路(三步走):
步骤 1: 构造 GROUP BY borrow_no 的"单号维度视图" subquery
(包含 borrow_no + sort_key + 状态聚合,全部聚合都在这里完成)
步骤 2: 用一个【纯净的列查询】从 subquery 中分页得到 page_borrow_nos
→ SELECT 只有 borrow_no 一列,【主查询无 GROUP BY】
→ 避免触发 PG "column must appear in GROUP BY" 严格模式
步骤 3: 用 page_borrow_nos 拉明细 + 预加载 material_name
# 查询 stock_semi 路径匹配的名称/规格
semi_match = db.session.query(TransBorrow.id).join(
StockSemi, and_(
TransBorrow.stock_id == StockSemi.id,
TransBorrow.source_table == 'stock_semi'
)
).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
状态过滤按"单号聚合"判定:
- borrowed: 单号下至少有一条 is_returned=False
- returned: 单号下所有明细 is_returned=True
"""
try:
# ====================================================================
# 步骤 1a构造"单号维度"基础子查询GROUP BY borrow_no 在这里完成)
# ====================================================================
# 单号 + 排序键(最早 expected_return_time—— 这一层只含 2 列 + GROUP BY
order_subq = (
db.session.query(
TransBorrow.borrow_no.label('borrow_no'),
func.min(TransBorrow.expected_return_time).label('sort_key')
)
.group_by(TransBorrow.borrow_no)
.subquery()
)
# 查询 stock_product 路径匹配的名称/规格
product_match = db.session.query(TransBorrow.id).join(
StockProduct, and_(
TransBorrow.stock_id == StockProduct.id,
TransBorrow.source_table == 'stock_product'
)
).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
# 状态聚合子查询(也是 GROUP BY borrow_no
status_subq = (
db.session.query(
TransBorrow.borrow_no.label('borrow_no'),
func.sum(
case((TransBorrow.is_returned == False, 1), else_=0)
).label('unreturned_count')
)
.group_by(TransBorrow.borrow_no)
.subquery()
)
# 合并三种来源的匹配 ID
all_matches = db.session.query(buy_match.c.id).union(
db.session.query(semi_match.c.id),
db.session.query(product_match.c.id)
).subquery()
# ====================================================================
# 步骤 1b构造关键词命中单号子查询保留原全部 search_type 逻辑)
# ====================================================================
keyword_conditions = None
if keyword:
# 根据 search_type 构建不同的搜索条件
if search_type == 'all':
# 原有逻辑or_ 联表全局模糊搜索
# 查询 stock_buy 路径匹配的名称/规格
buy_match = db.session.query(TransBorrow.id).join(
StockBuy, and_(
TransBorrow.stock_id == StockBuy.id,
TransBorrow.source_table == 'stock_buy'
)
).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
keyword_conditions = or_(
TransBorrow.borrower_name.ilike(f'%{keyword}%'),
TransBorrow.sku.ilike(f'%{keyword}%'),
TransBorrow.borrow_no.ilike(f'%{keyword}%'),
TransBorrow.id.in_(all_matches)
# 查询 stock_semi 路径匹配的名称/规格
semi_match = db.session.query(TransBorrow.id).join(
StockSemi, and_(
TransBorrow.stock_id == StockSemi.id,
TransBorrow.source_table == 'stock_semi'
)
).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
# 查询 stock_product 路径匹配的名称/规格
product_match = db.session.query(TransBorrow.id).join(
StockProduct, and_(
TransBorrow.stock_id == StockProduct.id,
TransBorrow.source_table == 'stock_product'
)
).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
# 合并三种来源的匹配 ID
all_matches = db.session.query(buy_match.c.id).union(
db.session.query(semi_match.c.id),
db.session.query(product_match.c.id)
).subquery()
keyword_conditions = or_(
TransBorrow.borrower_name.ilike(f'%{keyword}%'),
TransBorrow.sku.ilike(f'%{keyword}%'),
TransBorrow.borrow_no.ilike(f'%{keyword}%'),
TransBorrow.id.in_(all_matches)
)
elif search_type == 'no':
keyword_conditions = TransBorrow.borrow_no.ilike(f'%{keyword}%')
elif search_type == 'name':
keyword_conditions = TransBorrow.borrower_name.ilike(f'%{keyword}%')
elif search_type == 'sku':
keyword_conditions = TransBorrow.sku.ilike(f'%{keyword}%')
elif search_type == 'material_name':
# 联表查询物料名称
buy_match = db.session.query(TransBorrow.id).join(
StockBuy, and_(
TransBorrow.stock_id == StockBuy.id,
TransBorrow.source_table == 'stock_buy'
)
).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
semi_match = db.session.query(TransBorrow.id).join(
StockSemi, and_(
TransBorrow.stock_id == StockSemi.id,
TransBorrow.source_table == 'stock_semi'
)
).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
product_match = db.session.query(TransBorrow.id).join(
StockProduct, and_(
TransBorrow.stock_id == StockProduct.id,
TransBorrow.source_table == 'stock_product'
)
).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
all_matches = db.session.query(buy_match.c.id).union(
db.session.query(semi_match.c.id),
db.session.query(product_match.c.id)
).subquery()
keyword_conditions = TransBorrow.id.in_(all_matches)
elif search_type == 'spec_model':
# 联表查询规格型号
buy_match = db.session.query(TransBorrow.id).join(
StockBuy, and_(
TransBorrow.stock_id == StockBuy.id,
TransBorrow.source_table == 'stock_buy'
)
).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
semi_match = db.session.query(TransBorrow.id).join(
StockSemi, and_(
TransBorrow.stock_id == StockSemi.id,
TransBorrow.source_table == 'stock_semi'
)
).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
product_match = db.session.query(TransBorrow.id).join(
StockProduct, and_(
TransBorrow.stock_id == StockProduct.id,
TransBorrow.source_table == 'stock_product'
)
).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
all_matches = db.session.query(buy_match.c.id).union(
db.session.query(semi_match.c.id),
db.session.query(product_match.c.id)
).subquery()
keyword_conditions = TransBorrow.id.in_(all_matches)
# 把"命中的单号"独立成 subquery供主查询做 IN 过滤
keyword_borrow_nos_subq = None
if keyword_conditions is not None:
keyword_borrow_nos_subq = (
db.session.query(TransBorrow.borrow_no)
.filter(keyword_conditions)
.distinct()
.subquery()
)
elif search_type == 'no':
keyword_conditions = TransBorrow.borrow_no.ilike(f'%{keyword}%')
# ====================================================================
# 步骤 2纯净列查询分页SELECT 只有 order_subq.c.borrow_no 一列)
# ====================================================================
borrow_no_q = db.session.query(order_subq.c.borrow_no)
elif search_type == 'name':
keyword_conditions = TransBorrow.borrower_name.ilike(f'%{keyword}%')
# 关键词过滤
if keyword_borrow_nos_subq is not None:
borrow_no_q = borrow_no_q.filter(
order_subq.c.borrow_no.in_(keyword_borrow_nos_subq)
)
elif search_type == 'sku':
keyword_conditions = TransBorrow.sku.ilike(f'%{keyword}%')
elif search_type == 'material_name':
# 联表查询物料名称
buy_match = db.session.query(TransBorrow.id).join(
StockBuy, and_(
TransBorrow.stock_id == StockBuy.id,
TransBorrow.source_table == 'stock_buy'
# 状态过滤(按"单号聚合"判定)
if status == 'borrowed':
# 单号下至少一条未还
borrow_no_q = borrow_no_q.filter(
order_subq.c.borrow_no.in_(
db.session.query(status_subq.c.borrow_no)
.filter(status_subq.c.unreturned_count > 0)
)
).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
semi_match = db.session.query(TransBorrow.id).join(
StockSemi, and_(
TransBorrow.stock_id == StockSemi.id,
TransBorrow.source_table == 'stock_semi'
)
elif status == 'returned':
# 单号下所有明细都已归还
borrow_no_q = borrow_no_q.filter(
order_subq.c.borrow_no.in_(
db.session.query(status_subq.c.borrow_no)
.filter(status_subq.c.unreturned_count == 0)
)
).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
)
product_match = db.session.query(TransBorrow.id).join(
StockProduct, and_(
TransBorrow.stock_id == StockProduct.id,
TransBorrow.source_table == 'stock_product'
)
).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
# 排序(单号维度的 sort_key ASC
borrow_no_q = borrow_no_q.order_by(nullslast(asc(order_subq.c.sort_key)))
all_matches = db.session.query(buy_match.c.id).union(
db.session.query(semi_match.c.id),
db.session.query(product_match.c.id)
).subquery()
# 分页(基准 = borrow_no 单号数)
pagination = borrow_no_q.paginate(page=page, per_page=limit, error_out=False)
# ★ pagination.items 是 SQLAlchemy Row 对象psycopg2 无法直接 adapt Row
# 用 isinstance(row, tuple) 不够2.x 的 Row 不一定继承 tuple
# 用 hasattr(row, '_mapping') 兜底,强制提取 row[0] 拿到纯字符串
page_borrow_nos = [
row[0] if isinstance(row, tuple) or hasattr(row, '_mapping') else row
for row in pagination.items
]
total_orders = pagination.total # ★ 单号总数(修复前是明细数,分页错乱根因)
keyword_conditions = TransBorrow.id.in_(all_matches)
if not page_borrow_nos:
return {
'items': [],
'total': total_orders,
'page': page,
'limit': limit
}
elif search_type == 'spec_model':
# 联表查询规格型号
buy_match = db.session.query(TransBorrow.id).join(
StockBuy, and_(
TransBorrow.stock_id == StockBuy.id,
TransBorrow.source_table == 'stock_buy'
)
).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
# ====================================================================
# 步骤 3按当前页 borrow_no 集合一次性拉出所有明细
# ====================================================================
detail_records = (
TransBorrow.query
.filter(TransBorrow.borrow_no.in_(page_borrow_nos))
.order_by(TransBorrow.borrow_no.asc(), TransBorrow.id.asc())
.all()
)
semi_match = db.session.query(TransBorrow.id).join(
StockSemi, and_(
TransBorrow.stock_id == StockSemi.id,
TransBorrow.source_table == 'stock_semi'
)
).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
# ============================================================
# ★ 批量预加载物料名称三步收集ID → 批量JOIN → SKU兜底
# ============================================================
items_with_names = []
items = detail_records
if items:
# 步骤 1收集所有 (source_table, stock_id) 对
stock_ids_by_table = {'stock_buy': set(), 'stock_semi': set(), 'stock_product': set()}
for item in items:
if item.source_table in stock_ids_by_table and item.stock_id:
stock_ids_by_table[item.source_table].add(item.stock_id)
product_match = db.session.query(TransBorrow.id).join(
StockProduct, and_(
TransBorrow.stock_id == StockProduct.id,
TransBorrow.source_table == 'stock_product'
)
).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
all_matches = db.session.query(buy_match.c.id).union(
db.session.query(semi_match.c.id),
db.session.query(product_match.c.id)
).subquery()
keyword_conditions = TransBorrow.id.in_(all_matches)
else:
keyword_conditions = None
else:
keyword_conditions = None
if keyword_conditions is not None:
q = q.filter(keyword_conditions)
if status == 'borrowed':
q = q.filter(TransBorrow.is_returned == False)
elif status == 'returned':
q = q.filter(TransBorrow.is_returned == True)
# 使用 distinct 防止跨表查询产生重复记录
q = q.distinct()
q = q.order_by(nullslast(asc(TransBorrow.expected_return_time)))
pagination = q.paginate(page=page, per_page=limit, error_out=False)
# ============================================================
# ★ 批量预加载物料名称两步收集ID → 批量 JOIN → 内存拼装)
# ============================================================
items_with_names = []
items = pagination.items
if items:
# 步骤 1收集所有 (source_table, stock_id) 对
stock_ids_by_table = {'stock_buy': set(), 'stock_semi': set(), 'stock_product': set()}
for item in items:
if item.source_table in stock_ids_by_table and item.stock_id:
stock_ids_by_table[item.source_table].add(item.stock_id)
# 步骤 2批量查询库存表并 JOIN MaterialBase
stock_map = {} # { ('stock_buy', 101): '物料名称', ... }
model_map = {
'stock_buy': StockBuy,
'stock_semi': StockSemi,
'stock_product': StockProduct
}
for table_name, ids in stock_ids_by_table.items():
if not ids:
continue
ModelClass = model_map.get(table_name)
if not ModelClass:
continue
stocks = ModelClass.query.options(
joinedload(ModelClass.base)
).filter(ModelClass.id.in_(ids)).all()
for stock in stocks:
name = stock.base.name if stock.base else ''
stock_map[(table_name, stock.id)] = name
# 步骤 3前置收集 SKU 兜底候选集
empty_sku_set = set()
for item in items:
name = stock_map.get((item.source_table, item.stock_id), '')
if not name and item.sku:
empty_sku_set.add(item.sku)
# 步骤 3前置SKU 兜底批量查询
# 场景库存记录被跨表转移删旧建新trans_borrow.stock_id 指向孤立记录
# 通过 sku 在三张库存表中查找任意匹配,再通过 base_id 获取 MaterialBase.name
sku_name_map = {}
if empty_sku_set:
for ModelClass in [StockProduct, StockSemi, StockBuy]:
# 步骤 2批量查询库存表并 JOIN MaterialBase
stock_map = {} # { ('stock_buy', 101): '物料名称', ... }
model_map = {
'stock_buy': StockBuy,
'stock_semi': StockSemi,
'stock_product': StockProduct
}
for table_name, ids in stock_ids_by_table.items():
if not ids:
continue
ModelClass = model_map.get(table_name)
if not ModelClass:
continue
stocks = ModelClass.query.options(
joinedload(ModelClass.base)
).filter(
ModelClass.sku.in_(empty_sku_set)
).all()
).filter(ModelClass.id.in_(ids)).all()
for stock in stocks:
if stock.sku not in sku_name_map and stock.base:
sku_name_map[stock.sku] = stock.base.name
name = stock.base.name if stock.base else ''
stock_map[(table_name, stock.id)] = name
# 步骤 3:为每条记录注入 material_name SKU 兜底
for item in items:
item_dict = item.to_dict()
material_name = stock_map.get((item.source_table, item.stock_id), '')
if not material_name and item.sku:
material_name = sku_name_map.get(item.sku, '')
item_dict['material_name'] = material_name
items_with_names.append(item_dict)
# 步骤 3(前置):收集 SKU 兜底候选集
empty_sku_set = set()
for item in items:
name = stock_map.get((item.source_table, item.stock_id), '')
if not name and item.sku:
empty_sku_set.add(item.sku)
items_data = items_with_names
else:
items_data = []
# 步骤 3前置SKU 兜底批量查询
# 场景库存记录被跨表转移删旧建新trans_borrow.stock_id 指向孤立记录
# 通过 sku 在三张库存表中查找任意匹配,再通过 base_id 获取 MaterialBase.name
sku_name_map = {}
if empty_sku_set:
for ModelClass in [StockProduct, StockSemi, StockBuy]:
stocks = ModelClass.query.options(
joinedload(ModelClass.base)
).filter(
ModelClass.sku.in_(empty_sku_set)
).all()
for stock in stocks:
if stock.sku not in sku_name_map and stock.base:
sku_name_map[stock.sku] = stock.base.name
return {
'items': items_data,
'total': pagination.total,
'page': page,
'limit': limit
}
# 步骤 3为每条记录注入 material_name含 SKU 兜底)
for item in items:
item_dict = item.to_dict()
material_name = stock_map.get((item.source_table, item.stock_id), '')
if not material_name and item.sku:
material_name = sku_name_map.get(item.sku, '')
item_dict['material_name'] = material_name
items_with_names.append(item_dict)
return {
'items': items_with_names,
'total': total_orders,
'page': page,
'limit': limit
}
except Exception as e:
# ★ 捕鼠器:把任何 SQL/运行时错误以 500 + traceback 返回,避免静默吞噬
import traceback
return {
'code': 500,
'msg': str(e),
'trace': traceback.format_exc(),
'items': [],
'total': 0,
'page': page,
'limit': limit
}

View File

@ -239,7 +239,7 @@ const handleLogout = () => {
<footer v-if="!isLoginPage" class="app-footer">
<span class="version-tag">
<el-icon style="vertical-align: middle; margin-right: 4px"><InfoFilled /></el-icon>
当前版本:V3.48
当前版本:V3.49
</span>
</footer>