Files
KCGL/inventory-backend/app/services/inbound/inbound_summary_service.py

208 lines
9.3 KiB
Python

from sqlalchemy import select, literal, union_all, desc, asc, func, or_, cast, String, Numeric, \
Date # .material -> .base refactor checked
from app.extensions import db
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
from app.models.base import MaterialBase
import traceback
class InboundSummaryService:
@staticmethod
def get_list(page=1, per_page=10, keyword=None, start_date=None, end_date=None, source_type=None):
"""
聚合查询:
1. 联合 StockBuy, StockSemi, StockProduct 三张表
2. 关联 MaterialBase 获取名称规格
3. 计算动态状态 (库存耗尽显示已出库)
4. 排序:默认按入库日期倒序 (最近的在前)
"""
try:
# =========================================================
# 1. 构建三个子查询 (Subqueries)
# 使用 getattr 动态安全地获取 batch_number 和 serial_number(或 sn)
# 避免因不同表结构缺失字段导致报错
# =========================================================
# --- A. 采购件 (StockBuy) ---
q_buy = db.session.query(
StockBuy.id.label('id'),
StockBuy.base_id.label('base_id'),
StockBuy.sku.label('sku'),
StockBuy.in_date.label('inbound_date'),
StockBuy.in_quantity.label('in_qty'),
StockBuy.stock_quantity.label('current_qty'),
cast(StockBuy.supplier_name, String).label('source_info'),
StockBuy.status.label('orig_status'),
cast(getattr(StockBuy, 'batch_number', literal('')), String).label('batch_number'),
cast(getattr(StockBuy, 'serial_number', getattr(StockBuy, 'sn', literal(''))), String).label(
'serial_number'),
cast(literal('buy'), String).label('source_type')
)
# --- B. 半成品 (StockSemi) ---
q_semi = db.session.query(
StockSemi.id.label('id'),
StockSemi.base_id.label('base_id'),
StockSemi.sku.label('sku'),
StockSemi.production_date.label('inbound_date'),
StockSemi.in_quantity.label('in_qty'),
StockSemi.stock_quantity.label('current_qty'),
cast(StockSemi.production_manager, String).label('source_info'),
StockSemi.status.label('orig_status'),
cast(getattr(StockSemi, 'batch_number', literal('')), String).label('batch_number'),
cast(getattr(StockSemi, 'serial_number', getattr(StockSemi, 'sn', literal(''))), String).label(
'serial_number'),
cast(literal('semi'), String).label('source_type')
)
# --- C. 成品 (StockProduct) ---
q_product = db.session.query(
StockProduct.id.label('id'),
StockProduct.base_id.label('base_id'),
StockProduct.sku.label('sku'),
StockProduct.production_date.label('inbound_date'),
StockProduct.in_quantity.label('in_qty'),
StockProduct.stock_quantity.label('current_qty'),
cast(StockProduct.production_manager, String).label('source_info'),
StockProduct.status.label('orig_status'),
cast(getattr(StockProduct, 'batch_number', literal('')), String).label('batch_number'),
cast(getattr(StockProduct, 'serial_number', getattr(StockProduct, 'sn', literal(''))), String).label(
'serial_number'),
cast(literal('product'), String).label('source_type')
)
# =========================================================
# 2. 组合查询 (UNION ALL)
# =========================================================
combined_query = union_all(q_buy, q_semi, q_product)
cte = combined_query.subquery()
# =========================================================
# 3. 主查询:关联 MaterialBase
# =========================================================
query = db.session.query(
cte,
MaterialBase.name.label('material_name'),
MaterialBase.spec_model.label('spec_model'),
MaterialBase.category.label('category'),
MaterialBase.material_type.label('material_type')
).outerjoin(
MaterialBase, cte.c.base_id == MaterialBase.id
)
# =========================================================
# 4. 过滤条件
# =========================================================
if keyword:
rule = or_(
cte.c.sku.ilike(f'%{keyword}%'),
cte.c.source_info.ilike(f'%{keyword}%'),
cte.c.batch_number.ilike(f'%{keyword}%'),
cte.c.serial_number.ilike(f'%{keyword}%'), # 加入对序列号的搜索支持
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
query = query.filter(rule)
if start_date and end_date:
query = query.filter(cte.c.inbound_date.between(start_date, end_date))
if source_type:
query = query.filter(cte.c.source_type == source_type)
# =========================================================
# 5. 获取总数
# =========================================================
count_query = db.session.query(func.count()) \
.select_from(cte) \
.outerjoin(MaterialBase, cte.c.base_id == MaterialBase.id)
if keyword:
count_query = count_query.filter(rule)
if start_date and end_date:
count_query = count_query.filter(cte.c.inbound_date.between(start_date, end_date))
if source_type:
count_query = count_query.filter(cte.c.source_type == source_type)
total = count_query.scalar() or 0
# =========================================================
# 6. 排序与分页
# =========================================================
# ★★★ 修改处:优先按入库日期倒序排列 (最近的在前) ★★★
# 如果日期相同,再按 SKU 排序,保证分页稳定性
query = query.order_by(desc(cte.c.inbound_date), asc(cte.c.sku))
pagination = query.limit(per_page).offset((page - 1) * per_page).all()
# =========================================================
# 7. 数据格式化
# =========================================================
items = []
type_map = {
'buy': '采购入库',
'semi': '半成品生产',
'product': '成品完工'
}
for row in pagination:
date_str = ""
if row.inbound_date:
try:
date_str = row.inbound_date.strftime('%Y-%m-%d')
except Exception:
date_str = str(row.inbound_date)
in_qty = float(row.in_qty) if row.in_qty is not None else 0.0
current_qty = float(row.current_qty) if row.current_qty is not None else 0.0
# 状态逻辑
final_status = row.orig_status
if current_qty <= 0:
final_status = "已出库"
elif current_qty < in_qty:
final_status = "部分出库"
# 处理 批次 / 序列号 展示逻辑
b_num = row.batch_number or ""
s_num = row.serial_number or ""
if b_num and s_num and b_num != s_num:
display_batch_sn = f"{b_num} / {s_num}"
else:
display_batch_sn = b_num or s_num
items.append({
'id': row.id,
'sku': row.sku or "",
'name': row.material_name or "未知物品",
'spec_model': row.spec_model or "",
'category': row.category or "",
'material_type': row.material_type or "",
'inbound_date': date_str,
'quantity': in_qty,
'current_qty': current_qty,
'source_info': row.source_info or "",
'status': final_status,
'source_type': row.source_type,
'type_label': type_map.get(row.source_type, "未知类型"),
'batch_number': display_batch_sn # 输出统一的批次与序列号组合字符串给前端
})
return {
'items': items,
'total': total,
'pages': (total + per_page - 1) // per_page if per_page > 0 else 0,
'current_page': page
}
except Exception as e:
print("【InboundSummaryService Error】:", str(e))
traceback.print_exc()
raise e