Files
KCGL/inventory-backend/app/services/trans_service.py
DXC 1450e6c1de fix(借还记录列表): 按 borrow_no 单号维度分页 + 修 SQLAlchemy Row 适配错误
- 分页基准从明细行改为单号:21 项单号不再被拆到 3 页

- 步骤 1a 构造 GROUP BY borrow_no 的 subquery(sort_key + status 聚合)

- 步骤 2 主查询 SELECT order_subq.c.borrow_no 一列,避免触发 PG GROUP BY 严格模式 (f405)

- 步骤 3 用 page_borrow_nos 拉明细,保留前端 groupMap 期望的 items 形态

- pagination.items 用 isinstance + hasattr(_mapping) 兜底提取纯字符串(修 psycopg2 can't adapt type 'Row')

- service 加 try-except,路由层识别 500 透传 traceback

- status 过滤改为单号聚合(borrowed=至少一条未还,returned=全部归还)
2026-06-16 14:53:42 +08:00

668 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import uuid # .material -> .base refactor checked
from datetime import datetime
from app.extensions import db
from app.models.transaction import TransBorrow
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
from app.models.base import MaterialBase
from sqlalchemy import desc, func, nullslast, asc, or_, and_, case
from sqlalchemy.orm import joinedload
class TransService:
@staticmethod
def generate_borrow_no():
"""
生成借用单号: BOR-yyyyMMdd-0001 (按日流水)
逻辑:统计当天已存在的不同借用单号数量,+1 作为新序号
"""
now = datetime.now()
date_str = now.strftime('%Y%m%d')
prefix = f"BOR-{date_str}-"
# 使用 count distinct 来计算当天有多少个不同的借用单 (因为一单多货会占多行)
count = db.session.query(func.count(func.distinct(TransBorrow.borrow_no))) \
.filter(TransBorrow.borrow_no.like(f"{prefix}%")).scalar()
sequence = count + 1
return f"{prefix}{sequence:04d}"
@staticmethod
def execute_dispatch(approval_id, items, operator_name='System', borrower_name=None,
signature=None, remark=None, expected_return_time=None):
"""
执行借库扣减(审批通过后调用)
流程:锁审批单 → 构建审批上限字典 → 锁库存行 → 名称规格校验 → 扣减库存 → 生成 TransBorrow 记录 → 标记审批单完成
★ 关键设计:审批维度是 (name, spec_model) 而非 SKU
借库申请是按【名称 + 规格型号】发起的borrow_service 强制要求 name/spec_model/quantity 三字段),
申请时尚未绑定具体库存行;扫码出库时通过锁定 stock 行回查 material_base 表,
用 (name, spec_model) 与审批单做物料维度聚合比对,避免 sku 维度坍塌或绕过。
"""
from app.models.borrow import BorrowApproval
if not items: raise ValueError("物品列表为空")
if not signature: raise ValueError("借用人必须签字")
# ==============================================
# ★ 防线1并发防重复执行 - 用 SELECT FOR UPDATE 锁住审批单
# ==============================================
approval = BorrowApproval.query.with_for_update().get(approval_id)
if not approval:
raise ValueError("审批单不存在")
if approval.status != 1:
status_map = {0: '待审批', 1: '已通过', 2: '已驳回', 3: '已完成'}
raise ValueError(f"审批单状态为【{status_map.get(approval.status, approval.status)}】,无法执行借库")
# ★ borrower_name 兜底:优先用前端传参,其次从审批单读取(申请时填写的姓名)
if not borrower_name:
borrower_name = approval.borrower_name
if not borrower_name:
raise ValueError("审批单中未记录借库人姓名,请联系管理员补录")
# ==============================================
# ★ 防线2构建审批上限字典按 名称+规格 聚合strip 防止匹配失败)
# Key = (name, spec_model)Value = 该物料累计允许借出数量
# ==============================================
approved_items = approval.get_items()
if not approved_items:
raise ValueError("审批单中无物料明细,请联系管理员检查")
approval_limits = {}
for ai in approved_items:
key = (
(ai.get('name') or '').strip(),
(ai.get('spec_model') or '').strip()
)
approval_limits[key] = approval_limits.get(key, 0) + float(ai.get('quantity', 0))
# 累计本次扫码出库量key 与 approval_limits 完全一致)
dispatch_acc = {}
borrow_no = TransService.generate_borrow_no()
model_map = {'stock_buy': StockBuy, 'stock_semi': StockSemi, 'stock_product': StockProduct}
try:
for item in items:
source_table = item.get('source_table')
stock_id = item.get('id')
qty = float(item.get('out_quantity', 0))
ModelClass = model_map.get(source_table)
if not ModelClass: continue
# ==============================================
# ★ 防线3并发超卖与负库存 - 锁行后再查可用库存
# ⚠️ 不要在此加 joinedload(ModelClass.base)PG 禁止 FOR UPDATE
# 应用到 outer join 的 nullable 侧,会报 FeatureNotSupported
# 并有死锁风险。stock.base 走单条 lazy 加载是已知取舍。
# ==============================================
stock = ModelClass.query.with_for_update().get(stock_id)
if not stock: raise ValueError(f"库存不存在 ID:{stock_id}")
# ==============================================
# ★ 防线4名称+规格 超额校验(动态累加、即时拦截)
# 库存表本身没有 name/spec_model 字段,通过 base 关联到 material_base
# ==============================================
if stock.base:
stock_name = (stock.base.name or '').strip()
stock_spec = (stock.base.spec_model or '').strip()
else:
stock_name = ''
stock_spec = ''
key = (stock_name, stock_spec)
limit = approval_limits.get(key)
if limit is None:
raise ValueError(
f"扫码物料【{stock_name} / {stock_spec}】不在审批单允许范围内,"
f"请检查审批单明细或重新发起申请"
)
dispatch_acc[key] = dispatch_acc.get(key, 0) + qty
current_total = dispatch_acc[key]
if current_total > limit:
raise ValueError(
f"实际出库数量超出了审批单允许的上限: "
f"物料={stock_name}({stock_spec}) "
f"审批上限={limit}, 实际扫码={current_total}"
)
if float(stock.available_quantity) < qty:
raise ValueError(f"物料【{stock_name} / {stock_spec}】可用库存不足")
# 1. 冻结库存 (只减可用)
stock.available_quantity = float(stock.available_quantity) - qty
# 2. 创建借用记录
record = TransBorrow(
borrow_no=borrow_no,
sku=stock.sku,
source_table=source_table,
stock_id=stock.id,
barcode=stock.barcode,
quantity=qty,
borrower_name=borrower_name,
borrow_signature=signature,
remark=remark,
expected_return_time=expected_return_time,
status='borrowed',
is_returned=False
)
db.session.add(record)
# ★ 3. 标记审批单为已完成
approval.status = 3
db.session.commit()
return borrow_no
except Exception as e:
db.session.rollback()
raise e
# ★ 兼容旧入口(不走审批流的直接借库,保留以便平滑过渡)
@staticmethod
def create_borrow(data, operator_name='System'):
"""
借库逻辑(兼容旧模式):减少可用库存,不减总库存
@deprecated 请优先使用 execute_dispatch 走审批流
"""
return TransService.execute_dispatch(
approval_id=0,
items=data.get('items', []),
operator_name=operator_name,
borrower_name=data.get('borrower_name'),
signature=data.get('signature_path'),
remark=data.get('remark'),
expected_return_time=data.get('expected_return_time')
)
@staticmethod
def scan_for_return(barcode):
"""
扫码还库:查找未归还记录,并返回当前物品的库位
"""
records = TransBorrow.query.filter_by(barcode=barcode, is_returned=False).all()
if not records:
return None
# 取第一条未还记录
record = records[0]
# 获取当前库存表中的实时库位
current_location = ""
model_map = {'stock_buy': StockBuy, 'stock_semi': StockSemi, 'stock_product': StockProduct}
ModelClass = model_map.get(record.source_table)
if ModelClass:
stock = ModelClass.query.get(record.stock_id)
if stock:
current_location = stock.warehouse_location
res_dict = record.to_dict()
res_dict['current_location'] = current_location # 用于前端对比和预填
return res_dict
@staticmethod
def process_return(data, operator_name):
"""
还库逻辑(支持部分归还)- 已优化,消除 N+1 和长事务死锁风险
四步走策略:
1. 收集所有 borrow_id
2. 批量锁定借用记录
3. 收集库存ID并批量锁定库存
4. 内存中完成业务逻辑
"""
items = data.get('items', [])
signature = data.get('signature_path') # 库管签字
if not items: raise ValueError("还库列表为空")
if not signature: raise ValueError("库管必须签字确认")
model_map = {'stock_buy': StockBuy, 'stock_semi': StockSemi, 'stock_product': StockProduct}
try:
# ==========================================
# ★ 优化步骤 1收集所有 borrow_id
# ==========================================
borrow_ids = []
item_map = {} # 存储原始 item 数据key=borrow_id
for item in items:
borrow_id = item.get('id')
if borrow_id:
borrow_ids.append(borrow_id)
item_map[borrow_id] = {
'return_qty': float(item.get('return_qty', 0)),
'final_location': item.get('return_location')
}
if not borrow_ids:
raise ValueError("没有有效的归还记录")
# ==========================================
# ★ 优化步骤 2批量锁定借用记录
# ==========================================
borrow_records = TransBorrow.query.with_for_update().filter(
TransBorrow.id.in_(borrow_ids)
).all()
borrow_map = {r.id: r for r in borrow_records}
# ==========================================
# ★ 优化步骤 3收集库存ID并批量锁定库存
# ==========================================
stock_ids_by_table = {'stock_buy': set(), 'stock_semi': set(), 'stock_product': set()}
for borrow_id, record in borrow_map.items():
if record.source_table in stock_ids_by_table and record.stock_id:
stock_ids_by_table[record.source_table].add(record.stock_id)
stock_map = {} # 格式: { ('stock_buy', 101): stock_obj }
for table_name, ids in stock_ids_by_table.items():
if not ids:
continue
ModelClass = model_map[table_name]
stocks = ModelClass.query.with_for_update().filter(
ModelClass.id.in_(ids)
).all()
for stock in stocks:
stock_map[(table_name, stock.id)] = stock
# ==========================================
# ★ 优化步骤 4内存中完成业务逻辑
# ==========================================
for borrow_id, item_data in item_map.items():
return_qty = item_data['return_qty']
final_location = item_data['final_location']
record = borrow_map.get(borrow_id)
if not record:
continue
# 计算待还数量
returned_qty = float(record.returned_quantity) if record.returned_quantity else 0
total_qty = float(record.quantity) if record.quantity else 0
pending_qty = total_qty - returned_qty
# 校验归还数量
if return_qty <= 0:
raise ValueError(f"归还数量必须大于0")
if return_qty > pending_qty:
raise ValueError(f"本次归还数量({return_qty})不能大于待还数量({pending_qty})")
# 更新库存
stock = stock_map.get((record.source_table, record.stock_id))
if stock:
# 恢复可用库存
stock.available_quantity = float(stock.available_quantity) + return_qty
# 更新库位
if final_location:
stock.warehouse_location = final_location
# 更新归还数量和状态
new_returned_qty = returned_qty + return_qty
record.returned_quantity = new_returned_qty
if new_returned_qty >= total_qty:
record.is_returned = True
record.status = 'returned'
else:
record.is_returned = False
record.status = 'partial_returned'
record.return_time = datetime.now()
record.return_operator = operator_name
record.return_signature = signature
if final_location:
record.return_location = final_location
db.session.commit()
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def get_records(page=1, limit=10, status='all', keyword=None, search_type='all'):
"""
获取借还记录列表(按单号 borrow_no 维度分页,避免明细撑爆 pageSize
实现思路(三步走):
步骤 1: 构造 GROUP BY borrow_no 的"单号维度视图" subquery
(包含 borrow_no + sort_key + 状态聚合,全部聚合都在这里完成)
步骤 2: 用一个【纯净的列查询】从 subquery 中分页得到 page_borrow_nos
→ SELECT 只有 borrow_no 一列,【主查询无 GROUP BY】
→ 避免触发 PG "column must appear in GROUP BY" 严格模式
步骤 3: 用 page_borrow_nos 拉明细 + 预加载 material_name
状态过滤按"单号聚合"判定:
- borrowed: 单号下至少有一条 is_returned=False
- returned: 单号下所有明细 is_returned=True
"""
try:
# ====================================================================
# 步骤 1a构造"单号维度"基础子查询GROUP BY borrow_no 在这里完成)
# ====================================================================
# 单号 + 排序键(最早 expected_return_time—— 这一层只含 2 列 + GROUP BY
order_subq = (
db.session.query(
TransBorrow.borrow_no.label('borrow_no'),
func.min(TransBorrow.expected_return_time).label('sort_key')
)
.group_by(TransBorrow.borrow_no)
.subquery()
)
# 状态聚合子查询(也是 GROUP BY borrow_no
status_subq = (
db.session.query(
TransBorrow.borrow_no.label('borrow_no'),
func.sum(
case((TransBorrow.is_returned == False, 1), else_=0)
).label('unreturned_count')
)
.group_by(TransBorrow.borrow_no)
.subquery()
)
# ====================================================================
# 步骤 1b构造关键词命中单号子查询保留原全部 search_type 逻辑)
# ====================================================================
keyword_conditions = None
if keyword:
# 根据 search_type 构建不同的搜索条件
if search_type == 'all':
# 原有逻辑or_ 联表全局模糊搜索
# 查询 stock_buy 路径匹配的名称/规格
buy_match = db.session.query(TransBorrow.id).join(
StockBuy, and_(
TransBorrow.stock_id == StockBuy.id,
TransBorrow.source_table == 'stock_buy'
)
).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
# 查询 stock_semi 路径匹配的名称/规格
semi_match = db.session.query(TransBorrow.id).join(
StockSemi, and_(
TransBorrow.stock_id == StockSemi.id,
TransBorrow.source_table == 'stock_semi'
)
).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
# 查询 stock_product 路径匹配的名称/规格
product_match = db.session.query(TransBorrow.id).join(
StockProduct, and_(
TransBorrow.stock_id == StockProduct.id,
TransBorrow.source_table == 'stock_product'
)
).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
# 合并三种来源的匹配 ID
all_matches = db.session.query(buy_match.c.id).union(
db.session.query(semi_match.c.id),
db.session.query(product_match.c.id)
).subquery()
keyword_conditions = or_(
TransBorrow.borrower_name.ilike(f'%{keyword}%'),
TransBorrow.sku.ilike(f'%{keyword}%'),
TransBorrow.borrow_no.ilike(f'%{keyword}%'),
TransBorrow.id.in_(all_matches)
)
elif search_type == 'no':
keyword_conditions = TransBorrow.borrow_no.ilike(f'%{keyword}%')
elif search_type == 'name':
keyword_conditions = TransBorrow.borrower_name.ilike(f'%{keyword}%')
elif search_type == 'sku':
keyword_conditions = TransBorrow.sku.ilike(f'%{keyword}%')
elif search_type == 'material_name':
# 联表查询物料名称
buy_match = db.session.query(TransBorrow.id).join(
StockBuy, and_(
TransBorrow.stock_id == StockBuy.id,
TransBorrow.source_table == 'stock_buy'
)
).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
semi_match = db.session.query(TransBorrow.id).join(
StockSemi, and_(
TransBorrow.stock_id == StockSemi.id,
TransBorrow.source_table == 'stock_semi'
)
).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
product_match = db.session.query(TransBorrow.id).join(
StockProduct, and_(
TransBorrow.stock_id == StockProduct.id,
TransBorrow.source_table == 'stock_product'
)
).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
all_matches = db.session.query(buy_match.c.id).union(
db.session.query(semi_match.c.id),
db.session.query(product_match.c.id)
).subquery()
keyword_conditions = TransBorrow.id.in_(all_matches)
elif search_type == 'spec_model':
# 联表查询规格型号
buy_match = db.session.query(TransBorrow.id).join(
StockBuy, and_(
TransBorrow.stock_id == StockBuy.id,
TransBorrow.source_table == 'stock_buy'
)
).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
semi_match = db.session.query(TransBorrow.id).join(
StockSemi, and_(
TransBorrow.stock_id == StockSemi.id,
TransBorrow.source_table == 'stock_semi'
)
).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
product_match = db.session.query(TransBorrow.id).join(
StockProduct, and_(
TransBorrow.stock_id == StockProduct.id,
TransBorrow.source_table == 'stock_product'
)
).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
all_matches = db.session.query(buy_match.c.id).union(
db.session.query(semi_match.c.id),
db.session.query(product_match.c.id)
).subquery()
keyword_conditions = TransBorrow.id.in_(all_matches)
# 把"命中的单号"独立成 subquery供主查询做 IN 过滤
keyword_borrow_nos_subq = None
if keyword_conditions is not None:
keyword_borrow_nos_subq = (
db.session.query(TransBorrow.borrow_no)
.filter(keyword_conditions)
.distinct()
.subquery()
)
# ====================================================================
# 步骤 2纯净列查询分页SELECT 只有 order_subq.c.borrow_no 一列)
# ====================================================================
borrow_no_q = db.session.query(order_subq.c.borrow_no)
# 关键词过滤
if keyword_borrow_nos_subq is not None:
borrow_no_q = borrow_no_q.filter(
order_subq.c.borrow_no.in_(keyword_borrow_nos_subq)
)
# 状态过滤(按"单号聚合"判定)
if status == 'borrowed':
# 单号下至少一条未还
borrow_no_q = borrow_no_q.filter(
order_subq.c.borrow_no.in_(
db.session.query(status_subq.c.borrow_no)
.filter(status_subq.c.unreturned_count > 0)
)
)
elif status == 'returned':
# 单号下所有明细都已归还
borrow_no_q = borrow_no_q.filter(
order_subq.c.borrow_no.in_(
db.session.query(status_subq.c.borrow_no)
.filter(status_subq.c.unreturned_count == 0)
)
)
# 排序(单号维度的 sort_key ASC
borrow_no_q = borrow_no_q.order_by(nullslast(asc(order_subq.c.sort_key)))
# 分页(基准 = borrow_no 单号数)
pagination = borrow_no_q.paginate(page=page, per_page=limit, error_out=False)
# ★ pagination.items 是 SQLAlchemy Row 对象psycopg2 无法直接 adapt Row
# 用 isinstance(row, tuple) 不够2.x 的 Row 不一定继承 tuple
# 用 hasattr(row, '_mapping') 兜底,强制提取 row[0] 拿到纯字符串
page_borrow_nos = [
row[0] if isinstance(row, tuple) or hasattr(row, '_mapping') else row
for row in pagination.items
]
total_orders = pagination.total # ★ 单号总数(修复前是明细数,分页错乱根因)
if not page_borrow_nos:
return {
'items': [],
'total': total_orders,
'page': page,
'limit': limit
}
# ====================================================================
# 步骤 3按当前页 borrow_no 集合一次性拉出所有明细
# ====================================================================
detail_records = (
TransBorrow.query
.filter(TransBorrow.borrow_no.in_(page_borrow_nos))
.order_by(TransBorrow.borrow_no.asc(), TransBorrow.id.asc())
.all()
)
# ============================================================
# ★ 批量预加载物料名称三步收集ID → 批量JOIN → SKU兜底
# ============================================================
items_with_names = []
items = detail_records
if items:
# 步骤 1收集所有 (source_table, stock_id) 对
stock_ids_by_table = {'stock_buy': set(), 'stock_semi': set(), 'stock_product': set()}
for item in items:
if item.source_table in stock_ids_by_table and item.stock_id:
stock_ids_by_table[item.source_table].add(item.stock_id)
# 步骤 2批量查询库存表并 JOIN MaterialBase
stock_map = {} # { ('stock_buy', 101): '物料名称', ... }
model_map = {
'stock_buy': StockBuy,
'stock_semi': StockSemi,
'stock_product': StockProduct
}
for table_name, ids in stock_ids_by_table.items():
if not ids:
continue
ModelClass = model_map.get(table_name)
if not ModelClass:
continue
stocks = ModelClass.query.options(
joinedload(ModelClass.base)
).filter(ModelClass.id.in_(ids)).all()
for stock in stocks:
name = stock.base.name if stock.base else ''
stock_map[(table_name, stock.id)] = name
# 步骤 3前置收集 SKU 兜底候选集
empty_sku_set = set()
for item in items:
name = stock_map.get((item.source_table, item.stock_id), '')
if not name and item.sku:
empty_sku_set.add(item.sku)
# 步骤 3前置SKU 兜底批量查询
# 场景库存记录被跨表转移删旧建新trans_borrow.stock_id 指向孤立记录
# 通过 sku 在三张库存表中查找任意匹配,再通过 base_id 获取 MaterialBase.name
sku_name_map = {}
if empty_sku_set:
for ModelClass in [StockProduct, StockSemi, StockBuy]:
stocks = ModelClass.query.options(
joinedload(ModelClass.base)
).filter(
ModelClass.sku.in_(empty_sku_set)
).all()
for stock in stocks:
if stock.sku not in sku_name_map and stock.base:
sku_name_map[stock.sku] = stock.base.name
# 步骤 3为每条记录注入 material_name含 SKU 兜底)
for item in items:
item_dict = item.to_dict()
material_name = stock_map.get((item.source_table, item.stock_id), '')
if not material_name and item.sku:
material_name = sku_name_map.get(item.sku, '')
item_dict['material_name'] = material_name
items_with_names.append(item_dict)
return {
'items': items_with_names,
'total': total_orders,
'page': page,
'limit': limit
}
except Exception as e:
# ★ 捕鼠器:把任何 SQL/运行时错误以 500 + traceback 返回,避免静默吞噬
import traceback
return {
'code': 500,
'msg': str(e),
'trace': traceback.format_exc(),
'items': [],
'total': 0,
'page': page,
'limit': limit
}