Files
KCGL/inventory-backend/app/services/outbound_service.py

466 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import uuid # .material -> .base refactor checked
from datetime import datetime, timezone, timedelta
from sqlalchemy import or_, func, desc, and_
from app.extensions import db
from app.models.outbound import TransOutbound
# 引入所有库存模型以进行查询
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
# 引入基础信息表
from app.models.base import MaterialBase
class OutboundService:
@staticmethod
def generate_outbound_no():
"""
生成出库单号: OUT-yyyyMMdd-HHmm-当日流水(4位)
例如: OUT-20260205-1558-0001
"""
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz)
date_str = now.strftime('%Y%m%d')
time_str = now.strftime('%H%M')
prefix = f"OUT-{date_str}-"
existing_count = db.session.query(func.count(func.distinct(TransOutbound.outbound_no))) \
.filter(TransOutbound.outbound_no.like(f"{prefix}%")).scalar()
sequence = existing_count + 1
return f"OUT-{date_str}-{time_str}-{sequence:04d}"
@staticmethod
def get_stock_by_barcode(barcode):
"""
根据扫码内容查找对应的库存物品,并附带价格信息
"""
if not barcode:
return None
clean_code = barcode.strip()
def get_price(item, table_type):
if table_type == 'stock_product':
return float(item.sale_price) if item.sale_price else 0
elif table_type == 'stock_buy':
return float(item.pre_tax_unit_price) if item.pre_tax_unit_price else 0
return 0
prod = StockProduct.query.filter(
or_(StockProduct.barcode == clean_code, StockProduct.sku == clean_code)
).first()
if prod:
res = OutboundService._format_scan_result(prod, 'stock_product')
res['price'] = get_price(prod, 'stock_product')
return res
semi = StockSemi.query.filter(
or_(StockSemi.barcode == clean_code, StockSemi.sku == clean_code)
).first()
if semi:
res = OutboundService._format_scan_result(semi, 'stock_semi')
res['price'] = 0
return res
buy = StockBuy.query.filter(
or_(StockBuy.barcode == clean_code, StockBuy.sku == clean_code)
).first()
if buy:
res = OutboundService._format_scan_result(buy, 'stock_buy')
res['price'] = get_price(buy, 'stock_buy')
return res
return None
@staticmethod
def _format_scan_result(item, table_name):
base_name = ""
base_spec = ""
base_cat = ""
base_type = ""
if hasattr(item, 'base') and item.base:
base_name = item.base.name
base_spec = item.base.spec_model
base_cat = item.base.category
base_type = item.base.material_type
if not base_name and hasattr(item, 'base_id') and item.base_id:
try:
base_info = MaterialBase.query.get(item.base_id)
if base_info:
base_name = base_info.name
base_spec = base_info.spec_model
base_cat = base_info.category
base_type = base_info.material_type
except Exception:
pass
if not base_name and hasattr(item, 'base') and item.base:
base_name = item.base.name
stock_qty = float(item.stock_quantity) if item.stock_quantity else 0
avail_qty = float(item.available_quantity) if item.available_quantity else 0
return {
'id': item.id,
'sku': item.sku,
'name': base_name or "未知物品",
'spec_model': base_spec or "",
'category': base_cat or "",
'material_type': base_type or "",
'source_table': table_name,
'stock_quantity': stock_qty,
'available_quantity': avail_qty,
'batch_number': getattr(item, 'batch_number', ''),
'warehouse_location': getattr(item, 'warehouse_location', ''),
'barcode': getattr(item, 'barcode', '')
}
@staticmethod
def create_outbound_batch(data, operator_name='System'):
items = data.get('items', [])
if not items:
raise ValueError("出库商品列表不能为空")
outbound_no = OutboundService.generate_outbound_no()
common_data = {
'outbound_no': outbound_no,
'consumer_name': data.get('consumer_name'),
'outbound_type': data.get('outbound_type', 'SALES'),
'signature_path': data.get('signature_path'),
'operator_name': operator_name,
'remark': data.get('remark')
}
beijing_tz = timezone(timedelta(hours=8))
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
model_map = {
'stock_buy': StockBuy,
'stock_semi': StockSemi,
'stock_product': StockProduct
}
try:
for item in items:
source_table = item.get('source_table')
stock_id = item.get('stock_id')
quantity = float(item.get('quantity', 0))
unit_price = float(item.get('price', 0))
if quantity <= 0:
raise ValueError(f"SKU {item.get('sku')} 的出库数量必须大于0")
ModelClass = model_map.get(source_table)
if not ModelClass:
continue
stock_record = ModelClass.query.with_for_update().get(stock_id)
if not stock_record:
raise ValueError(f"库存记录不存在 (ID: {stock_id})")
if float(stock_record.available_quantity) < quantity:
raise ValueError(f"SKU {stock_record.sku} 库存不足,当前可用: {stock_record.available_quantity}")
stock_record.stock_quantity = float(stock_record.stock_quantity) - quantity
stock_record.available_quantity = float(stock_record.available_quantity) - quantity
new_record = TransOutbound(
sku=item.get('sku'),
source_table=source_table,
stock_id=stock_id,
barcode=item.get('barcode'),
quantity=quantity,
unit_price=unit_price,
outbound_time=current_time,
**common_data
)
db.session.add(new_record)
db.session.commit()
return outbound_no
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def get_grouped_list(page=1, per_page=10, keyword=None, search_type='all', start_date=None, end_date=None):
"""
查询出库记录(按出库单号分组),包含详细物品信息
支持跨表搜索单号、领用人、SKU、物料名称、规格型号
search_type: all, no, name, sku, material_name, spec_model
"""
# 1. 构建基础查询
# 如果有关键词,需要联表搜索物料名称和规格型号
if keyword:
# 根据 search_type 构建不同的搜索条件
if search_type == 'all':
# 原有逻辑or_ 联表全局模糊搜索
# 查询 stock_buy 路径匹配的名称/规格
buy_match = db.session.query(TransOutbound.outbound_no).join(
StockBuy, and_(
TransOutbound.stock_id == StockBuy.id,
TransOutbound.source_table == 'stock_buy'
)
).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
# 查询 stock_semi 路径匹配的名称/规格
semi_match = db.session.query(TransOutbound.outbound_no).join(
StockSemi, and_(
TransOutbound.stock_id == StockSemi.id,
TransOutbound.source_table == 'stock_semi'
)
).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
# 查询 stock_product 路径匹配的名称/规格
product_match = db.session.query(TransOutbound.outbound_no).join(
StockProduct, and_(
TransOutbound.stock_id == StockProduct.id,
TransOutbound.source_table == 'stock_product'
)
).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).subquery()
# 合并三种来源的匹配单号
all_matches = db.session.query(buy_match.c.outbound_no).union(
db.session.query(semi_match.c.outbound_no),
db.session.query(product_match.c.outbound_no)
).subquery()
keyword_conditions = or_(
TransOutbound.outbound_no.ilike(f'%{keyword}%'),
TransOutbound.consumer_name.ilike(f'%{keyword}%'),
TransOutbound.sku.ilike(f'%{keyword}%'),
TransOutbound.outbound_no.in_(all_matches)
)
elif search_type == 'no':
keyword_conditions = TransOutbound.outbound_no.ilike(f'%{keyword}%')
elif search_type == 'name':
keyword_conditions = TransOutbound.consumer_name.ilike(f'%{keyword}%')
elif search_type == 'sku':
keyword_conditions = TransOutbound.sku.ilike(f'%{keyword}%')
elif search_type == 'material_name':
# 联表查询物料名称
buy_match = db.session.query(TransOutbound.outbound_no).join(
StockBuy, and_(
TransOutbound.stock_id == StockBuy.id,
TransOutbound.source_table == 'stock_buy'
)
).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
semi_match = db.session.query(TransOutbound.outbound_no).join(
StockSemi, and_(
TransOutbound.stock_id == StockSemi.id,
TransOutbound.source_table == 'stock_semi'
)
).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
product_match = db.session.query(TransOutbound.outbound_no).join(
StockProduct, and_(
TransOutbound.stock_id == StockProduct.id,
TransOutbound.source_table == 'stock_product'
)
).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery()
all_matches = db.session.query(buy_match.c.outbound_no).union(
db.session.query(semi_match.c.outbound_no),
db.session.query(product_match.c.outbound_no)
).subquery()
keyword_conditions = TransOutbound.outbound_no.in_(all_matches)
elif search_type == 'spec_model':
# 联表查询规格型号
buy_match = db.session.query(TransOutbound.outbound_no).join(
StockBuy, and_(
TransOutbound.stock_id == StockBuy.id,
TransOutbound.source_table == 'stock_buy'
)
).join(
MaterialBase, StockBuy.base_id == MaterialBase.id
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
semi_match = db.session.query(TransOutbound.outbound_no).join(
StockSemi, and_(
TransOutbound.stock_id == StockSemi.id,
TransOutbound.source_table == 'stock_semi'
)
).join(
MaterialBase, StockSemi.base_id == MaterialBase.id
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
product_match = db.session.query(TransOutbound.outbound_no).join(
StockProduct, and_(
TransOutbound.stock_id == StockProduct.id,
TransOutbound.source_table == 'stock_product'
)
).join(
MaterialBase, StockProduct.base_id == MaterialBase.id
).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery()
all_matches = db.session.query(buy_match.c.outbound_no).union(
db.session.query(semi_match.c.outbound_no),
db.session.query(product_match.c.outbound_no)
).subquery()
keyword_conditions = TransOutbound.outbound_no.in_(all_matches)
else:
keyword_conditions = None
else:
keyword_conditions = None
stmt = db.session.query(
TransOutbound.outbound_no,
func.max(TransOutbound.outbound_time).label('max_time')
).group_by(TransOutbound.outbound_no)
if keyword_conditions is not None:
stmt = stmt.filter(keyword_conditions)
if start_date and end_date:
stmt = stmt.filter(TransOutbound.outbound_time.between(start_date, end_date))
stmt = stmt.order_by(desc('max_time'))
# 使用 distinct 确保跨表查询不重复
stmt = stmt.distinct()
pagination = stmt.paginate(page=page, per_page=per_page, error_out=False)
outbound_nos = [row.outbound_no for row in pagination.items]
if not outbound_nos:
return {
'items': [],
'total': 0,
'pages': 0,
'current_page': page
}
# 2. 查询详细记录
details = TransOutbound.query.filter(TransOutbound.outbound_no.in_(outbound_nos)).all()
# 3. 组装数据并查询物品详情
grouped_map = {}
# 映射表模型以便查询
model_map = {
'stock_buy': StockBuy,
'stock_semi': StockSemi,
'stock_product': StockProduct
}
for d in details:
ono = d.outbound_no
if ono not in grouped_map:
grouped_map[ono] = {
'outbound_no': ono,
'outbound_time': d.outbound_time.strftime('%Y-%m-%d %H:%M:%S'),
'outbound_type': d.outbound_type,
'consumer_name': d.consumer_name,
'operator_name': d.operator_name,
'signature_path': d.signature_path,
'remark': d.remark,
'total_amount': 0.0,
'items': []
}
# --- 查询物品详细信息 (名称, 规格, 类型, 类别) ---
item_name = "未知物品"
item_spec = ""
item_cat = ""
item_type = ""
ModelClass = model_map.get(d.source_table)
if ModelClass and d.stock_id:
# 注意这里在循环中查询可能会有N+1问题但考虑到单页数据量通常每单条目不多暂时可接受
# 生产环境建议优化为预加载或批量查询
try:
stock_item = ModelClass.query.get(d.stock_id)
if stock_item and stock_item.base:
item_name = stock_item.base.name
item_spec = stock_item.base.spec_model
item_cat = stock_item.base.category
item_type = stock_item.base.material_type
elif stock_item and hasattr(stock_item, 'base_id') and stock_item.base_id:
base_info = MaterialBase.query.get(stock_item.base_id)
if base_info:
item_name = base_info.name
item_spec = base_info.spec_model
item_cat = base_info.category
item_type = base_info.material_type
except Exception as e:
print(f"Error fetching detail for stock_id {d.stock_id}: {e}")
# 计算金额
price = float(d.unit_price) if d.unit_price else 0
qty = float(d.quantity)
subtotal = price * qty
grouped_map[ono]['total_amount'] += subtotal
grouped_map[ono]['items'].append({
'sku': d.sku,
'name': item_name,
'spec_model': item_spec,
'category': item_cat,
'material_type': item_type,
'quantity': qty,
'unit_price': price,
'subtotal': subtotal
})
# 4. 排序输出
result_list = []
for ono in outbound_nos:
if ono in grouped_map:
obj = grouped_map[ono]
obj['items'].sort(key=lambda x: x['unit_price'], reverse=True)
obj['total_amount'] = round(obj['total_amount'], 2)
result_list.append(obj)
return {
'items': result_list,
'total': pagination.total,
'pages': pagination.pages,
'current_page': page
}