Files
KCGL/inventory-backend/app/services/inbound/buy_service.py
2026-02-05 11:37:06 +08:00

301 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/inbound/buy_service.py
from app.extensions import db
from app.models.inbound.buy import StockBuy
from app.models.base import MaterialBase
from app.models.outbound import TransOutbound
from datetime import datetime
from sqlalchemy import or_, func, text, and_
import traceback
import json
class BuyInboundService:
    """Service layer for purchase ("buy") inbound stock records.

    Every method is a ``@staticmethod`` that works on the shared
    ``db.session``. Write methods (`handle_inbound`, `update_inbound`,
    `delete_inbound`) commit on success, and roll back and re-raise on
    failure. Read methods (`search_base_material`, `get_outbound_history`,
    `get_list`) print the traceback and return an empty result on failure.
    """

    # Request payload key -> StockBuy attribute copied verbatim on update.
    _FIELD_MAPPING = {
        'sku': 'sku',
        'barcode': 'barcode',
        'warehouse_location': 'warehouse_location',
        'serial_number': 'serial_number',
        'batch_number': 'batch_number',
        'status': 'status',
        'inspection_status': 'inspection_status',
        'supplier_name': 'supplier_name',
        'detail_link': 'detail_link',
        'currency': 'currency',
        'exchange_rate': 'exchange_rate',
        'purchaser': 'buyer_name',
        'purchaser_email': 'buyer_email',
        'source_link': 'original_link',
    }

    # ============================================================
    # Private helpers
    # ============================================================
    @staticmethod
    def _parse_in_date(raw):
        """Parse an inbound date from the request payload.

        Only the first 10 characters are considered, so ISO datetimes such
        as ``2026-02-05T11:37:06`` are accepted.

        :param raw: value of ``data['in_date']`` (any type, may be falsy).
        :return: a ``datetime.date``, or ``None`` when ``raw`` is falsy or
            does not start with a valid ``YYYY-MM-DD`` date.
        """
        if not raw:
            return None
        try:
            return datetime.strptime(str(raw)[:10], '%Y-%m-%d').date()
        except (ValueError, TypeError):
            # Best effort: the caller falls back to a default date.
            return None

    @staticmethod
    def _parse_attachments(raw):
        """Decode a stored attachment column into a list.

        :param raw: stored value; a JSON array string, a single plain
            string, or a falsy value.
        :return: the decoded JSON when ``raw`` starts with ``[``, a
            one-element list holding ``raw`` for any other string, and
            ``[]`` for falsy or undecodable values.
        """
        if not raw:
            return []
        try:
            return json.loads(raw) if raw.startswith('[') else [raw]
        except Exception:
            # Malformed JSON or a non-string value: treat as "no attachments"
            # so one bad row does not break the whole listing.
            return []

    @staticmethod
    def _stock_to_dict(item):
        """Serialise one StockBuy row (plus its material) for the list API."""
        material = item.material
        qty_stock = float(item.stock_quantity or 0)
        qty_avail = float(item.available_quantity or 0)
        print_id = item.global_print_id
        return {
            'id': item.id,
            'base_id': item.base_id,
            'material_name': material.name if material else '',
            'spec_model': material.spec_model if material else '',
            'category': material.category if material else '',
            'unit': material.unit if material else '',
            'material_type': material.material_type if material else '',
            'sku': item.sku,
            'inbound_date': str(item.in_date) if item.in_date else '',
            'barcode': item.barcode,
            'serial_number': item.serial_number,
            'batch_number': item.batch_number,
            'status': item.status,
            'inspection_status': item.inspection_status,
            'qty_inbound': float(item.in_quantity or 0),
            'qty_stock': qty_stock,
            'qty_available': qty_avail,
            # No cross-batch aggregation: the "sum_*" keys are kept only for
            # front-end compatibility and carry this row's own quantities.
            'sum_stock': qty_stock,
            'sum_available': qty_avail,
            'warehouse_loc': item.warehouse_location,
            'unit_price': float(item.unit_price or 0),
            'total_price': float(item.total_price or 0),
            'currency': item.currency,
            'exchange_rate': float(item.exchange_rate or 1),
            'supplier_name': item.supplier_name,
            'purchaser': item.buyer_name,
            'purchaser_email': item.buyer_email,
            'source_link': item.original_link,
            'detail_link': item.detail_link,
            'arrival_photo': BuyInboundService._parse_attachments(item.arrival_photo),
            'inspection_report': BuyInboundService._parse_attachments(item.inspection_report),
            'global_print_id': print_id,
            'global_print_id_str': f"{print_id:08d}" if print_id else "",
        }

    # ============================================================
    # 1. Base material search
    # ============================================================
    @staticmethod
    def search_base_material(keyword):
        """Search enabled base materials by name or spec model.

        :param keyword: optional substring matched case-insensitively
            against ``name`` and ``spec_model``; falsy means no filter.
        :return: up to 20 dicts (newest id first) with keys ``id``,
            ``name``, ``spec``, ``category``, ``unit``, ``type``,
            ``status``; ``[]`` if the query fails.
        """
        try:
            query = MaterialBase.query.filter(MaterialBase.is_enabled == True)  # noqa: E712
            if keyword:
                pattern = f'%{keyword}%'
                query = query.filter(
                    or_(
                        MaterialBase.name.ilike(pattern),
                        MaterialBase.spec_model.ilike(pattern),
                    )
                )
            query = query.order_by(MaterialBase.id.desc()).limit(20)
            return [
                {
                    'id': item.id, 'name': item.name, 'spec': item.spec_model,
                    'category': item.category, 'unit': item.unit,
                    'type': item.material_type, 'status': '启用',
                }
                for item in query.all()
            ]
        except Exception:
            traceback.print_exc()
            return []

    # ============================================================
    # 2. Create inbound record
    # ============================================================
    @staticmethod
    def handle_inbound(data):
        """Create a new inbound stock record and commit it.

        :param data: request payload (dict). ``base_id`` is required.
            Optional keys read here: ``in_date``, ``in_quantity``,
            ``unit_price``, ``barcode``, ``arrival_photo``,
            ``inspection_report``, ``serial_number``, ``batch_number``,
            ``status``, ``inspection_status``, ``warehouse_location``,
            ``currency``, ``exchange_rate``, ``supplier_name``,
            ``purchaser``, ``purchaser_email``, ``source_link``,
            ``detail_link``.
        :return: the persisted ``StockBuy`` instance.
        :raises ValueError: when ``base_id`` is missing, the material does
            not exist, or a numeric field cannot be converted.
        :raises Exception: any database error, after the session has been
            rolled back.
        """
        try:
            base_id = data.get('base_id')
            if not base_id:
                raise ValueError("必须选择基础物料")
            material = MaterialBase.query.get(base_id)
            if not material:
                raise ValueError("物料不存在")

            # A missing or unparsable date falls back to today's UTC date.
            # NOTE(review): confirm UTC (rather than local time) is intended.
            in_date_val = (
                BuyInboundService._parse_in_date(data.get('in_date'))
                or datetime.utcnow().date()
            )

            in_qty = float(data.get('in_quantity') or 0)
            u_price = float(data.get('unit_price') or 0)

            # The global print id comes from a database sequence; the SKU is
            # that id zero-padded to 10 digits, and also the default barcode.
            next_global_id = db.session.execute(
                text("SELECT nextval('global_print_seq')")
            ).scalar()
            generated_sku = str(next_global_id).zfill(10)
            final_barcode = data.get('barcode') or generated_sku

            # Attachments are stored as JSON arrays; non-list input is dropped.
            arrival_list = data.get('arrival_photo', [])
            if not isinstance(arrival_list, list):
                arrival_list = []
            report_list = data.get('inspection_report', [])
            if not isinstance(report_list, list):
                report_list = []

            new_stock = StockBuy(
                base_id=material.id,
                global_print_id=next_global_id,
                sku=generated_sku,
                barcode=final_barcode,
                in_date=in_date_val,
                serial_number=data.get('serial_number'),
                batch_number=data.get('batch_number'),
                status=data.get('status', '在库'),  # defaults to "in stock"
                in_quantity=in_qty,
                stock_quantity=in_qty,
                available_quantity=in_qty,
                inspection_status=data.get('inspection_status', '未检'),
                warehouse_location=data.get('warehouse_location'),
                unit_price=u_price,
                total_price=in_qty * u_price,
                currency=data.get('currency', 'CNY'),
                exchange_rate=data.get('exchange_rate', 1.0),
                supplier_name=data.get('supplier_name'),
                buyer_name=data.get('purchaser'),
                buyer_email=data.get('purchaser_email'),
                original_link=data.get('source_link'),
                detail_link=data.get('detail_link'),
                arrival_photo=json.dumps(arrival_list),
                inspection_report=json.dumps(report_list),
            )
            db.session.add(new_stock)
            db.session.commit()
            return new_stock
        except Exception:
            db.session.rollback()
            raise

    # ============================================================
    # 3. Update inbound record
    # ============================================================
    @staticmethod
    def update_inbound(stock_id, data):
        """Update an existing inbound stock record and commit it.

        Plain fields listed in ``_FIELD_MAPPING`` are copied when present in
        ``data``. ``arrival_photo`` / ``inspection_report`` are replaced only
        when given as lists. Changing ``in_quantity`` shifts
        ``stock_quantity`` and ``available_quantity`` by the same delta.
        ``total_price`` is recomputed whenever the quantity or the unit
        price changes.

        :param stock_id: primary key of the ``StockBuy`` row.
        :param data: request payload (dict) with the fields to change.
        :return: the updated ``StockBuy`` instance.
        :raises ValueError: when the record does not exist or a numeric
            field cannot be converted.
        :raises Exception: any database error, after the session has been
            rolled back.
        """
        try:
            stock = StockBuy.query.get(stock_id)
            if not stock:
                raise ValueError("记录不存在")

            for payload_key, attr_name in BuyInboundService._FIELD_MAPPING.items():
                if payload_key in data:
                    setattr(stock, attr_name, data[payload_key])

            for attachment_key in ('arrival_photo', 'inspection_report'):
                if attachment_key in data and isinstance(data[attachment_key], list):
                    setattr(stock, attachment_key, json.dumps(data[attachment_key]))

            recompute_total = False

            if 'in_quantity' in data:
                new_qty = float(data['in_quantity'])
                # NULL columns count as 0, matching how get_list reads them.
                diff = new_qty - float(stock.in_quantity or 0)
                if diff != 0:
                    stock.in_quantity = new_qty
                    stock.stock_quantity = float(stock.stock_quantity or 0) + diff
                    stock.available_quantity = float(stock.available_quantity or 0) + diff
                    recompute_total = True

            if 'unit_price' in data:
                stock.unit_price = float(data['unit_price'])
                recompute_total = True

            if recompute_total:
                # Keep total_price = in_quantity * unit_price, as on creation,
                # even when only the quantity was edited.
                stock.total_price = (
                    float(stock.in_quantity or 0) * float(stock.unit_price or 0)
                )

            db.session.commit()
            return stock
        except Exception:
            db.session.rollback()
            raise

    # ============================================================
    # 4. Delete
    # ============================================================
    @staticmethod
    def delete_inbound(stock_id):
        """Delete an inbound stock record and commit.

        :param stock_id: primary key of the ``StockBuy`` row.
        :return: ``True`` on success.
        :raises ValueError: when the record does not exist.
        :raises Exception: any database error, after the session has been
            rolled back.
        """
        try:
            stock = StockBuy.query.get(stock_id)
            if not stock:
                raise ValueError("记录不存在")
            db.session.delete(stock)
            db.session.commit()
            return True
        except Exception:
            db.session.rollback()
            raise

    # ============================================================
    # 5. Outbound history
    # ============================================================
    @staticmethod
    def get_outbound_history(stock_id):
        """Return the outbound transactions recorded against a stock row.

        :param stock_id: primary key of the ``StockBuy`` row.
        :return: list of ``TransOutbound.to_dict()`` results whose
            ``source_table`` is ``'stock_buy'``, newest ``outbound_time``
            first; ``[]`` if the query fails (the traceback is printed).
        """
        try:
            records = (
                TransOutbound.query
                .filter_by(source_table='stock_buy', stock_id=stock_id)
                .order_by(TransOutbound.outbound_time.desc())
                .all()
            )
            return [record.to_dict() for record in records]
        except Exception:
            # Still best-effort, but no longer silent and no longer catching
            # SystemExit / KeyboardInterrupt.
            traceback.print_exc()
            return []

    # ============================================================
    # 6. Paginated list
    # ============================================================
    @staticmethod
    def get_list(page, limit, keyword=None, statuses=None):
        """Return one page of inbound stock rows.

        :param page: 1-based page number passed to ``paginate``.
        :param limit: page size passed to ``paginate``.
        :param keyword: optional substring matched case-insensitively
            against material name / spec model and the row's batch number,
            serial number, SKU and supplier name.
        :param statuses: list of status values to include
            (e.g. ``['在库', '借库', '已出库']``); defaults to
            ``['在库', '借库']`` when falsy.
        :return: ``{"total": <int>, "items": [<dict>, ...]}`` ordered by
            newest id first; ``{"total": 0, "items": []}`` if anything
            fails (the traceback is printed).
        """
        try:
            query = db.session.query(StockBuy).outerjoin(
                MaterialBase, StockBuy.base_id == MaterialBase.id
            )

            # 1. Keyword search across the key text columns.
            if keyword:
                kw = f'%{keyword}%'
                query = query.filter(
                    or_(
                        MaterialBase.name.ilike(kw),
                        MaterialBase.spec_model.ilike(kw),
                        StockBuy.batch_number.ilike(kw),
                        StockBuy.serial_number.ilike(kw),
                        StockBuy.sku.ilike(kw),
                        StockBuy.supplier_name.ilike(kw),
                    )
                )

            # 2. Status filter and zero-stock hiding:
            #    - the default view shows only '在库' and '借库';
            #    - rows with stock_quantity <= 0 are hidden unless the caller
            #      explicitly asked for '已出库'.
            if not statuses:
                statuses = ['在库', '借库']
            query = query.filter(StockBuy.status.in_(statuses))
            if '已出库' not in statuses:
                query = query.filter(StockBuy.stock_quantity > 0)

            pagination = query.order_by(StockBuy.id.desc()).paginate(
                page=page, per_page=limit, error_out=False
            )
            items = [
                BuyInboundService._stock_to_dict(item)
                for item in pagination.items
            ]
            return {"total": pagination.total, "items": items}
        except Exception:
            traceback.print_exc()
            return {"total": 0, "items": []}