- 83b3db6 引入的 joinedload(ModelClass.base) 触发 LEFT OUTER JOIN,
而 with_for_update() 会被 SQLAlchemy 透传到 join 的 nullable 侧,
PG 直接抛 FeatureNotSupported,且连表加锁有死锁风险
- 退回最安全的单表 FOR UPDATE 模式,接受 N+1 lazy 加载的代价
- 在 防线3 上方加防回归注释,明确禁止未来再加 joinedload
- process_return 中的另两处 joinedload 不带 FOR UPDATE,不受 PG 限制,保留
566 lines
24 KiB
Python
566 lines
24 KiB
Python
import uuid # .material -> .base refactor checked
|
||
from datetime import datetime
|
||
from app.extensions import db
|
||
from app.models.transaction import TransBorrow
|
||
from app.models.inbound.buy import StockBuy
|
||
from app.models.inbound.semi import StockSemi
|
||
from app.models.inbound.product import StockProduct
|
||
from app.models.base import MaterialBase
|
||
from sqlalchemy import desc, func, nullslast, asc, or_, and_
|
||
from sqlalchemy.orm import joinedload
|
||
|
||
|
||
class TransService:
|
||
|
||
@staticmethod
|
||
def generate_borrow_no():
|
||
"""
|
||
生成借用单号: BOR-yyyyMMdd-0001 (按日流水)
|
||
逻辑:统计当天已存在的不同借用单号数量,+1 作为新序号
|
||
"""
|
||
now = datetime.now()
|
||
date_str = now.strftime('%Y%m%d')
|
||
prefix = f"BOR-{date_str}-"
|
||
|
||
# 使用 count distinct 来计算当天有多少个不同的借用单 (因为一单多货会占多行)
|
||
count = db.session.query(func.count(func.distinct(TransBorrow.borrow_no))) \
|
||
.filter(TransBorrow.borrow_no.like(f"{prefix}%")).scalar()
|
||
|
||
sequence = count + 1
|
||
return f"{prefix}{sequence:04d}"
|
||
|
||
@staticmethod
|
||
def execute_dispatch(approval_id, items, operator_name='System', borrower_name=None,
|
||
signature=None, remark=None, expected_return_time=None):
|
||
"""
|
||
执行借库扣减(审批通过后调用)
|
||
流程:锁审批单 → 构建审批上限字典 → 锁库存行 → 名称规格校验 → 扣减库存 → 生成 TransBorrow 记录 → 标记审批单完成
|
||
|
||
★ 关键设计:审批维度是 (name, spec_model) 而非 SKU
|
||
借库申请是按【名称 + 规格型号】发起的(borrow_service 强制要求 name/spec_model/quantity 三字段),
|
||
申请时尚未绑定具体库存行;扫码出库时通过锁定 stock 行回查 material_base 表,
|
||
用 (name, spec_model) 与审批单做物料维度聚合比对,避免 sku 维度坍塌或绕过。
|
||
"""
|
||
from app.models.borrow import BorrowApproval
|
||
|
||
if not items: raise ValueError("物品列表为空")
|
||
if not signature: raise ValueError("借用人必须签字")
|
||
|
||
# ==============================================
|
||
# ★ 防线1:并发防重复执行 - 用 SELECT FOR UPDATE 锁住审批单
|
||
# ==============================================
|
||
approval = BorrowApproval.query.with_for_update().get(approval_id)
|
||
if not approval:
|
||
raise ValueError("审批单不存在")
|
||
if approval.status != 1:
|
||
status_map = {0: '待审批', 1: '已通过', 2: '已驳回', 3: '已完成'}
|
||
raise ValueError(f"审批单状态为【{status_map.get(approval.status, approval.status)}】,无法执行借库")
|
||
|
||
# ★ borrower_name 兜底:优先用前端传参,其次从审批单读取(申请时填写的姓名)
|
||
if not borrower_name:
|
||
borrower_name = approval.borrower_name
|
||
if not borrower_name:
|
||
raise ValueError("审批单中未记录借库人姓名,请联系管理员补录")
|
||
|
||
# ==============================================
|
||
# ★ 防线2:构建审批上限字典(按 名称+规格 聚合,strip 防止匹配失败)
|
||
# Key = (name, spec_model),Value = 该物料累计允许借出数量
|
||
# ==============================================
|
||
approved_items = approval.get_items()
|
||
if not approved_items:
|
||
raise ValueError("审批单中无物料明细,请联系管理员检查")
|
||
|
||
approval_limits = {}
|
||
for ai in approved_items:
|
||
key = (
|
||
(ai.get('name') or '').strip(),
|
||
(ai.get('spec_model') or '').strip()
|
||
)
|
||
approval_limits[key] = approval_limits.get(key, 0) + float(ai.get('quantity', 0))
|
||
|
||
# 累计本次扫码出库量(key 与 approval_limits 完全一致)
|
||
dispatch_acc = {}
|
||
|
||
borrow_no = TransService.generate_borrow_no()
|
||
model_map = {'stock_buy': StockBuy, 'stock_semi': StockSemi, 'stock_product': StockProduct}
|
||
|
||
try:
|
||
for item in items:
|
||
source_table = item.get('source_table')
|
||
stock_id = item.get('id')
|
||
qty = float(item.get('out_quantity', 0))
|
||
|
||
ModelClass = model_map.get(source_table)
|
||
if not ModelClass: continue
|
||
|
||
# ==============================================
|
||
# ★ 防线3:并发超卖与负库存 - 锁行后再查可用库存
|
||
# ⚠️ 不要在此加 joinedload(ModelClass.base)!PG 禁止 FOR UPDATE
|
||
# 应用到 outer join 的 nullable 侧,会报 FeatureNotSupported
|
||
# 并有死锁风险。stock.base 走单条 lazy 加载是已知取舍。
|
||
# ==============================================
|
||
stock = ModelClass.query.with_for_update().get(stock_id)
|
||
if not stock: raise ValueError(f"库存不存在 ID:{stock_id}")
|
||
|
||
# ==============================================
|
||
# ★ 防线4:名称+规格 超额校验(动态累加、即时拦截)
|
||
# 库存表本身没有 name/spec_model 字段,通过 base 关联到 material_base
|
||
# ==============================================
|
||
if stock.base:
|
||
stock_name = (stock.base.name or '').strip()
|
||
stock_spec = (stock.base.spec_model or '').strip()
|
||
else:
|
||
stock_name = ''
|
||
stock_spec = ''
|
||
key = (stock_name, stock_spec)
|
||
|
||
limit = approval_limits.get(key)
|
||
if limit is None:
|
||
raise ValueError(
|
||
f"扫码物料【{stock_name} / {stock_spec}】不在审批单允许范围内,"
|
||
f"请检查审批单明细或重新发起申请"
|
||
)
|
||
|
||
dispatch_acc[key] = dispatch_acc.get(key, 0) + qty
|
||
current_total = dispatch_acc[key]
|
||
if current_total > limit:
|
||
raise ValueError(
|
||
f"实际出库数量超出了审批单允许的上限: "
|
||
f"物料={stock_name}({stock_spec}) "
|
||
f"审批上限={limit}, 实际扫码={current_total}"
|
||
)
|
||
|
||
if float(stock.available_quantity) < qty:
|
||
raise ValueError(f"物料【{stock_name} / {stock_spec}】可用库存不足")
|
||
|
||
# 1. 冻结库存 (只减可用)
|
||
stock.available_quantity = float(stock.available_quantity) - qty
|
||
|
||
# 2. 创建借用记录
|
||
record = TransBorrow(
|
||
borrow_no=borrow_no,
|
||
sku=stock.sku,
|
||
source_table=source_table,
|
||
stock_id=stock.id,
|
||
barcode=stock.barcode,
|
||
quantity=qty,
|
||
borrower_name=borrower_name,
|
||
borrow_signature=signature,
|
||
remark=remark,
|
||
expected_return_time=expected_return_time,
|
||
status='borrowed',
|
||
is_returned=False
|
||
)
|
||
db.session.add(record)
|
||
|
||
# ★ 3. 标记审批单为已完成
|
||
approval.status = 3
|
||
|
||
db.session.commit()
|
||
return borrow_no
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
raise e
|
||
|
||
# ★ 兼容旧入口(不走审批流的直接借库,保留以便平滑过渡)
|
||
@staticmethod
|
||
def create_borrow(data, operator_name='System'):
|
||
"""
|
||
借库逻辑(兼容旧模式):减少可用库存,不减总库存
|
||
@deprecated 请优先使用 execute_dispatch 走审批流
|
||
"""
|
||
return TransService.execute_dispatch(
|
||
approval_id=0,
|
||
items=data.get('items', []),
|
||
operator_name=operator_name,
|
||
borrower_name=data.get('borrower_name'),
|
||
signature=data.get('signature_path'),
|
||
remark=data.get('remark'),
|
||
expected_return_time=data.get('expected_return_time')
|
||
)
|
||
|
||
@staticmethod
|
||
def scan_for_return(barcode):
|
||
"""
|
||
扫码还库:查找未归还记录,并返回当前物品的库位
|
||
"""
|
||
records = TransBorrow.query.filter_by(barcode=barcode, is_returned=False).all()
|
||
if not records:
|
||
return None
|
||
|
||
# 取第一条未还记录
|
||
record = records[0]
|
||
|
||
# 获取当前库存表中的实时库位
|
||
current_location = ""
|
||
model_map = {'stock_buy': StockBuy, 'stock_semi': StockSemi, 'stock_product': StockProduct}
|
||
ModelClass = model_map.get(record.source_table)
|
||
|
||
if ModelClass:
|
||
stock = ModelClass.query.get(record.stock_id)
|
||
if stock:
|
||
current_location = stock.warehouse_location
|
||
|
||
res_dict = record.to_dict()
|
||
res_dict['current_location'] = current_location # 用于前端对比和预填
|
||
return res_dict
|
||
|
||
@staticmethod
|
||
def process_return(data, operator_name):
|
||
"""
|
||
还库逻辑(支持部分归还)- 已优化,消除 N+1 和长事务死锁风险
|
||
四步走策略:
|
||
1. 收集所有 borrow_id
|
||
2. 批量锁定借用记录
|
||
3. 收集库存ID并批量锁定库存
|
||
4. 内存中完成业务逻辑
|
||
"""
|
||
items = data.get('items', [])
|
||
signature = data.get('signature_path') # 库管签字
|
||
|
||
if not items: raise ValueError("还库列表为空")
|
||
if not signature: raise ValueError("库管必须签字确认")
|
||
|
||
model_map = {'stock_buy': StockBuy, 'stock_semi': StockSemi, 'stock_product': StockProduct}
|
||
|
||
try:
|
||
# ==========================================
|
||
# ★ 优化步骤 1:收集所有 borrow_id
|
||
# ==========================================
|
||
borrow_ids = []
|
||
item_map = {} # 存储原始 item 数据,key=borrow_id
|
||
for item in items:
|
||
borrow_id = item.get('id')
|
||
if borrow_id:
|
||
borrow_ids.append(borrow_id)
|
||
item_map[borrow_id] = {
|
||
'return_qty': float(item.get('return_qty', 0)),
|
||
'final_location': item.get('return_location')
|
||
}
|
||
|
||
if not borrow_ids:
|
||
raise ValueError("没有有效的归还记录")
|
||
|
||
# ==========================================
|
||
# ★ 优化步骤 2:批量锁定借用记录
|
||
# ==========================================
|
||
borrow_records = TransBorrow.query.with_for_update().filter(
|
||
TransBorrow.id.in_(borrow_ids)
|
||
).all()
|
||
|
||
borrow_map = {r.id: r for r in borrow_records}
|
||
|
||
# ==========================================
|
||
# ★ 优化步骤 3:收集库存ID并批量锁定库存
|
||
# ==========================================
|
||
stock_ids_by_table = {'stock_buy': set(), 'stock_semi': set(), 'stock_product': set()}
|
||
|
||
for borrow_id, record in borrow_map.items():
|
||
if record.source_table in stock_ids_by_table and record.stock_id:
|
||
stock_ids_by_table[record.source_table].add(record.stock_id)
|
||
|
||
stock_map = {} # 格式: { ('stock_buy', 101): stock_obj }
|
||
for table_name, ids in stock_ids_by_table.items():
|
||
if not ids:
|
||
continue
|
||
ModelClass = model_map[table_name]
|
||
stocks = ModelClass.query.with_for_update().filter(
|
||
ModelClass.id.in_(ids)
|
||
).all()
|
||
for stock in stocks:
|
||
stock_map[(table_name, stock.id)] = stock
|
||
|
||
# ==========================================
|
||
# ★ 优化步骤 4:内存中完成业务逻辑
|
||
# ==========================================
|
||
for borrow_id, item_data in item_map.items():
|
||
return_qty = item_data['return_qty']
|
||
final_location = item_data['final_location']
|
||
|
||
record = borrow_map.get(borrow_id)
|
||
if not record:
|
||
continue
|
||
|
||
# 计算待还数量
|
||
returned_qty = float(record.returned_quantity) if record.returned_quantity else 0
|
||
total_qty = float(record.quantity) if record.quantity else 0
|
||
pending_qty = total_qty - returned_qty
|
||
|
||
# 校验归还数量
|
||
if return_qty <= 0:
|
||
raise ValueError(f"归还数量必须大于0")
|
||
if return_qty > pending_qty:
|
||
raise ValueError(f"本次归还数量({return_qty})不能大于待还数量({pending_qty})")
|
||
|
||
# 更新库存
|
||
stock = stock_map.get((record.source_table, record.stock_id))
|
||
if stock:
|
||
# 恢复可用库存
|
||
stock.available_quantity = float(stock.available_quantity) + return_qty
|
||
# 更新库位
|
||
if final_location:
|
||
stock.warehouse_location = final_location
|
||
|
||
# 更新归还数量和状态
|
||
new_returned_qty = returned_qty + return_qty
|
||
record.returned_quantity = new_returned_qty
|
||
|
||
if new_returned_qty >= total_qty:
|
||
record.is_returned = True
|
||
record.status = 'returned'
|
||
else:
|
||
record.is_returned = False
|
||
record.status = 'partial_returned'
|
||
|
||
record.return_time = datetime.now()
|
||
record.return_operator = operator_name
|
||
record.return_signature = signature
|
||
if final_location:
|
||
record.return_location = final_location
|
||
|
||
db.session.commit()
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
raise e
|
||
|
||
@staticmethod
|
||
def get_records(page=1, limit=10, status='all', keyword=None, search_type='all'):
|
||
q = TransBorrow.query
|
||
|
||
# 如果有关键词,需要联表搜索物料名称和规格型号
|
||
if keyword:
|
||
# 根据 search_type 构建不同的搜索条件
|
||
if search_type == 'all':
|
||
# 原有逻辑:or_ 联表全局模糊搜索
|
||
# 查询 stock_buy 路径匹配的名称/规格
|
||
buy_match = db.session.query(TransBorrow.id).join(
|
||
StockBuy, and_(
|
||
TransBorrow.stock_id == StockBuy.id,
|
||
TransBorrow.source_table == 'stock_buy'
|
||
)
|
||
).join(
|
||
MaterialBase, StockBuy.base_id == MaterialBase.id
|
||
).filter(
|
||
or_(
|
||
MaterialBase.name.ilike(f'%{keyword}%'),
|
||
MaterialBase.spec_model.ilike(f'%{keyword}%')
|
||
)
|
||
).subquery()
|
||
|
||
# 查询 stock_semi 路径匹配的名称/规格
|
||
semi_match = db.session.query(TransBorrow.id).join(
|
||
StockSemi, and_(
|
||
TransBorrow.stock_id == StockSemi.id,
|
||
TransBorrow.source_table == 'stock_semi'
|
||
)
|
||
).join(
|
||
MaterialBase, StockSemi.base_id == MaterialBase.id
|
||
).filter(
|
||
or_(
|
||
MaterialBase.name.ilike(f'%{keyword}%'),
|
||
MaterialBase.spec_model.ilike(f'%{keyword}%')
|
||
)
|
||
).subquery()
|
||
|
||
# 查询 stock_product 路径匹配的名称/规格
|
||
product_match = db.session.query(TransBorrow.id).join(
|
||
StockProduct, and_(
|
||
TransBorrow.stock_id == StockProduct.id,
|
||
TransBorrow.source_table == 'stock_product'
|
||
)
|
||
).join(
|
||
MaterialBase, StockProduct.base_id == MaterialBase.id
|
||
).filter(
|
||
or_(
|
||
MaterialBase.name.ilike(f'%{keyword}%'),
|
||
MaterialBase.spec_model.ilike(f'%{keyword}%')
|
||
)
|
||
).subquery()
|
||
|
||
# 合并三种来源的匹配 ID
|
||
all_matches = db.session.query(buy_match.c.id).union(
|
||
db.session.query(semi_match.c.id),
|
||
db.session.query(product_match.c.id)
|
||
).subquery()
|
||
|
||
keyword_conditions = or_(
|
||
TransBorrow.borrower_name.ilike(f'%{keyword}%'),
|
||
TransBorrow.sku.ilike(f'%{keyword}%'),
|
||
TransBorrow.borrow_no.ilike(f'%{keyword}%'),
|
||
TransBorrow.id.in_(all_matches)
|
||
)
|
||
|
||
elif search_type == 'no':
|
||
keyword_conditions = TransBorrow.borrow_no.ilike(f'%{keyword}%')
|
||
|
||
elif search_type == 'name':
|
||
keyword_conditions = TransBorrow.borrower_name.ilike(f'%{keyword}%')
|
||
|
||
elif search_type == 'sku':
|
||
keyword_conditions = TransBorrow.sku.ilike(f'%{keyword}%')
|
||
|
||
elif search_type == 'material_name':
|
||
# 联表查询物料名称
|
||
buy_match = db.session.query(TransBorrow.id).join(
|
||
StockBuy, and_(
|
||
TransBorrow.stock_id == StockBuy.id,
|
||
TransBorrow.source_table == 'stock_buy'
|
||
)
|
||
).join(
|
||
MaterialBase, StockBuy.base_id == MaterialBase.id
|
||
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
|
||
|
||
semi_match = db.session.query(TransBorrow.id).join(
|
||
StockSemi, and_(
|
||
TransBorrow.stock_id == StockSemi.id,
|
||
TransBorrow.source_table == 'stock_semi'
|
||
)
|
||
).join(
|
||
MaterialBase, StockSemi.base_id == MaterialBase.id
|
||
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
|
||
|
||
product_match = db.session.query(TransBorrow.id).join(
|
||
StockProduct, and_(
|
||
TransBorrow.stock_id == StockProduct.id,
|
||
TransBorrow.source_table == 'stock_product'
|
||
)
|
||
).join(
|
||
MaterialBase, StockProduct.base_id == MaterialBase.id
|
||
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
|
||
|
||
all_matches = db.session.query(buy_match.c.id).union(
|
||
db.session.query(semi_match.c.id),
|
||
db.session.query(product_match.c.id)
|
||
).subquery()
|
||
|
||
keyword_conditions = TransBorrow.id.in_(all_matches)
|
||
|
||
elif search_type == 'spec_model':
|
||
# 联表查询规格型号
|
||
buy_match = db.session.query(TransBorrow.id).join(
|
||
StockBuy, and_(
|
||
TransBorrow.stock_id == StockBuy.id,
|
||
TransBorrow.source_table == 'stock_buy'
|
||
)
|
||
).join(
|
||
MaterialBase, StockBuy.base_id == MaterialBase.id
|
||
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
|
||
|
||
semi_match = db.session.query(TransBorrow.id).join(
|
||
StockSemi, and_(
|
||
TransBorrow.stock_id == StockSemi.id,
|
||
TransBorrow.source_table == 'stock_semi'
|
||
)
|
||
).join(
|
||
MaterialBase, StockSemi.base_id == MaterialBase.id
|
||
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
|
||
|
||
product_match = db.session.query(TransBorrow.id).join(
|
||
StockProduct, and_(
|
||
TransBorrow.stock_id == StockProduct.id,
|
||
TransBorrow.source_table == 'stock_product'
|
||
)
|
||
).join(
|
||
MaterialBase, StockProduct.base_id == MaterialBase.id
|
||
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
|
||
|
||
all_matches = db.session.query(buy_match.c.id).union(
|
||
db.session.query(semi_match.c.id),
|
||
db.session.query(product_match.c.id)
|
||
).subquery()
|
||
|
||
keyword_conditions = TransBorrow.id.in_(all_matches)
|
||
|
||
else:
|
||
keyword_conditions = None
|
||
else:
|
||
keyword_conditions = None
|
||
|
||
if keyword_conditions is not None:
|
||
q = q.filter(keyword_conditions)
|
||
|
||
if status == 'borrowed':
|
||
q = q.filter(TransBorrow.is_returned == False)
|
||
elif status == 'returned':
|
||
q = q.filter(TransBorrow.is_returned == True)
|
||
|
||
# 使用 distinct 防止跨表查询产生重复记录
|
||
q = q.distinct()
|
||
|
||
q = q.order_by(nullslast(asc(TransBorrow.expected_return_time)))
|
||
pagination = q.paginate(page=page, per_page=limit, error_out=False)
|
||
|
||
# ============================================================
|
||
# ★ 批量预加载物料名称(两步:收集ID → 批量 JOIN → 内存拼装)
|
||
# ============================================================
|
||
items_with_names = []
|
||
items = pagination.items
|
||
if items:
|
||
# 步骤 1:收集所有 (source_table, stock_id) 对
|
||
stock_ids_by_table = {'stock_buy': set(), 'stock_semi': set(), 'stock_product': set()}
|
||
for item in items:
|
||
if item.source_table in stock_ids_by_table and item.stock_id:
|
||
stock_ids_by_table[item.source_table].add(item.stock_id)
|
||
|
||
# 步骤 2:批量查询库存表并 JOIN MaterialBase
|
||
stock_map = {} # { ('stock_buy', 101): '物料名称', ... }
|
||
model_map = {
|
||
'stock_buy': StockBuy,
|
||
'stock_semi': StockSemi,
|
||
'stock_product': StockProduct
|
||
}
|
||
for table_name, ids in stock_ids_by_table.items():
|
||
if not ids:
|
||
continue
|
||
ModelClass = model_map.get(table_name)
|
||
if not ModelClass:
|
||
continue
|
||
stocks = ModelClass.query.options(
|
||
joinedload(ModelClass.base)
|
||
).filter(ModelClass.id.in_(ids)).all()
|
||
for stock in stocks:
|
||
name = stock.base.name if stock.base else ''
|
||
stock_map[(table_name, stock.id)] = name
|
||
|
||
# 步骤 3(前置):收集 SKU 兜底候选集
|
||
empty_sku_set = set()
|
||
for item in items:
|
||
name = stock_map.get((item.source_table, item.stock_id), '')
|
||
if not name and item.sku:
|
||
empty_sku_set.add(item.sku)
|
||
|
||
# 步骤 3(前置):SKU 兜底批量查询
|
||
# 场景:库存记录被跨表转移(删旧建新)时,trans_borrow.stock_id 指向孤立记录
|
||
# 通过 sku 在三张库存表中查找任意匹配,再通过 base_id 获取 MaterialBase.name
|
||
sku_name_map = {}
|
||
if empty_sku_set:
|
||
for ModelClass in [StockProduct, StockSemi, StockBuy]:
|
||
stocks = ModelClass.query.options(
|
||
joinedload(ModelClass.base)
|
||
).filter(
|
||
ModelClass.sku.in_(empty_sku_set)
|
||
).all()
|
||
for stock in stocks:
|
||
if stock.sku not in sku_name_map and stock.base:
|
||
sku_name_map[stock.sku] = stock.base.name
|
||
|
||
# 步骤 3:为每条记录注入 material_name(含 SKU 兜底)
|
||
for item in items:
|
||
item_dict = item.to_dict()
|
||
material_name = stock_map.get((item.source_table, item.stock_id), '')
|
||
if not material_name and item.sku:
|
||
material_name = sku_name_map.get(item.sku, '')
|
||
item_dict['material_name'] = material_name
|
||
items_with_names.append(item_dict)
|
||
|
||
items_data = items_with_names
|
||
else:
|
||
items_data = []
|
||
|
||
return {
|
||
'items': items_data,
|
||
'total': pagination.total,
|
||
'page': page,
|
||
'limit': limit
|
||
}
|