Files
KCGL/inventory-backend/app/services/inbound/buy_service.py

295 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/inbound/buy_service.py
from app.extensions import db
from app.models.material import MaterialBase
from app.models.stock import StockBuy
from datetime import datetime
from sqlalchemy import or_
import traceback
class BuyInboundService:
@staticmethod
def search_base_material(keyword):
"""
搜索基础物料库
"""
try:
if not keyword:
return []
# 过滤条件:名称或规格包含关键词,且 is_enabled 为 True
query = MaterialBase.query.filter(
MaterialBase.is_enabled == True,
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).limit(20)
results = []
for item in query.all():
results.append({
'id': item.id,
'name': item.name,
'spec': item.spec_model,
'category': item.category,
'unit': item.unit,
'type': item.material_type,
'status': '启用'
})
return results
except Exception as e:
traceback.print_exc()
return []
@staticmethod
def handle_inbound(data):
"""
新增入库逻辑
"""
try:
# 1. 核心校验
base_id = data.get('base_id')
if not base_id:
raise ValueError("必须选择基础物料进行入库 (缺少 base_id)")
material = MaterialBase.query.get(base_id)
if not material:
raise ValueError(f"ID为 {base_id} 的基础物料不存在")
# 2. 处理日期 (防止空字符串报错)
in_date_val = datetime.utcnow().date()
if data.get('in_date'):
try:
date_str = str(data['in_date'])
# 截取前10位 YYYY-MM-DD
if len(date_str) > 10:
in_date_val = datetime.strptime(date_str[:10], '%Y-%m-%d').date()
else:
in_date_val = datetime.strptime(date_str, '%Y-%m-%d').date()
except ValueError:
pass # 格式错误则使用当前日期
# 3. 数据转换
in_qty = float(data.get('in_quantity') or 0)
u_price = float(data.get('unit_price') or 0)
# 4. 创建 StockBuy
new_stock = StockBuy(
base_id=material.id,
sku=data.get('sku'),
in_date=in_date_val,
serial_number=data.get('serial_number'),
batch_number=data.get('batch_number'),
barcode=data.get('barcode'),
# --- 状态与数量强制逻辑 ---
status='在库',
in_quantity=in_qty,
stock_quantity=in_qty, # 初始库存 = 入库量
available_quantity=in_qty, # 初始可用 = 入库量
inspection_status=data.get('inspection_status', '未检'),
# ----------------
warehouse_location=data.get('warehouse_location'),
unit_price=u_price,
total_price=in_qty * u_price,
currency=data.get('currency', 'CNY'),
exchange_rate=data.get('exchange_rate', 1.0),
supplier_name=data.get('supplier_name'),
# [字段映射] 前端 -> DB
buyer_name=data.get('purchaser'),
buyer_email=data.get('purchaser_email'),
original_link=data.get('source_link'),
detail_link=data.get('detail_link'),
arrival_photo=data.get('arrival_photo'),
remark=data.get('remark')
)
db.session.add(new_stock)
db.session.commit()
return new_stock
except Exception as e:
db.session.rollback()
print(f"Insert Error: {str(e)}")
raise e
@staticmethod
def update_inbound(stock_id, data):
"""
更新入库逻辑
"""
try:
print(f"----- UPDATE DEBUG: ID={stock_id} -----")
stock = StockBuy.query.get(stock_id)
if not stock:
raise ValueError("记录不存在")
# 1. 字段映射字典前端Key -> Model属性名
# 使用映射可以避免写大量 if...else且逻辑更清晰
field_mapping = {
'sku': 'sku',
'barcode': 'barcode',
'warehouse_location': 'warehouse_location',
'serial_number': 'serial_number',
'batch_number': 'batch_number',
'status': 'status',
'inspection_status': 'inspection_status',
'supplier_name': 'supplier_name',
'detail_link': 'detail_link',
'arrival_photo': 'arrival_photo',
'remark': 'remark',
'currency': 'currency',
'exchange_rate': 'exchange_rate',
# 关键映射
'purchaser': 'buyer_name',
'purchaser_email': 'buyer_email',
'source_link': 'original_link'
}
# 遍历更新 (排除日期、数量、单价,下面单独处理)
for frontend_key, db_attr in field_mapping.items():
if frontend_key in data:
setattr(stock, db_attr, data[frontend_key])
# 2. 核心数值逻辑 (数量 & 单价 & 总价)
qty_changed = False
price_changed = False
# (A) 处理入库数量变更
if 'in_quantity' in data:
new_qty = float(data['in_quantity'])
old_qty = float(stock.in_quantity)
if new_qty != old_qty:
print(f"Quantity Changed: {old_qty} -> {new_qty}")
diff = new_qty - old_qty
stock.in_quantity = new_qty
# 联动更新库存和可用量
stock.stock_quantity = float(stock.stock_quantity) + diff
stock.available_quantity = float(stock.available_quantity) + diff
qty_changed = True
# (B) 处理单价变更
if 'unit_price' in data:
new_price = float(data['unit_price'])
old_price = float(stock.unit_price)
if new_price != old_price:
print(f"Price Changed: {old_price} -> {new_price}")
stock.unit_price = new_price
price_changed = True
# (C) 强制重算总价
if qty_changed or price_changed:
stock.total_price = float(stock.in_quantity) * float(stock.unit_price)
print(f"New Total Price: {stock.total_price}")
db.session.commit()
print("----- UPDATE SUCCESS -----")
return stock
except Exception as e:
db.session.rollback()
print(f"----- UPDATE FAILED: {str(e)} -----")
traceback.print_exc()
raise e
@staticmethod
def delete_inbound(stock_id):
try:
stock = StockBuy.query.get(stock_id)
if not stock:
raise ValueError("记录不存在")
db.session.delete(stock)
db.session.commit()
return True
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def get_list(page, limit, keyword=None):
"""
列表查询
"""
try:
# 使用 Outer Join 确保即使 MaterialBase 物理删除,库存记录也不会报错或消失
query = db.session.query(StockBuy).outerjoin(MaterialBase, StockBuy.base_id == MaterialBase.id)
# 搜索逻辑
if keyword:
query = query.filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%'),
StockBuy.batch_number.ilike(f'%{keyword}%'),
StockBuy.serial_number.ilike(f'%{keyword}%'),
StockBuy.sku.ilike(f'%{keyword}%')
)
)
pagination = query.order_by(StockBuy.id.desc()).paginate(page=page, per_page=limit, error_out=False)
items = []
for item in pagination.items:
# 获取关联的基础信息,如果关联不存在则给默认值
mat_name = item.material.name if item.material else '未知物料'
mat_spec = item.material.spec_model if item.material else ''
mat_cat = item.material.category if item.material else ''
mat_unit = item.material.unit if item.material else ''
mat_type = item.material.material_type if item.material else ''
# 构建返回字典
d = {
'id': item.id,
'base_id': item.base_id,
'material_name': mat_name,
'spec_model': mat_spec,
'category': mat_cat,
'unit': mat_unit,
'material_type': mat_type,
'sku': item.sku,
'inbound_date': str(item.in_date) if item.in_date else '',
'barcode': item.barcode,
'serial_number': item.serial_number,
'batch_number': item.batch_number,
'status': item.status,
'inspection_status': item.inspection_status,
'qty_inbound': float(item.in_quantity or 0),
'qty_stock': float(item.stock_quantity or 0),
'qty_available': float(item.available_quantity or 0),
'warehouse_loc': item.warehouse_location,
'unit_price': float(item.unit_price or 0),
'total_price': float(item.total_price or 0),
'currency': item.currency,
'exchange_rate': float(item.exchange_rate or 1),
'supplier_name': item.supplier_name,
# [关键映射] DB -> Frontend
'purchaser': item.buyer_name,
'purchaser_email': item.buyer_email,
'source_link': item.original_link,
'detail_link': item.detail_link,
'arrival_photo': item.arrival_photo,
'remark': item.remark
}
items.append(d)
return {"total": pagination.total, "items": items}
except Exception as e:
# 打印错误到 Docker 日志
print(f"List Error: {e}")
traceback.print_exc()
return {"total": 0, "items": []}