209 lines
7.9 KiB
Python
209 lines
7.9 KiB
Python
import uuid
|
||
from datetime import datetime, timezone, timedelta # [修改] 引入 timezone 和 timedelta
|
||
from sqlalchemy import or_
|
||
from app.extensions import db
|
||
from app.models.outbound import TransOutbound
|
||
|
||
# 引入所有库存模型以进行查询
|
||
from app.models.inbound.buy import StockBuy
|
||
from app.models.inbound.semi import StockSemi
|
||
from app.models.inbound.product import StockProduct
|
||
# ★★★ [关键] 引入基础信息表,用于手动补全名称
|
||
from app.models.base import MaterialBase
|
||
|
||
|
||
class OutboundService:
|
||
|
||
@staticmethod
|
||
def generate_outbound_no():
|
||
"""
|
||
生成出库单号: OUT-yyyyMMdd-随机码
|
||
[修改] 强制使用北京时间生成日期前缀
|
||
"""
|
||
# 获取北京时间
|
||
beijing_tz = timezone(timedelta(hours=8))
|
||
current_time = datetime.now(beijing_tz)
|
||
|
||
date_str = current_time.strftime('%Y%m%d')
|
||
short_uuid = uuid.uuid4().hex[:6].upper()
|
||
return f"OUT-{date_str}-{short_uuid}"
|
||
|
||
@staticmethod
|
||
def get_stock_by_barcode(barcode):
|
||
"""
|
||
[核心逻辑] 根据扫码内容查找对应的库存物品
|
||
查找顺序: 成品 (StockProduct) -> 半成品 (StockSemi) -> 采购件 (StockBuy)
|
||
匹配逻辑: 匹配 barcode 字段 OR sku 字段
|
||
"""
|
||
if not barcode:
|
||
return None
|
||
|
||
clean_code = barcode.strip()
|
||
|
||
# --- 1. 查找成品表 (StockProduct) ---
|
||
prod = StockProduct.query.filter(
|
||
or_(StockProduct.barcode == clean_code, StockProduct.sku == clean_code)
|
||
).first()
|
||
if prod:
|
||
return OutboundService._format_scan_result(prod, 'stock_product')
|
||
|
||
# --- 2. 查找半成品表 (StockSemi) ---
|
||
semi = StockSemi.query.filter(
|
||
or_(StockSemi.barcode == clean_code, StockSemi.sku == clean_code)
|
||
).first()
|
||
if semi:
|
||
return OutboundService._format_scan_result(semi, 'stock_semi')
|
||
|
||
# --- 3. 查找采购件表 (StockBuy) ---
|
||
buy = StockBuy.query.filter(
|
||
or_(StockBuy.barcode == clean_code, StockBuy.sku == clean_code)
|
||
).first()
|
||
if buy:
|
||
return OutboundService._format_scan_result(buy, 'stock_buy')
|
||
|
||
return None
|
||
|
||
@staticmethod
|
||
def _format_scan_result(item, table_name):
|
||
"""
|
||
[核心修复] 格式化返回数据,确保名称和规格一定能取到
|
||
"""
|
||
base_name = ""
|
||
base_spec = ""
|
||
|
||
# -------------------------------------------------------
|
||
# 修复逻辑:强制获取基础信息
|
||
# -------------------------------------------------------
|
||
|
||
# 步骤 1: 尝试通过 ORM 关联获取 (如果有定义 relationship)
|
||
if hasattr(item, 'base') and item.base:
|
||
base_name = item.base.name
|
||
base_spec = item.base.spec_model
|
||
|
||
# 步骤 2: [关键] 如果步骤1失败,但有 base_id,则手动查询 MaterialBase 表
|
||
# 这能解决“扫码有库存但显示未知物品”的问题
|
||
if not base_name and hasattr(item, 'base_id') and item.base_id:
|
||
try:
|
||
# 手动查基础表
|
||
base_info = MaterialBase.query.get(item.base_id)
|
||
if base_info:
|
||
base_name = base_info.name
|
||
base_spec = base_info.spec_model
|
||
except Exception as e:
|
||
print(f"基础信息查询失败: {e}")
|
||
|
||
# 步骤 3: 兜底逻辑,某些旧表可能直接存了 material_name 字段
|
||
if not base_name and hasattr(item, 'material_name'):
|
||
base_name = item.material_name
|
||
|
||
stock_qty = float(item.stock_quantity) if item.stock_quantity else 0
|
||
avail_qty = float(item.available_quantity) if item.available_quantity else 0
|
||
|
||
return {
|
||
'id': item.id,
|
||
'sku': item.sku,
|
||
'name': base_name or "未知物品", # 此时应该能正确显示名称了
|
||
'spec_model': base_spec or "", # 此时应该能正确显示规格了
|
||
'source_table': table_name, # 标记来源表
|
||
'stock_quantity': stock_qty, # 当前库存
|
||
'available_quantity': avail_qty, # 可用库存
|
||
'batch_number': getattr(item, 'batch_number', ''),
|
||
'warehouse_location': getattr(item, 'warehouse_location', ''),
|
||
'barcode': getattr(item, 'barcode', '')
|
||
}
|
||
|
||
@staticmethod
|
||
def create_outbound(data, operator_name='System'):
|
||
"""
|
||
[核心逻辑] 执行出库:扣减对应表的库存 + 创建记录
|
||
"""
|
||
source_table = data.get('source_table')
|
||
stock_id = data.get('stock_id')
|
||
quantity = float(data.get('quantity', 0))
|
||
|
||
if quantity <= 0:
|
||
raise ValueError("出库数量必须大于0")
|
||
|
||
# 1. 动态映射表模型
|
||
model_map = {
|
||
'stock_buy': StockBuy,
|
||
'stock_semi': StockSemi,
|
||
'stock_product': StockProduct
|
||
}
|
||
|
||
ModelClass = model_map.get(source_table)
|
||
if not ModelClass:
|
||
raise ValueError(f"无效的数据来源表: {source_table}")
|
||
|
||
# 2. 锁定并查询库存 (使用 with_for_update 防止并发超卖)
|
||
stock_item = ModelClass.query.with_for_update().get(stock_id)
|
||
if not stock_item:
|
||
raise ValueError("库存记录不存在或已被删除")
|
||
|
||
# 3. 校验库存充足
|
||
current_avail = float(stock_item.available_quantity)
|
||
if current_avail < quantity:
|
||
raise ValueError(f"库存不足!当前可用: {current_avail}, 请求出库: {quantity}")
|
||
|
||
try:
|
||
# 4. 扣减库存
|
||
stock_item.stock_quantity = float(stock_item.stock_quantity) - quantity
|
||
stock_item.available_quantity = float(stock_item.available_quantity) - quantity
|
||
|
||
# [新增] 计算北京时间
|
||
beijing_tz = timezone(timedelta(hours=8))
|
||
# replace(tzinfo=None) 是为了让存入数据库的时间变为 naive time (不带时区信息的本地时间)
|
||
# 这样数据库看起来就是 "2023-10-27 15:00:00" 而不是 UTC 时间
|
||
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
|
||
|
||
# 5. 创建出库记录
|
||
new_outbound = TransOutbound(
|
||
outbound_no=OutboundService.generate_outbound_no(),
|
||
sku=data.get('sku'),
|
||
source_table=source_table,
|
||
stock_id=stock_id,
|
||
barcode=data.get('barcode'),
|
||
outbound_type=data.get('outbound_type', 'SALES'),
|
||
quantity=quantity,
|
||
consumer_name=data.get('consumer_name'),
|
||
signature_path=data.get('signature_path'), # 存储签名的 URL
|
||
|
||
# [关键] 显式设置北京时间,覆盖 Model 中的 default=datetime.now (UTC)
|
||
outbound_time=current_time,
|
||
|
||
operator_name=operator_name,
|
||
remark=data.get('remark')
|
||
)
|
||
|
||
db.session.add(new_outbound)
|
||
db.session.commit()
|
||
|
||
return new_outbound
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
raise e
|
||
|
||
@staticmethod
|
||
def get_list(page=1, per_page=10, keyword=None, start_date=None, end_date=None):
|
||
"""查询出库历史记录"""
|
||
query = TransOutbound.query.order_by(TransOutbound.outbound_time.desc())
|
||
|
||
if keyword:
|
||
query = query.filter(or_(
|
||
TransOutbound.outbound_no.ilike(f'%{keyword}%'),
|
||
TransOutbound.consumer_name.ilike(f'%{keyword}%'),
|
||
TransOutbound.sku.ilike(f'%{keyword}%')
|
||
))
|
||
|
||
if start_date and end_date:
|
||
query = query.filter(TransOutbound.outbound_time.between(start_date, end_date))
|
||
|
||
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
|
||
|
||
return {
|
||
'items': [item.to_dict() for item in pagination.items],
|
||
'total': pagination.total,
|
||
'pages': pagination.pages,
|
||
'current_page': page
|
||
} |