Files
KCGL/inventory-backend/app/services/outbound_service.py
2026-02-05 14:36:36 +08:00

209 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import uuid
from datetime import datetime, timezone, timedelta # [修改] 引入 timezone 和 timedelta
from sqlalchemy import or_
from app.extensions import db
from app.models.outbound import TransOutbound
# 引入所有库存模型以进行查询
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
# ★★★ [关键] 引入基础信息表,用于手动补全名称
from app.models.base import MaterialBase
class OutboundService:
@staticmethod
def generate_outbound_no():
"""
生成出库单号: OUT-yyyyMMdd-随机码
[修改] 强制使用北京时间生成日期前缀
"""
# 获取北京时间
beijing_tz = timezone(timedelta(hours=8))
current_time = datetime.now(beijing_tz)
date_str = current_time.strftime('%Y%m%d')
short_uuid = uuid.uuid4().hex[:6].upper()
return f"OUT-{date_str}-{short_uuid}"
@staticmethod
def get_stock_by_barcode(barcode):
"""
[核心逻辑] 根据扫码内容查找对应的库存物品
查找顺序: 成品 (StockProduct) -> 半成品 (StockSemi) -> 采购件 (StockBuy)
匹配逻辑: 匹配 barcode 字段 OR sku 字段
"""
if not barcode:
return None
clean_code = barcode.strip()
# --- 1. 查找成品表 (StockProduct) ---
prod = StockProduct.query.filter(
or_(StockProduct.barcode == clean_code, StockProduct.sku == clean_code)
).first()
if prod:
return OutboundService._format_scan_result(prod, 'stock_product')
# --- 2. 查找半成品表 (StockSemi) ---
semi = StockSemi.query.filter(
or_(StockSemi.barcode == clean_code, StockSemi.sku == clean_code)
).first()
if semi:
return OutboundService._format_scan_result(semi, 'stock_semi')
# --- 3. 查找采购件表 (StockBuy) ---
buy = StockBuy.query.filter(
or_(StockBuy.barcode == clean_code, StockBuy.sku == clean_code)
).first()
if buy:
return OutboundService._format_scan_result(buy, 'stock_buy')
return None
@staticmethod
def _format_scan_result(item, table_name):
"""
[核心修复] 格式化返回数据,确保名称和规格一定能取到
"""
base_name = ""
base_spec = ""
# -------------------------------------------------------
# 修复逻辑:强制获取基础信息
# -------------------------------------------------------
# 步骤 1: 尝试通过 ORM 关联获取 (如果有定义 relationship)
if hasattr(item, 'base') and item.base:
base_name = item.base.name
base_spec = item.base.spec_model
# 步骤 2: [关键] 如果步骤1失败但有 base_id则手动查询 MaterialBase 表
# 这能解决“扫码有库存但显示未知物品”的问题
if not base_name and hasattr(item, 'base_id') and item.base_id:
try:
# 手动查基础表
base_info = MaterialBase.query.get(item.base_id)
if base_info:
base_name = base_info.name
base_spec = base_info.spec_model
except Exception as e:
print(f"基础信息查询失败: {e}")
# 步骤 3: 兜底逻辑,某些旧表可能直接存了 material_name 字段
if not base_name and hasattr(item, 'material_name'):
base_name = item.material_name
stock_qty = float(item.stock_quantity) if item.stock_quantity else 0
avail_qty = float(item.available_quantity) if item.available_quantity else 0
return {
'id': item.id,
'sku': item.sku,
'name': base_name or "未知物品", # 此时应该能正确显示名称了
'spec_model': base_spec or "", # 此时应该能正确显示规格了
'source_table': table_name, # 标记来源表
'stock_quantity': stock_qty, # 当前库存
'available_quantity': avail_qty, # 可用库存
'batch_number': getattr(item, 'batch_number', ''),
'warehouse_location': getattr(item, 'warehouse_location', ''),
'barcode': getattr(item, 'barcode', '')
}
@staticmethod
def create_outbound(data, operator_name='System'):
"""
[核心逻辑] 执行出库:扣减对应表的库存 + 创建记录
"""
source_table = data.get('source_table')
stock_id = data.get('stock_id')
quantity = float(data.get('quantity', 0))
if quantity <= 0:
raise ValueError("出库数量必须大于0")
# 1. 动态映射表模型
model_map = {
'stock_buy': StockBuy,
'stock_semi': StockSemi,
'stock_product': StockProduct
}
ModelClass = model_map.get(source_table)
if not ModelClass:
raise ValueError(f"无效的数据来源表: {source_table}")
# 2. 锁定并查询库存 (使用 with_for_update 防止并发超卖)
stock_item = ModelClass.query.with_for_update().get(stock_id)
if not stock_item:
raise ValueError("库存记录不存在或已被删除")
# 3. 校验库存充足
current_avail = float(stock_item.available_quantity)
if current_avail < quantity:
raise ValueError(f"库存不足!当前可用: {current_avail}, 请求出库: {quantity}")
try:
# 4. 扣减库存
stock_item.stock_quantity = float(stock_item.stock_quantity) - quantity
stock_item.available_quantity = float(stock_item.available_quantity) - quantity
# [新增] 计算北京时间
beijing_tz = timezone(timedelta(hours=8))
# replace(tzinfo=None) 是为了让存入数据库的时间变为 naive time (不带时区信息的本地时间)
# 这样数据库看起来就是 "2023-10-27 15:00:00" 而不是 UTC 时间
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
# 5. 创建出库记录
new_outbound = TransOutbound(
outbound_no=OutboundService.generate_outbound_no(),
sku=data.get('sku'),
source_table=source_table,
stock_id=stock_id,
barcode=data.get('barcode'),
outbound_type=data.get('outbound_type', 'SALES'),
quantity=quantity,
consumer_name=data.get('consumer_name'),
signature_path=data.get('signature_path'), # 存储签名的 URL
# [关键] 显式设置北京时间,覆盖 Model 中的 default=datetime.now (UTC)
outbound_time=current_time,
operator_name=operator_name,
remark=data.get('remark')
)
db.session.add(new_outbound)
db.session.commit()
return new_outbound
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def get_list(page=1, per_page=10, keyword=None, start_date=None, end_date=None):
"""查询出库历史记录"""
query = TransOutbound.query.order_by(TransOutbound.outbound_time.desc())
if keyword:
query = query.filter(or_(
TransOutbound.outbound_no.ilike(f'%{keyword}%'),
TransOutbound.consumer_name.ilike(f'%{keyword}%'),
TransOutbound.sku.ilike(f'%{keyword}%')
))
if start_date and end_date:
query = query.filter(TransOutbound.outbound_time.between(start_date, end_date))
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
return {
'items': [item.to_dict() for item in pagination.items],
'total': pagination.total,
'pages': pagination.pages,
'current_page': page
}