Files
KCGL/inventory-backend/app/services/inbound/buy_service.py
2026-02-05 11:08:29 +08:00

391 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 文件路径: inventory-backend/app/services/inbound/buy_service.py
from app.extensions import db
from app.models.inbound.buy import StockBuy
from app.models.base import MaterialBase
# 引入出库记录模型,用于查询流转历史
from app.models.outbound import TransOutbound
from datetime import datetime
from sqlalchemy import or_, func, text
import traceback
import json
class BuyInboundService:
# ============================================================
# 1. 基础物料搜索 (供下拉框使用)
# ============================================================
@staticmethod
def search_base_material(keyword):
"""
搜索基础物料
如果 keyword 为空,返回最新的 20 条记录
"""
try:
query = MaterialBase.query.filter(MaterialBase.is_enabled == True)
if keyword:
query = query.filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
)
# 无论是否有关键词,都按 ID 倒序排列,取前 20 条
query = query.order_by(MaterialBase.id.desc()).limit(20)
results = []
for item in query.all():
results.append({
'id': item.id,
'name': item.name,
'spec': item.spec_model,
'category': item.category,
'unit': item.unit,
'type': item.material_type,
'status': '启用'
})
return results
except Exception as e:
traceback.print_exc()
return []
# ============================================================
# 2. 新增入库逻辑
# ============================================================
@staticmethod
def handle_inbound(data):
"""
处理入库逻辑
"""
try:
base_id = data.get('base_id')
if not base_id:
raise ValueError("必须选择基础物料进行入库 (缺少 base_id)")
material = MaterialBase.query.get(base_id)
if not material:
raise ValueError(f"ID为 {base_id} 的基础物料不存在")
in_date_val = datetime.utcnow().date()
if data.get('in_date'):
try:
# 兼容 YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD
date_str = str(data['in_date'])
if len(date_str) > 10:
date_str = date_str[:10]
in_date_val = datetime.strptime(date_str, '%Y-%m-%d').date()
except ValueError:
pass
in_qty = float(data.get('in_quantity') or 0)
u_price = float(data.get('unit_price') or 0)
# 1. 获取全局打印流水号 (跨表唯一)
seq_sql = text("SELECT nextval('global_print_seq')")
result = db.session.execute(seq_sql)
next_global_id = result.scalar()
# 2. 自动生成 SKU (格式: 00000001)
generated_sku = str(next_global_id).zfill(10)
# 3. 条码逻辑处理
final_barcode = data.get('barcode')
if not final_barcode:
final_barcode = generated_sku
# 4. 图片列表转 JSON 字符串处理
arrival_list = data.get('arrival_photo', [])
report_list = data.get('inspection_report', [])
if not isinstance(arrival_list, list): arrival_list = []
if not isinstance(report_list, list): report_list = []
new_stock = StockBuy(
base_id=material.id,
global_print_id=next_global_id,
sku=generated_sku, # 自动生成的SKU
barcode=final_barcode, # 如果未输入则存入SKU值
in_date=in_date_val,
serial_number=data.get('serial_number'),
batch_number=data.get('batch_number'),
status='在库',
in_quantity=in_qty,
stock_quantity=in_qty,
available_quantity=in_qty,
inspection_status=data.get('inspection_status', '未检'),
warehouse_location=data.get('warehouse_location'),
unit_price=u_price,
total_price=in_qty * u_price,
currency=data.get('currency', 'CNY'),
exchange_rate=data.get('exchange_rate', 1.0),
supplier_name=data.get('supplier_name'),
buyer_name=data.get('purchaser'),
buyer_email=data.get('purchaser_email'),
original_link=data.get('source_link'),
detail_link=data.get('detail_link'),
# 将列表转为 JSON 字符串存储
arrival_photo=json.dumps(arrival_list),
inspection_report=json.dumps(report_list)
)
db.session.add(new_stock)
db.session.commit()
# 返回创建的对象实例
return new_stock
except Exception as e:
db.session.rollback()
raise e
# ============================================================
# 3. 更新入库逻辑
# ============================================================
@staticmethod
def update_inbound(stock_id, data):
"""
更新入库记录
"""
try:
print(f"----- UPDATE DEBUG: ID={stock_id} -----")
stock = StockBuy.query.get(stock_id)
if not stock:
raise ValueError("记录不存在")
# 基础字段映射
field_mapping = {
'sku': 'sku',
'barcode': 'barcode',
'warehouse_location': 'warehouse_location',
'serial_number': 'serial_number',
'batch_number': 'batch_number',
'status': 'status',
'inspection_status': 'inspection_status',
'supplier_name': 'supplier_name',
'detail_link': 'detail_link',
'currency': 'currency',
'exchange_rate': 'exchange_rate',
'purchaser': 'buyer_name',
'purchaser_email': 'buyer_email',
'source_link': 'original_link'
}
for frontend_key, db_attr in field_mapping.items():
if frontend_key in data:
setattr(stock, db_attr, data[frontend_key])
if 'arrival_photo' in data:
imgs = data['arrival_photo']
if isinstance(imgs, list):
stock.arrival_photo = json.dumps(imgs)
if 'inspection_report' in data:
imgs = data['inspection_report']
if isinstance(imgs, list):
stock.inspection_report = json.dumps(imgs)
# 数量与金额联动更新逻辑
qty_changed = False
price_changed = False
if 'in_quantity' in data:
new_qty = float(data['in_quantity'])
old_qty = float(stock.in_quantity)
if new_qty != old_qty:
diff = new_qty - old_qty
stock.in_quantity = new_qty
# 注意:手动修改入库量时,同步调整总库存和可用库存
stock.stock_quantity = float(stock.stock_quantity) + diff
stock.available_quantity = float(stock.available_quantity) + diff
qty_changed = True
if 'unit_price' in data:
new_price = float(data['unit_price'])
old_price = float(stock.unit_price)
if new_price != old_price:
stock.unit_price = new_price
price_changed = True
if qty_changed or price_changed:
stock.total_price = float(stock.in_quantity) * float(stock.unit_price)
db.session.commit()
print("----- UPDATE SUCCESS -----")
return stock
except Exception as e:
db.session.rollback()
print(f"----- UPDATE FAILED: {str(e)} -----")
traceback.print_exc()
raise e
# ============================================================
# 4. 删除逻辑
# ============================================================
@staticmethod
def delete_inbound(stock_id):
"""
删除入库记录
"""
try:
stock = StockBuy.query.get(stock_id)
if not stock:
raise ValueError("记录不存在")
db.session.delete(stock)
db.session.commit()
return True
except Exception as e:
db.session.rollback()
raise e
# ============================================================
# 5. [新增] 获取出库流转历史 (挂钩出库记录)
# ============================================================
@staticmethod
def get_outbound_history(stock_id):
"""
查询该入库单对应的所有出库记录
"""
try:
records = TransOutbound.query.filter_by(
source_table='stock_buy',
stock_id=stock_id
).order_by(TransOutbound.outbound_time.desc()).all()
return [r.to_dict() for r in records]
except Exception as e:
traceback.print_exc()
return []
# ============================================================
# 6. 获取列表 (含动态状态计算)
# ============================================================
@staticmethod
def get_list(page, limit, keyword=None):
try:
# 1. 联表查询StockBuy join MaterialBase
query = db.session.query(StockBuy).outerjoin(MaterialBase, StockBuy.base_id == MaterialBase.id)
if keyword:
query = query.filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%'),
StockBuy.batch_number.ilike(f'%{keyword}%'),
StockBuy.serial_number.ilike(f'%{keyword}%'),
StockBuy.sku.ilike(f'%{keyword}%'),
StockBuy.supplier_name.ilike(f'%{keyword}%')
)
)
pagination = query.order_by(StockBuy.id.desc()).paginate(page=page, per_page=limit, error_out=False)
current_items = pagination.items
base_ids = list(set([item.base_id for item in current_items if item.base_id]))
# 2. 聚合统计 (计算该种物料的总库存)
stock_map = {}
if base_ids:
aggregates = db.session.query(
StockBuy.base_id,
func.sum(StockBuy.stock_quantity).label('total_stock'),
func.sum(StockBuy.available_quantity).label('total_avail')
).filter(StockBuy.base_id.in_(base_ids)).group_by(StockBuy.base_id).all()
for agg in aggregates:
stock_map[agg.base_id] = {
'total_stock': float(agg.total_stock or 0),
'total_avail': float(agg.total_avail or 0)
}
# 辅助函数解析JSON图片列表
def parse_img_list(json_str):
if not json_str:
return []
try:
if not json_str.startswith('['):
return [json_str]
return json.loads(json_str)
except:
return []
items = []
for item in current_items:
# -------------------------------------------------------------
# [核心逻辑] 动态计算状态
# -------------------------------------------------------------
qty_in = float(item.in_quantity or 0)
qty_avail = float(item.available_quantity or 0)
# 默认使用数据库字段
current_status = item.status
# 如果有入库量但可用量为0说明已经全部出库
if qty_in > 0 and qty_avail <= 0:
current_status = '出库'
# 获取聚合数据
mat_name = item.material.name if item.material else '未知物料'
mat_spec = item.material.spec_model if item.material else ''
mat_cat = item.material.category if item.material else ''
mat_unit = item.material.unit if item.material else ''
mat_type = item.material.material_type if item.material else ''
stats = stock_map.get(item.base_id, {'total_stock': 0, 'total_avail': 0})
d = {
'id': item.id,
'base_id': item.base_id,
'material_name': mat_name,
'spec_model': mat_spec,
'category': mat_cat,
'unit': mat_unit,
'material_type': mat_type,
'sku': item.sku,
'inbound_date': str(item.in_date) if item.in_date else '',
'barcode': item.barcode,
'serial_number': item.serial_number,
'batch_number': item.batch_number,
# 使用动态计算的状态
'status': current_status,
'inspection_status': item.inspection_status,
'qty_inbound': qty_in,
'qty_stock': float(item.stock_quantity or 0),
'qty_available': qty_avail,
'sum_stock': stats['total_stock'],
'sum_available': stats['total_avail'],
'warehouse_loc': item.warehouse_location,
'unit_price': float(item.unit_price or 0),
'total_price': float(item.total_price or 0),
'currency': item.currency,
'exchange_rate': float(item.exchange_rate or 1),
'supplier_name': item.supplier_name,
'purchaser': item.buyer_name,
'purchaser_email': item.buyer_email,
'source_link': item.original_link,
'detail_link': item.detail_link,
'arrival_photo': parse_img_list(item.arrival_photo),
'inspection_report': parse_img_list(item.inspection_report),
'global_print_id': item.global_print_id,
'global_print_id_str': f"{item.global_print_id:08d}" if item.global_print_id else ""
}
items.append(d)
return {"total": pagination.total, "items": items}
except Exception as e:
print(f"List Error: {e}")
traceback.print_exc()
return {"total": 0, "items": []}