出库逻辑添加,扫码识别编码成功,后续对应逻辑没有完成

This commit is contained in:
dxc
2026-02-04 17:22:20 +08:00
parent 596f366fc4
commit 797b611530
12 changed files with 919 additions and 28 deletions

View File

@ -43,7 +43,7 @@ def create_app():
print(f"❌ 错误: Auth 模块导入失败: {e}")
# -----------------------------------------------------
# 2.1 注册入库聚合模块 (Inbound) - 【核心修复点】
# 2.1 注册入库聚合模块 (Inbound)
# -----------------------------------------------------
try:
from app.api.v1.inbound import inbound_bp
@ -79,7 +79,7 @@ def create_app():
print(f"❌ 错误: Upload 模块导入失败: {e}")
# -----------------------------------------------------
# 2.4 注册业务操作模块 (Transactions)
# 2.4 注册业务操作模块 (Transactions - 借还/维修/报废)
# -----------------------------------------------------
try:
from app.api.v1.transactions import trans_bp
@ -90,6 +90,19 @@ def create_app():
# 允许模块不存在时不崩溃
print(f"⚠️ 提示: Transaction 模块尚未创建或导入失败: {e}")
# -----------------------------------------------------
# 2.5 ★ [新增] 注册出库模块 (Outbound)
# -----------------------------------------------------
try:
from app.api.v1.outbound import outbound_bp
# 标准: /api/v1/outbound
app.register_blueprint(outbound_bp, url_prefix='/api/v1/outbound')
# 兼容: /api/outbound
app.register_blueprint(outbound_bp, url_prefix='/api/outbound', name='outbound_legacy')
print("✅ Outbound 模块注册成功")
except ImportError as e:
print(f"❌ 错误: Outbound 模块导入失败: {e}")
# =========================================================
# 3. 预加载数据模型
# =========================================================
@ -101,6 +114,9 @@ def create_app():
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
# ★ [新增] 出库模型 (确保迁移工具能检测到 trans_outbound 表)
from app.models.outbound import TransOutbound
# 系统与业务模型
from app.models.system import SysUser, SysLog
from app.models.transaction import TransBorrow, TransRepair, TransScrap

View File

@ -0,0 +1,82 @@
from flask import Blueprint, request, jsonify
from app.services.outbound_service import OutboundService
from flask_jwt_extended import jwt_required, get_jwt_identity
outbound_bp = Blueprint('outbound', __name__, url_prefix='/outbound')
# --------------------------------------------------------
# 1. 扫码查询库存接口
# GET /api/v1/outbound/scan?barcode=...
# --------------------------------------------------------
@outbound_bp.route('/scan', methods=['GET'])
@jwt_required()
def scan_barcode():
barcode = request.args.get('barcode')
if not barcode:
return jsonify({'code': 400, 'msg': '请提供条码'}), 400
try:
result = OutboundService.get_stock_by_barcode(barcode)
if result:
return jsonify({'code': 200, 'data': result, 'msg': '扫描成功'})
else:
return jsonify({'code': 404, 'msg': '未找到对应的库存记录'}), 404
except Exception as e:
return jsonify({'code': 500, 'msg': str(e)}), 500
# --------------------------------------------------------
# 2. 提交出库单接口
# POST /api/v1/outbound
# --------------------------------------------------------
@outbound_bp.route('', methods=['POST'])
@jwt_required()
def create_outbound():
data = request.get_json()
if not data:
return jsonify({'code': 400, 'msg': '无有效数据'}), 400
current_user = get_jwt_identity() # 获取当前登录用户作为操作员
# 简单的必填校验 (更复杂的校验可放入 Schema)
required_fields = ['stock_id', 'source_table', 'quantity', 'consumer_name', 'signature_path']
for field in required_fields:
if field not in data or not data[field]:
return jsonify({'code': 400, 'msg': f'缺少必填字段: {field}'}), 400
try:
outbound_record = OutboundService.create_outbound(data, operator_name=current_user)
return jsonify({
'code': 200,
'msg': '出库成功',
'data': outbound_record.to_dict()
})
except ValueError as e:
return jsonify({'code': 400, 'msg': str(e)}), 400
except Exception as e:
return jsonify({'code': 500, 'msg': f'服务器内部错误: {str(e)}'}), 500
# --------------------------------------------------------
# 3. 获取出库记录列表
# GET /api/v1/outbound
# --------------------------------------------------------
@outbound_bp.route('', methods=['GET'])
@jwt_required()
def get_outbound_list():
try:
page = int(request.args.get('page', 1))
per_page = int(request.args.get('limit', 10))
keyword = request.args.get('keyword', '')
# 日期范围处理可根据前端传参格式调整
result = OutboundService.get_list(page, per_page, keyword)
return jsonify({
'code': 200,
'msg': '获取成功',
'data': result
})
except Exception as e:
return jsonify({'code': 500, 'msg': str(e)}), 500

View File

