Files
KCGL/inventory-backend/app/services/stock_service.py

119 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from app.extensions import db
from app.models.stock import StockBuy
from app.models.material import MaterialBase
from sqlalchemy.exc import SQLAlchemyError
class StockService:
@staticmethod
def create_inbound(data):
"""
处理入库逻辑:
1. 根据 SKU 查找物料。
2. 如果没找到,创建新物料 (MaterialBase)。
3. 创建入库单 (StockBuy)。
"""
try:
sku = data.get('sku_code')
material_id = data.get('material_id')
# --- 第一步:确定 material_id ---
# 如果前端没传 ID或者传了但我们想二次确认都通过 SKU 查一遍
existing_material = MaterialBase.query.filter_by(sku_code=sku).first()
if existing_material:
# 场景 A: 物料已存在 -> 直接使用其 ID
material_id = existing_material.id
else:
# 场景 B: 物料不存在 -> 自动创建新物料
if not data.get('material_name'):
raise ValueError(f"SKU [{sku}] 是新物料,必须填写【物料名称】才能入库。")
new_material = MaterialBase(
sku_code=sku,
name=data.get('material_name'),
spec_model=data.get('spec_model'),
unit=data.get('unit'),
category=data.get('category')
)
db.session.add(new_material)
db.session.flush() # 关键:将对象刷入暂存区,以获取自增的 ID
material_id = new_material.id
# --- 第二步:创建入库单 ---
qty = data.get('qty_inbound')
price = data.get('price_unit', 0)
new_stock = StockBuy(
material_id=material_id,
inbound_date=data.get('inbound_date'),
batch_no=data.get('batch_no'),
warehouse_loc=data.get('warehouse_loc'),
supplier_name=data.get('supplier_name'),
# 数量逻辑:初始时,当前量 = 可用量 = 入库量
qty_inbound=qty,
qty_current=qty,
qty_available=qty,
# 财务逻辑
price_unit=price,
price_total=float(qty) * float(price) if qty else 0
)
db.session.add(new_stock)
db.session.commit() # 统一提交事务
return new_stock
except SQLAlchemyError as e:
db.session.rollback() # 数据库报错回滚
raise e
except ValueError as e:
db.session.rollback() # 业务报错回滚
raise e
@staticmethod
def get_list(page, per_page):
"""获取分页列表"""
pagination = StockBuy.query.order_by(StockBuy.inbound_date.desc()).paginate(
page=page, per_page=per_page, error_out=False
)
return {
'items': [item.to_dict() for item in pagination.items],
'total': pagination.total,
'pages': pagination.pages,
'current_page': pagination.page
}
@staticmethod
def update_inbound(stock_id, data):
"""更新入库单信息 (通常不允许改物料本身,只改入库相关)"""
stock = StockBuy.query.get_or_404(stock_id)
if 'warehouse_loc' in data: stock.warehouse_loc = data['warehouse_loc']
if 'supplier_name' in data: stock.supplier_name = data['supplier_name']
if 'batch_no' in data: stock.batch_no = data['batch_no']
if 'price_unit' in data: stock.price_unit = data['price_unit']
# 如果修改了数量,需要级联更新当前库存
if 'qty_inbound' in data:
old_qty = float(stock.qty_inbound)
new_qty = float(data['qty_inbound'])
diff = new_qty - old_qty
stock.qty_inbound = new_qty
stock.qty_current = float(stock.qty_current) + diff
stock.qty_available = float(stock.qty_available) + diff
db.session.commit()
return stock
@staticmethod
def delete_inbound(stock_id):
"""删除入库单"""
stock = StockBuy.query.get_or_404(stock_id)
db.session.delete(stock)
db.session.commit()