2 Commits

Author SHA1 Message Date
dxc
8ba1ff37f0 V3.49 2026-06-16 14:54:03 +08:00
DXC
1450e6c1de fix(借还记录列表): 按 borrow_no 单号维度分页 + 修 SQLAlchemy Row 适配错误
- 分页基准从明细行改为单号:21 项单号不再被拆到 3 页

- 步骤 1a 构造 GROUP BY borrow_no 的 subquery(sort_key + status 聚合)

- 步骤 2 主查询 SELECT order_subq.c.borrow_no 一列,避免触发 PG GROUP BY 严格模式 (f405)

- 步骤 3 用 page_borrow_nos 拉明细,保留前端 groupMap 期望的 items 形态

- pagination.items 用 isinstance + hasattr(_mapping) 兜底提取纯字符串(修 psycopg2 can't adapt type 'Row')

- service 加 try-except,路由层识别 500 透传 traceback

- status 过滤改为单号聚合(borrowed=至少一条未还,returned=全部归还)
2026-06-16 14:53:42 +08:00
3 changed files with 328 additions and 217 deletions

View File

@ -131,6 +131,15 @@ def get_records():
search_type = request.args.get('search_type', 'all') search_type = request.args.get('search_type', 'all')
res = TransService.get_records(page=page, limit=10, status=status, keyword=keyword, search_type=search_type) res = TransService.get_records(page=page, limit=10, status=status, keyword=keyword, search_type=search_type)
# ★ service 层异常时code==500 的字典(带 traceback需要直通到前端便于排查
if isinstance(res, dict) and res.get('code') == 500:
return jsonify({
'code': 500,
'msg': res.get('msg', '服务内部错误'),
'trace': res.get('trace', '')
}), 500
# 字段级脱敏 # 字段级脱敏
user_permissions = get_current_user_permissions() user_permissions = get_current_user_permissions()
if res.get('items'): if res.get('items'):

View File

@ -6,7 +6,7 @@ from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct from app.models.inbound.product import StockProduct
from app.models.base import MaterialBase from app.models.base import MaterialBase
from sqlalchemy import desc, func, nullslast, asc, or_, and_ from sqlalchemy import desc, func, nullslast, asc, or_, and_, case
from sqlalchemy.orm import joinedload from sqlalchemy.orm import joinedload
@ -325,241 +325,343 @@ class TransService:
@staticmethod @staticmethod
def get_records(page=1, limit=10, status='all', keyword=None, search_type='all'): def get_records(page=1, limit=10, status='all', keyword=None, search_type='all'):
q = TransBorrow.query """
获取借还记录列表(按单号 borrow_no 维度分页,避免明细撑爆 pageSize
# 如果有关键词,需要联表搜索物料名称和规格型号 实现思路(三步走):
if keyword: 步骤 1: 构造 GROUP BY borrow_no 的"单号维度视图" subquery
# 根据 search_type 构建不同的搜索条件 (包含 borrow_no + sort_key + 状态聚合,全部聚合都在这里完成)
if search_type == 'all': 步骤 2: 用一个【纯净的列查询】从 subquery 中分页得到 page_borrow_nos
# 原有逻辑or_ 联表全局模糊搜索 → SELECT 只有 borrow_no 一列,【主查询无 GROUP BY】
# 查询 stock_buy 路径匹配的名称/规格 → 避免触发 PG "column must appear in GROUP BY" 严格模式
buy_match = db.session.query(TransBorrow.id).join( 步骤 3: 用 page_borrow_nos 拉明细 + 预加载 material_name
StockBuy, and_(
TransBorrow.stock_id == StockBuy.id,
TransBorrow.source_table == 'stock_buy'
)
).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
# 查询 stock_semi 路径匹配的名称/规格 状态过滤按"单号聚合"判定:
semi_match = db.session.query(TransBorrow.id).join( - borrowed: 单号下至少有一条 is_returned=False
StockSemi, and_( - returned: 单号下所有明细 is_returned=True
TransBorrow.stock_id == StockSemi.id, """
TransBorrow.source_table == 'stock_semi' try:
) # ====================================================================
).join( # 步骤 1a构造"单号维度"基础子查询GROUP BY borrow_no 在这里完成)
MaterialBase, StockSemi.base_id == MaterialBase.id # ====================================================================
).filter( # 单号 + 排序键(最早 expected_return_time—— 这一层只含 2 列 + GROUP BY
or_( order_subq = (
MaterialBase.name.ilike(f'%{keyword}%'), db.session.query(
MaterialBase.spec_model.ilike(f'%{keyword}%') TransBorrow.borrow_no.label('borrow_no'),
) func.min(TransBorrow.expected_return_time).label('sort_key')
).subquery() )
.group_by(TransBorrow.borrow_no)
.subquery()
)
# 查询 stock_product 路径匹配的名称/规格 # 状态聚合子查询(也是 GROUP BY borrow_no
product_match = db.session.query(TransBorrow.id).join( status_subq = (
StockProduct, and_( db.session.query(
TransBorrow.stock_id == StockProduct.id, TransBorrow.borrow_no.label('borrow_no'),
TransBorrow.source_table == 'stock_product' func.sum(
) case((TransBorrow.is_returned == False, 1), else_=0)
).join( ).label('unreturned_count')
MaterialBase, StockProduct.base_id == MaterialBase.id )
).filter( .group_by(TransBorrow.borrow_no)
or_( .subquery()
MaterialBase.name.ilike(f'%{keyword}%'), )
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
# 合并三种来源的匹配 ID # ====================================================================
all_matches = db.session.query(buy_match.c.id).union( # 步骤 1b构造关键词命中单号子查询保留原全部 search_type 逻辑)
db.session.query(semi_match.c.id), # ====================================================================
db.session.query(product_match.c.id) keyword_conditions = None
).subquery() if keyword:
# 根据 search_type 构建不同的搜索条件
if search_type == 'all':
# 原有逻辑or_ 联表全局模糊搜索
# 查询 stock_buy 路径匹配的名称/规格
buy_match = db.session.query(TransBorrow.id).join(
StockBuy, and_(
TransBorrow.stock_id == StockBuy.id,
TransBorrow.source_table == 'stock_buy'
)
).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
keyword_conditions = or_( # 查询 stock_semi 路径匹配的名称/规格
TransBorrow.borrower_name.ilike(f'%{keyword}%'), semi_match = db.session.query(TransBorrow.id).join(
TransBorrow.sku.ilike(f'%{keyword}%'), StockSemi, and_(
TransBorrow.borrow_no.ilike(f'%{keyword}%'), TransBorrow.stock_id == StockSemi.id,
TransBorrow.id.in_(all_matches) TransBorrow.source_table == 'stock_semi'
)
).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
# 查询 stock_product 路径匹配的名称/规格
product_match = db.session.query(TransBorrow.id).join(
StockProduct, and_(
TransBorrow.stock_id == StockProduct.id,
TransBorrow.source_table == 'stock_product'
)
).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
# 合并三种来源的匹配 ID
all_matches = db.session.query(buy_match.c.id).union(
db.session.query(semi_match.c.id),
db.session.query(product_match.c.id)
).subquery()
keyword_conditions = or_(
TransBorrow.borrower_name.ilike(f'%{keyword}%'),
TransBorrow.sku.ilike(f'%{keyword}%'),
TransBorrow.borrow_no.ilike(f'%{keyword}%'),
TransBorrow.id.in_(all_matches)
)
elif search_type == 'no':
keyword_conditions = TransBorrow.borrow_no.ilike(f'%{keyword}%')
elif search_type == 'name':
keyword_conditions = TransBorrow.borrower_name.ilike(f'%{keyword}%')
elif search_type == 'sku':
keyword_conditions = TransBorrow.sku.ilike(f'%{keyword}%')
elif search_type == 'material_name':
# 联表查询物料名称
buy_match = db.session.query(TransBorrow.id).join(
StockBuy, and_(
TransBorrow.stock_id == StockBuy.id,
TransBorrow.source_table == 'stock_buy'
)
).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
semi_match = db.session.query(TransBorrow.id).join(
StockSemi, and_(
TransBorrow.stock_id == StockSemi.id,
TransBorrow.source_table == 'stock_semi'
)
).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
product_match = db.session.query(TransBorrow.id).join(
StockProduct, and_(
TransBorrow.stock_id == StockProduct.id,
TransBorrow.source_table == 'stock_product'
)
).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
all_matches = db.session.query(buy_match.c.id).union(
db.session.query(semi_match.c.id),
db.session.query(product_match.c.id)
).subquery()
keyword_conditions = TransBorrow.id.in_(all_matches)
elif search_type == 'spec_model':
# 联表查询规格型号
buy_match = db.session.query(TransBorrow.id).join(
StockBuy, and_(
TransBorrow.stock_id == StockBuy.id,
TransBorrow.source_table == 'stock_buy'
)
).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
semi_match = db.session.query(TransBorrow.id).join(
StockSemi, and_(
TransBorrow.stock_id == StockSemi.id,
TransBorrow.source_table == 'stock_semi'
)
).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
product_match = db.session.query(TransBorrow.id).join(
StockProduct, and_(
TransBorrow.stock_id == StockProduct.id,
TransBorrow.source_table == 'stock_product'
)
).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
all_matches = db.session.query(buy_match.c.id).union(
db.session.query(semi_match.c.id),
db.session.query(product_match.c.id)
).subquery()
keyword_conditions = TransBorrow.id.in_(all_matches)
# 把"命中的单号"独立成 subquery供主查询做 IN 过滤
keyword_borrow_nos_subq = None
if keyword_conditions is not None:
keyword_borrow_nos_subq = (
db.session.query(TransBorrow.borrow_no)
.filter(keyword_conditions)
.distinct()
.subquery()
) )
elif search_type == 'no': # ====================================================================
keyword_conditions = TransBorrow.borrow_no.ilike(f'%{keyword}%') # 步骤 2纯净列查询分页SELECT 只有 order_subq.c.borrow_no 一列)
# ====================================================================
borrow_no_q = db.session.query(order_subq.c.borrow_no)
elif search_type == 'name': # 关键词过滤
keyword_conditions = TransBorrow.borrower_name.ilike(f'%{keyword}%') if keyword_borrow_nos_subq is not None:
borrow_no_q = borrow_no_q.filter(
order_subq.c.borrow_no.in_(keyword_borrow_nos_subq)
)
elif search_type == 'sku': # 状态过滤(按"单号聚合"判定)
keyword_conditions = TransBorrow.sku.ilike(f'%{keyword}%') if status == 'borrowed':
# 单号下至少一条未还
elif search_type == 'material_name': borrow_no_q = borrow_no_q.filter(
# 联表查询物料名称 order_subq.c.borrow_no.in_(
buy_match = db.session.query(TransBorrow.id).join( db.session.query(status_subq.c.borrow_no)
StockBuy, and_( .filter(status_subq.c.unreturned_count > 0)
TransBorrow.stock_id == StockBuy.id,
TransBorrow.source_table == 'stock_buy'
) )
).join( )
MaterialBase, StockBuy.base_id == MaterialBase.id elif status == 'returned':
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery() # 单号下所有明细都已归还
borrow_no_q = borrow_no_q.filter(
semi_match = db.session.query(TransBorrow.id).join( order_subq.c.borrow_no.in_(
StockSemi, and_( db.session.query(status_subq.c.borrow_no)
TransBorrow.stock_id == StockSemi.id, .filter(status_subq.c.unreturned_count == 0)
TransBorrow.source_table == 'stock_semi'
) )
).join( )
MaterialBase, StockSemi.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
product_match = db.session.query(TransBorrow.id).join( # 排序(单号维度的 sort_key ASC
StockProduct, and_( borrow_no_q = borrow_no_q.order_by(nullslast(asc(order_subq.c.sort_key)))
TransBorrow.stock_id == StockProduct.id,
TransBorrow.source_table == 'stock_product'
)
).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
all_matches = db.session.query(buy_match.c.id).union( # 分页(基准 = borrow_no 单号数)
db.session.query(semi_match.c.id), pagination = borrow_no_q.paginate(page=page, per_page=limit, error_out=False)
db.session.query(product_match.c.id) # ★ pagination.items 是 SQLAlchemy Row 对象psycopg2 无法直接 adapt Row
).subquery() # 用 isinstance(row, tuple) 不够2.x 的 Row 不一定继承 tuple
# 用 hasattr(row, '_mapping') 兜底,强制提取 row[0] 拿到纯字符串
page_borrow_nos = [
row[0] if isinstance(row, tuple) or hasattr(row, '_mapping') else row
for row in pagination.items
]
total_orders = pagination.total # ★ 单号总数(修复前是明细数,分页错乱根因)
keyword_conditions = TransBorrow.id.in_(all_matches) if not page_borrow_nos:
return {
'items': [],
'total': total_orders,
'page': page,
'limit': limit
}
elif search_type == 'spec_model': # ====================================================================
# 联表查询规格型号 # 步骤 3按当前页 borrow_no 集合一次性拉出所有明细
buy_match = db.session.query(TransBorrow.id).join( # ====================================================================
StockBuy, and_( detail_records = (
TransBorrow.stock_id == StockBuy.id, TransBorrow.query
TransBorrow.source_table == 'stock_buy' .filter(TransBorrow.borrow_no.in_(page_borrow_nos))
) .order_by(TransBorrow.borrow_no.asc(), TransBorrow.id.asc())
).join( .all()
MaterialBase, StockBuy.base_id == MaterialBase.id )
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
semi_match = db.session.query(TransBorrow.id).join( # ============================================================
StockSemi, and_( # ★ 批量预加载物料名称三步收集ID → 批量JOIN → SKU兜底
TransBorrow.stock_id == StockSemi.id, # ============================================================
TransBorrow.source_table == 'stock_semi' items_with_names = []
) items = detail_records
).join( if items:
MaterialBase, StockSemi.base_id == MaterialBase.id # 步骤 1收集所有 (source_table, stock_id) 对
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery() stock_ids_by_table = {'stock_buy': set(), 'stock_semi': set(), 'stock_product': set()}
for item in items:
if item.source_table in stock_ids_by_table and item.stock_id:
stock_ids_by_table[item.source_table].add(item.stock_id)
product_match = db.session.query(TransBorrow.id).join( # 步骤 2批量查询库存表并 JOIN MaterialBase
StockProduct, and_( stock_map = {} # { ('stock_buy', 101): '物料名称', ... }
TransBorrow.stock_id == StockProduct.id, model_map = {
TransBorrow.source_table == 'stock_product' 'stock_buy': StockBuy,
) 'stock_semi': StockSemi,
).join( 'stock_product': StockProduct
MaterialBase, StockProduct.base_id == MaterialBase.id }
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery() for table_name, ids in stock_ids_by_table.items():
if not ids:
all_matches = db.session.query(buy_match.c.id).union( continue
db.session.query(semi_match.c.id), ModelClass = model_map.get(table_name)
db.session.query(product_match.c.id) if not ModelClass:
).subquery() continue
keyword_conditions = TransBorrow.id.in_(all_matches)
else:
keyword_conditions = None
else:
keyword_conditions = None
if keyword_conditions is not None:
q = q.filter(keyword_conditions)
if status == 'borrowed':
q = q.filter(TransBorrow.is_returned == False)
elif status == 'returned':
q = q.filter(TransBorrow.is_returned == True)
# 使用 distinct 防止跨表查询产生重复记录
q = q.distinct()
q = q.order_by(nullslast(asc(TransBorrow.expected_return_time)))
pagination = q.paginate(page=page, per_page=limit, error_out=False)
# ============================================================
# ★ 批量预加载物料名称两步收集ID → 批量 JOIN → 内存拼装)
# ============================================================
items_with_names = []
items = pagination.items
if items:
# 步骤 1收集所有 (source_table, stock_id) 对
stock_ids_by_table = {'stock_buy': set(), 'stock_semi': set(), 'stock_product': set()}
for item in items:
if item.source_table in stock_ids_by_table and item.stock_id:
stock_ids_by_table[item.source_table].add(item.stock_id)
# 步骤 2批量查询库存表并 JOIN MaterialBase
stock_map = {} # { ('stock_buy', 101): '物料名称', ... }
model_map = {
'stock_buy': StockBuy,
'stock_semi': StockSemi,
'stock_product': StockProduct
}
for table_name, ids in stock_ids_by_table.items():
if not ids:
continue
ModelClass = model_map.get(table_name)
if not ModelClass:
continue
stocks = ModelClass.query.options(
joinedload(ModelClass.base)
).filter(ModelClass.id.in_(ids)).all()
for stock in stocks:
name = stock.base.name if stock.base else ''
stock_map[(table_name, stock.id)] = name
# 步骤 3前置收集 SKU 兜底候选集
empty_sku_set = set()
for item in items:
name = stock_map.get((item.source_table, item.stock_id), '')
if not name and item.sku:
empty_sku_set.add(item.sku)
# 步骤 3前置SKU 兜底批量查询
# 场景库存记录被跨表转移删旧建新trans_borrow.stock_id 指向孤立记录
# 通过 sku 在三张库存表中查找任意匹配,再通过 base_id 获取 MaterialBase.name
sku_name_map = {}
if empty_sku_set:
for ModelClass in [StockProduct, StockSemi, StockBuy]:
stocks = ModelClass.query.options( stocks = ModelClass.query.options(
joinedload(ModelClass.base) joinedload(ModelClass.base)
).filter( ).filter(ModelClass.id.in_(ids)).all()
ModelClass.sku.in_(empty_sku_set)
).all()
for stock in stocks: for stock in stocks:
if stock.sku not in sku_name_map and stock.base: name = stock.base.name if stock.base else ''
sku_name_map[stock.sku] = stock.base.name stock_map[(table_name, stock.id)] = name
# 步骤 3:为每条记录注入 material_name SKU 兜底 # 步骤 3(前置):收集 SKU 兜底候选集
for item in items: empty_sku_set = set()
item_dict = item.to_dict() for item in items:
material_name = stock_map.get((item.source_table, item.stock_id), '') name = stock_map.get((item.source_table, item.stock_id), '')
if not material_name and item.sku: if not name and item.sku:
material_name = sku_name_map.get(item.sku, '') empty_sku_set.add(item.sku)
item_dict['material_name'] = material_name
items_with_names.append(item_dict)
items_data = items_with_names # 步骤 3前置SKU 兜底批量查询
else: # 场景库存记录被跨表转移删旧建新trans_borrow.stock_id 指向孤立记录
items_data = [] # 通过 sku 在三张库存表中查找任意匹配,再通过 base_id 获取 MaterialBase.name
sku_name_map = {}
if empty_sku_set:
for ModelClass in [StockProduct, StockSemi, StockBuy]:
stocks = ModelClass.query.options(
joinedload(ModelClass.base)
).filter(
ModelClass.sku.in_(empty_sku_set)
).all()
for stock in stocks:
if stock.sku not in sku_name_map and stock.base:
sku_name_map[stock.sku] = stock.base.name
return { # 步骤 3为每条记录注入 material_name含 SKU 兜底)
'items': items_data, for item in items:
'total': pagination.total, item_dict = item.to_dict()
'page': page, material_name = stock_map.get((item.source_table, item.stock_id), '')
'limit': limit if not material_name and item.sku:
} material_name = sku_name_map.get(item.sku, '')
item_dict['material_name'] = material_name
items_with_names.append(item_dict)
return {
'items': items_with_names,
'total': total_orders,
'page': page,
'limit': limit
}
except Exception as e:
# ★ 捕鼠器:把任何 SQL/运行时错误以 500 + traceback 返回,避免静默吞噬
import traceback
return {
'code': 500,
'msg': str(e),
'trace': traceback.format_exc(),
'items': [],
'total': 0,
'page': page,
'limit': limit
}

View File

@ -239,7 +239,7 @@ const handleLogout = () => {
<footer v-if="!isLoginPage" class="app-footer"> <footer v-if="!isLoginPage" class="app-footer">
<span class="version-tag"> <span class="version-tag">
<el-icon style="vertical-align: middle; margin-right: 4px"><InfoFilled /></el-icon> <el-icon style="vertical-align: middle; margin-right: 4px"><InfoFilled /></el-icon>
当前版本:V3.48 当前版本:V3.49
</span> </span>
</footer> </footer>