出库进行修改,确保可以进行多个样例的出库以及出库的记录展示

This commit is contained in:
dxc
2026-02-05 16:54:11 +08:00
parent 3f6ab3e607
commit c1ddb8093f
6 changed files with 608 additions and 385 deletions

View File

@ -18,7 +18,7 @@ def scan_barcode():
return jsonify({'code': 400, 'msg': '请提供条码'}), 400
try:
# 调用 Service 层去三个表中查找
# 调用 Service 层去三个表中查找 (Service已更新会返回价格)
result = OutboundService.get_stock_by_barcode(barcode)
if result:
@ -39,7 +39,7 @@ def scan_barcode():
# --------------------------------------------------------
# 2. 提交出库单接口
# 2. 提交出库单接口 (批量)
# POST /api/v1/outbound
# --------------------------------------------------------
@outbound_bp.route('', methods=['POST'])
@ -54,26 +54,26 @@ def create_outbound():
if not current_user_name:
current_user_name = 'Unknown'
# ★ [修改] 获取最终的操作员名称
# 优先取前端传来的 operator_name (你在前端下拉框选的人)
# 如果前端没传,则回退使用当前登录用户的名字
# 获取最终的操作员名称
final_operator = data.get('operator_name')
if not final_operator:
final_operator = current_user_name
# 必填校验
required_fields = ['stock_id', 'source_table', 'quantity', 'consumer_name', 'signature_path']
for field in required_fields:
if field not in data or not data[field]:
return jsonify({'code': 400, 'msg': f'缺少必填字段: {field}'}), 400
# 必填校验 (针对整个单据)
# items 必须是列表且不为空consumer_name 和 signature_path 必填
if 'items' not in data or not data['items']:
return jsonify({'code': 400, 'msg': '出库商品列表不能为空'}), 400
if not data.get('consumer_name') or not data.get('signature_path'):
return jsonify({'code': 400, 'msg': '领用人及签名信息缺失'}), 400
try:
# ★ [修改] 将确认后的操作员名称传给 Service
outbound_record = OutboundService.create_outbound(data, operator_name=final_operator)
# ★ [修改] 调用批量创建服务
outbound_no = OutboundService.create_outbound_batch(data, operator_name=final_operator)
return jsonify({
'code': 200,
'msg': '出库成功',
'data': outbound_record.to_dict()
'data': {'outbound_no': outbound_no}
})
except ValueError as e:
# 业务逻辑错误 (如库存不足)
@ -84,7 +84,7 @@ def create_outbound():
# --------------------------------------------------------
# 3. 获取出库记录列表
# 3. 获取出库记录列表 (分组展示)
# GET /api/v1/outbound
# --------------------------------------------------------
@outbound_bp.route('', methods=['GET'])
@ -94,8 +94,10 @@ def get_outbound_list():
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 10))
keyword = request.args.get('keyword', '')
# 如果前端传了日期范围,可以解析处理,这里暂略
result = OutboundService.get_list(page, limit, keyword)
# ★ [修改] 调用分组查询服务
result = OutboundService.get_grouped_list(page, limit, keyword)
return jsonify({
'code': 200,

View File

@ -6,7 +6,8 @@ class TransOutbound(db.Model):
__tablename__ = 'trans_outbound'
id = db.Column(db.Integer, primary_key=True)
outbound_no = db.Column(db.String(100), unique=True, nullable=False) # 出库单号
# 修改:不再唯一,因为批量出库时多个商品共用一个单号
outbound_no = db.Column(db.String(100), nullable=False)
# 关联源库存信息
sku = db.Column(db.String(100))
@ -18,6 +19,9 @@ class TransOutbound(db.Model):
outbound_type = db.Column(db.String(50), default='SALES') # SALES(销售), USE(领用), TRANSFER(调拨)
quantity = db.Column(db.Numeric(19, 4), nullable=False)
# [新增] 出库时的单价,用于计算金额
unit_price = db.Column(db.Numeric(19, 2), default=0)
# 签字与追溯
consumer_name = db.Column(db.String(100)) # 领用人/客户
signature_path = db.Column(db.Text) # 电子签名图片路径
@ -34,6 +38,7 @@ class TransOutbound(db.Model):
'source_table': self.source_table,
'outbound_type': self.outbound_type,
'quantity': float(self.quantity) if self.quantity else 0,
'unit_price': float(self.unit_price) if self.unit_price else 0,
'consumer_name': self.consumer_name,
'signature_path': self.signature_path,
'outbound_time': self.outbound_time.strftime('%Y-%m-%d %H:%M:%S') if self.outbound_time else None,

View File

@ -1,6 +1,6 @@
import uuid
from datetime import datetime, timezone, timedelta # [修改] 引入 timezone 和 timedelta
from sqlalchemy import or_
from datetime import datetime, timezone, timedelta
from sqlalchemy import or_, func, desc
from app.extensions import db
from app.models.outbound import TransOutbound
@ -8,7 +8,7 @@ from app.models.outbound import TransOutbound
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
# ★★★ [关键] 引入基础信息表,用于手动补全名称
# 引入基础信息表
from app.models.base import MaterialBase
@ -17,82 +17,90 @@ class OutboundService:
@staticmethod
def generate_outbound_no():
"""
生成出库单号: OUT-yyyyMMdd-随机码
[修改] 强制使用北京时间生成日期前缀
生成出库单号: OUT-yyyyMMdd-HHmm-当日流水(4位)
例如: OUT-20260205-1558-0001
"""
# 获取北京时间
beijing_tz = timezone(timedelta(hours=8))
current_time = datetime.now(beijing_tz)
now = datetime.now(beijing_tz)
date_str = current_time.strftime('%Y%m%d')
short_uuid = uuid.uuid4().hex[:6].upper()
return f"OUT-{date_str}-{short_uuid}"
date_str = now.strftime('%Y%m%d')
time_str = now.strftime('%H%M')
prefix = f"OUT-{date_str}-"
existing_count = db.session.query(func.count(func.distinct(TransOutbound.outbound_no))) \
.filter(TransOutbound.outbound_no.like(f"{prefix}%")).scalar()
sequence = existing_count + 1
return f"OUT-{date_str}-{time_str}-{sequence:04d}"
@staticmethod
def get_stock_by_barcode(barcode):
"""
[核心逻辑] 根据扫码内容查找对应的库存物品
查找顺序: 成品 (StockProduct) -> 半成品 (StockSemi) -> 采购件 (StockBuy)
匹配逻辑: 匹配 barcode 字段 OR sku 字段
根据扫码内容查找对应的库存物品,并附带价格信息
"""
if not barcode:
return None
clean_code = barcode.strip()
# --- 1. 查找成品表 (StockProduct) ---
def get_price(item, table_type):
if table_type == 'stock_product':
return float(item.sale_price) if item.sale_price else 0
elif table_type == 'stock_buy':
return float(item.unit_price) if item.unit_price else 0
return 0
prod = StockProduct.query.filter(
or_(StockProduct.barcode == clean_code, StockProduct.sku == clean_code)
).first()
if prod:
return OutboundService._format_scan_result(prod, 'stock_product')
res = OutboundService._format_scan_result(prod, 'stock_product')
res['price'] = get_price(prod, 'stock_product')
return res
# --- 2. 查找半成品表 (StockSemi) ---
semi = StockSemi.query.filter(
or_(StockSemi.barcode == clean_code, StockSemi.sku == clean_code)
).first()
if semi:
return OutboundService._format_scan_result(semi, 'stock_semi')
res = OutboundService._format_scan_result(semi, 'stock_semi')
res['price'] = 0
return res
# --- 3. 查找采购件表 (StockBuy) ---
buy = StockBuy.query.filter(
or_(StockBuy.barcode == clean_code, StockBuy.sku == clean_code)
).first()
if buy:
return OutboundService._format_scan_result(buy, 'stock_buy')
res = OutboundService._format_scan_result(buy, 'stock_buy')
res['price'] = get_price(buy, 'stock_buy')
return res
return None
@staticmethod
def _format_scan_result(item, table_name):
"""
[核心修复] 格式化返回数据,确保名称和规格一定能取到
"""
base_name = ""
base_spec = ""
base_cat = ""
base_type = ""
# -------------------------------------------------------
# 修复逻辑:强制获取基础信息
# -------------------------------------------------------
# 步骤 1: 尝试通过 ORM 关联获取 (如果有定义 relationship)
if hasattr(item, 'base') and item.base:
base_name = item.base.name
base_spec = item.base.spec_model
base_cat = item.base.category
base_type = item.base.material_type
# 步骤 2: [关键] 如果步骤1失败但有 base_id则手动查询 MaterialBase 表
# 这能解决“扫码有库存但显示未知物品”的问题
if not base_name and hasattr(item, 'base_id') and item.base_id:
try:
# 手动查基础表
base_info = MaterialBase.query.get(item.base_id)
if base_info:
base_name = base_info.name
base_spec = base_info.spec_model
except Exception as e:
print(f"基础信息查询失败: {e}")
base_cat = base_info.category
base_type = base_info.material_type
except Exception:
pass
# 步骤 3: 兜底逻辑,某些旧表可能直接存了 material_name 字段
if not base_name and hasattr(item, 'material_name'):
base_name = item.material_name
@ -102,107 +110,205 @@ class OutboundService:
return {
'id': item.id,
'sku': item.sku,
'name': base_name or "未知物品", # 此时应该能正确显示名称了
'spec_model': base_spec or "", # 此时应该能正确显示规格了
'source_table': table_name, # 标记来源表
'stock_quantity': stock_qty, # 当前库存
'available_quantity': avail_qty, # 可用库存
'name': base_name or "未知物品",
'spec_model': base_spec or "",
'category': base_cat or "",
'material_type': base_type or "",
'source_table': table_name,
'stock_quantity': stock_qty,
'available_quantity': avail_qty,
'batch_number': getattr(item, 'batch_number', ''),
'warehouse_location': getattr(item, 'warehouse_location', ''),
'barcode': getattr(item, 'barcode', '')
}
@staticmethod
def create_outbound(data, operator_name='System'):
"""
[核心逻辑] 执行出库:扣减对应表的库存 + 创建记录
"""
source_table = data.get('source_table')
stock_id = data.get('stock_id')
quantity = float(data.get('quantity', 0))
def create_outbound_batch(data, operator_name='System'):
items = data.get('items', [])
if not items:
raise ValueError("出库商品列表不能为空")
if quantity <= 0:
raise ValueError("出库数量必须大于0")
outbound_no = OutboundService.generate_outbound_no()
common_data = {
'outbound_no': outbound_no,
'consumer_name': data.get('consumer_name'),
'outbound_type': data.get('outbound_type', 'SALES'),
'signature_path': data.get('signature_path'),
'operator_name': operator_name,
'remark': data.get('remark')
}
beijing_tz = timezone(timedelta(hours=8))
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
# 1. 动态映射表模型
model_map = {
'stock_buy': StockBuy,
'stock_semi': StockSemi,
'stock_product': StockProduct
}
ModelClass = model_map.get(source_table)
if not ModelClass:
raise ValueError(f"无效的数据来源表: {source_table}")
# 2. 锁定并查询库存 (使用 with_for_update 防止并发超卖)
stock_item = ModelClass.query.with_for_update().get(stock_id)
if not stock_item:
raise ValueError("库存记录不存在或已被删除")
# 3. 校验库存充足
current_avail = float(stock_item.available_quantity)
if current_avail < quantity:
raise ValueError(f"库存不足!当前可用: {current_avail}, 请求出库: {quantity}")
try:
# 4. 扣减库存
stock_item.stock_quantity = float(stock_item.stock_quantity) - quantity
stock_item.available_quantity = float(stock_item.available_quantity) - quantity
for item in items:
source_table = item.get('source_table')
stock_id = item.get('stock_id')
quantity = float(item.get('quantity', 0))
unit_price = float(item.get('price', 0))
# [新增] 计算北京时间
beijing_tz = timezone(timedelta(hours=8))
# replace(tzinfo=None) 是为了让存入数据库的时间变为 naive time (不带时区信息的本地时间)
# 这样数据库看起来就是 "2023-10-27 15:00:00" 而不是 UTC 时间
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
if quantity <= 0:
raise ValueError(f"SKU {item.get('sku')} 的出库数量必须大于0")
# 5. 创建出库记录
new_outbound = TransOutbound(
outbound_no=OutboundService.generate_outbound_no(),
sku=data.get('sku'),
source_table=source_table,
stock_id=stock_id,
barcode=data.get('barcode'),
outbound_type=data.get('outbound_type', 'SALES'),
quantity=quantity,
consumer_name=data.get('consumer_name'),
signature_path=data.get('signature_path'), # 存储签名的 URL
ModelClass = model_map.get(source_table)
if not ModelClass:
continue
# [关键] 显式设置北京时间,覆盖 Model 中的 default=datetime.now (UTC)
outbound_time=current_time,
stock_record = ModelClass.query.with_for_update().get(stock_id)
if not stock_record:
raise ValueError(f"库存记录不存在 (ID: {stock_id})")
operator_name=operator_name,
remark=data.get('remark')
)
if float(stock_record.available_quantity) < quantity:
raise ValueError(f"SKU {stock_record.sku} 库存不足,当前可用: {stock_record.available_quantity}")
stock_record.stock_quantity = float(stock_record.stock_quantity) - quantity
stock_record.available_quantity = float(stock_record.available_quantity) - quantity
new_record = TransOutbound(
sku=item.get('sku'),
source_table=source_table,
stock_id=stock_id,
barcode=item.get('barcode'),
quantity=quantity,
unit_price=unit_price,
outbound_time=current_time,
**common_data
)
db.session.add(new_record)
db.session.add(new_outbound)
db.session.commit()
return new_outbound
return outbound_no
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def get_list(page=1, per_page=10, keyword=None, start_date=None, end_date=None):
"""查询出库历史记录"""
query = TransOutbound.query.order_by(TransOutbound.outbound_time.desc())
def get_grouped_list(page=1, per_page=10, keyword=None, start_date=None, end_date=None):
"""
查询出库记录(按出库单号分组),包含详细物品信息
"""
# 1. 查询分页单号
stmt = db.session.query(
TransOutbound.outbound_no,
func.max(TransOutbound.outbound_time).label('max_time')
).group_by(TransOutbound.outbound_no)
if keyword:
query = query.filter(or_(
stmt = stmt.filter(or_(
TransOutbound.outbound_no.ilike(f'%{keyword}%'),
TransOutbound.consumer_name.ilike(f'%{keyword}%'),
TransOutbound.sku.ilike(f'%{keyword}%')
))
if start_date and end_date:
query = query.filter(TransOutbound.outbound_time.between(start_date, end_date))
stmt = stmt.filter(TransOutbound.outbound_time.between(start_date, end_date))
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
stmt = stmt.order_by(desc('max_time'))
pagination = stmt.paginate(page=page, per_page=per_page, error_out=False)
outbound_nos = [row.outbound_no for row in pagination.items]
if not outbound_nos:
return {
'items': [],
'total': 0,
'pages': 0,
'current_page': page
}
# 2. 查询详细记录
details = TransOutbound.query.filter(TransOutbound.outbound_no.in_(outbound_nos)).all()
# 3. 组装数据并查询物品详情
grouped_map = {}
# 映射表模型以便查询
model_map = {
'stock_buy': StockBuy,
'stock_semi': StockSemi,
'stock_product': StockProduct
}
for d in details:
ono = d.outbound_no
if ono not in grouped_map:
grouped_map[ono] = {
'outbound_no': ono,
'outbound_time': d.outbound_time.strftime('%Y-%m-%d %H:%M:%S'),
'outbound_type': d.outbound_type,
'consumer_name': d.consumer_name,
'operator_name': d.operator_name,
'signature_path': d.signature_path,
'remark': d.remark,
'total_amount': 0.0,
'items': []
}
# --- 查询物品详细信息 (名称, 规格, 类型, 类别) ---
item_name = "未知物品"
item_spec = ""
item_cat = ""
item_type = ""
ModelClass = model_map.get(d.source_table)
if ModelClass and d.stock_id:
# 注意这里在循环中查询可能会有N+1问题但考虑到单页数据量通常每单条目不多暂时可接受
# 生产环境建议优化为预加载或批量查询
try:
stock_item = ModelClass.query.get(d.stock_id)
if stock_item and stock_item.base:
item_name = stock_item.base.name
item_spec = stock_item.base.spec_model
item_cat = stock_item.base.category
item_type = stock_item.base.material_type
elif stock_item and hasattr(stock_item, 'base_id') and stock_item.base_id:
base_info = MaterialBase.query.get(stock_item.base_id)
if base_info:
item_name = base_info.name
item_spec = base_info.spec_model
item_cat = base_info.category
item_type = base_info.material_type
except Exception as e:
print(f"Error fetching detail for stock_id {d.stock_id}: {e}")
# 计算金额
price = float(d.unit_price) if d.unit_price else 0
qty = float(d.quantity)
subtotal = price * qty
grouped_map[ono]['total_amount'] += subtotal
grouped_map[ono]['items'].append({
'sku': d.sku,
'name': item_name,
'spec_model': item_spec,
'category': item_cat,
'material_type': item_type,
'quantity': qty,
'unit_price': price,
'subtotal': subtotal
})
# 4. 排序输出
result_list = []
for ono in outbound_nos:
if ono in grouped_map:
obj = grouped_map[ono]
obj['items'].sort(key=lambda x: x['unit_price'], reverse=True)
obj['total_amount'] = round(obj['total_amount'], 2)
result_list.append(obj)
return {
'items': [item.to_dict() for item in pagination.items],
'items': result_list,
'total': pagination.total,
'pages': pagination.pages,
'current_page': page