@ -0,0 +1,42 @@
from app.extensions import db
from datetime import datetime
class TransOutbound(db.Model):
__tablename__ = 'trans_outbound'
id = db.Column(db.Integer, primary_key=True)
outbound_no = db.Column(db.String(100), unique=True, nullable=False) # 出库单号
# 关联源库存信息
sku = db.Column(db.String(100))
source_table = db.Column(db.String(50)) # 'stock_buy', 'stock_product', etc.
stock_id = db.Column(db.Integer) # 对应源表的主键ID
barcode = db.Column(db.String(100)) # 实际扫码内容
# 业务信息
outbound_type = db.Column(db.String(50), default='SALES') # SALES, USE, TRANSFER
quantity = db.Column(db.Numeric(19, 4), nullable=False)
# 签字与追溯
consumer_name = db.Column(db.String(100)) # 领用人/客户
signature_path = db.Column(db.Text) # 签名图片路径
outbound_time = db.Column(db.DateTime, default=datetime.now)
operator_name = db.Column(db.String(100)) # 操作员
remark = db.Column(db.Text)
def to_dict(self):
return {
'id': self.id,
'outbound_no': self.outbound_no,
'sku': self.sku,
'source_table': self.source_table,
'outbound_type': self.outbound_type,
'quantity': float(self.quantity) if self.quantity else 0,
'consumer_name': self.consumer_name,
'signature_path': self.signature_path,
'outbound_time': self.outbound_time.strftime('%Y-%m-%d %H:%M:%S') if self.outbound_time else None,
'operator_name': self.operator_name,
'remark': self.remark
}

View File

@ -0,0 +1,153 @@
import uuid
from datetime import datetime
from sqlalchemy import or_
from app.extensions import db
from app.models.outbound import TransOutbound
# 导入所有库存实体模型,用于查找和扣减
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
class OutboundService:
@staticmethod
def generate_outbound_no():
"""生成出库单号: OUT-yyyyMMdd-随机码"""
date_str = datetime.now().strftime('%Y%m%d')
short_uuid = uuid.uuid4().hex[:6].upper()
return f"OUT-{date_str}-{short_uuid}"
@staticmethod
def get_stock_by_barcode(barcode):
"""
根据条码在各个库存表中查找
优先级: 成品 -> 半成品 -> 采购件
"""
if not barcode:
return None
# 1. 查成品
prod = StockProduct.query.filter_by(barcode=barcode).first()
if prod:
return OutboundService._format_scan_result(prod, 'stock_product', prod.sku)
# 2. 查半成品
semi = StockSemi.query.filter_by(barcode=barcode).first()
if semi:
return OutboundService._format_scan_result(semi, 'stock_semi', semi.sku)
# 3. 查采购件
buy = StockBuy.query.filter_by(barcode=barcode).first()
if buy:
# 采购件可能需要关联 material_base 获取名称,这里假设 base_id 关联已建立
name = buy.base.name if buy.base else "未知采购件"
spec = buy.base.spec_model if buy.base else ""
return OutboundService._format_scan_result(buy, 'stock_buy', buy.sku, name, spec)
return None
@staticmethod
def _format_scan_result(item, table_name, sku, name=None, spec=None):
"""格式化返回给前端的数据结构"""
# 如果没有传 name (例如成品/半成品),尝试通过关联获取,或者直接用 SKU 代替
item_name = name
item_spec = spec
if not item_name and hasattr(item, 'base') and item.base:
item_name = item.base.name
item_spec = item.base.spec_model
return {
'id': item.id,
'sku': sku,
'name': item_name or sku,
'spec_model': item_spec or '',
'source_table': table_name,
'stock_quantity': float(item.stock_quantity),
'available_quantity': float(item.available_quantity),
'batch_number': getattr(item, 'batch_number', ''),
'warehouse_location': getattr(item, 'warehouse_location', '')
}
@staticmethod
def create_outbound(data, operator_name='System'):
"""执行出库逻辑:扣减库存 + 记录日志"""
source_table = data.get('source_table')
stock_id = data.get('stock_id')
quantity = float(data.get('quantity', 0))
if quantity <= 0:
raise ValueError("出库数量必须大于0")
# 1. 获取对应的库存记录模型
model_map = {
'stock_buy': StockBuy,
'stock_semi': StockSemi,
'stock_product': StockProduct
}
ModelClass = model_map.get(source_table)
if not ModelClass:
raise ValueError(f"未知的库存来源表: {source_table}")
# 2. 锁定并查询库存 (使用 with_for_update 防止并发扣减)
stock_item = ModelClass.query.with_for_update().get(stock_id)
if not stock_item:
raise ValueError("库存记录不存在")
if stock_item.available_quantity < quantity:
raise ValueError(f"库存不足!当前可用: {stock_item.available_quantity}, 请求出库: {quantity}")
try:
# 3. 扣减库存
stock_item.stock_quantity -= quantity
stock_item.available_quantity -= quantity
# 4. 创建出库记录
new_outbound = TransOutbound(
outbound_no=OutboundService.generate_outbound_no(),
sku=data.get('sku'),
source_table=source_table,
stock_id=stock_id,
barcode=data.get('barcode'),
outbound_type=data.get('outbound_type', 'SALES'),
quantity=quantity,
consumer_name=data.get('consumer_name'),
signature_path=data.get('signature_path'), # 存储签名的 URL
operator_name=operator_name,
remark=data.get('remark')
)
db.session.add(new_outbound)
db.session.commit()
return new_outbound
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def get_list(page=1, per_page=10, keyword=None, start_date=None, end_date=None):
query = TransOutbound.query.order_by(TransOutbound.outbound_time.desc())
if keyword:
query = query.filter(or_(
TransOutbound.outbound_no.ilike(f'%{keyword}%'),
TransOutbound.consumer_name.ilike(f'%{keyword}%'),
TransOutbound.sku.ilike(f'%{keyword}%')
))
if start_date and end_date:
# 假设传入的是 'YYYY-MM-DD',需要处理时间范围
query = query.filter(TransOutbound.outbound_time.between(start_date, end_date))
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
return {
'items': [item.to_dict() for item in pagination.items],
'total': pagination.total,
'pages': pagination.pages,
'current_page': page
}