Files
KCGL/inventory-backend/app/services/inbound/product_service.py

226 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/inbound/product_service.py
from app.extensions import db
from app.models.base import MaterialBase
from app.models.inbound.product import StockProduct
from datetime import datetime
from sqlalchemy import or_, func, text
import traceback
class ProductInboundService:
    """Service layer for finished-product inbound (stock-in) records.

    Every public method is a ``@staticmethod`` operating on the shared
    ``db.session``. Write operations commit on success and roll the session
    back before re-raising on failure; the two read operations
    (``search_base_material`` and ``get_list``) are best-effort: they print
    the traceback and return an empty result instead of raising.
    """

    # Plain attributes that update_inbound copies verbatim from the payload.
    _UPDATABLE_FIELDS = (
        'barcode', 'serial_number', 'warehouse_location',
        'status', 'quality_status', 'bom_code', 'bom_version',
        'work_order_code', 'production_manager', 'quality_report_link',
        'detail_link', 'inspection_report_link', 'order_id',
    )

    # Attributes that update_inbound converts with float() before storing.
    _NUMERIC_FIELDS = ('sale_price', 'raw_material_cost', 'manual_cost')

    # Separator used inside the stored ``production_time_range`` string.
    _TIME_RANGE_SEP = ' ~ '

    @staticmethod
    def _to_float(value):
        """Convert *value* to float, treating ``None``/empty/zero as ``0.0``.

        Raises:
            ValueError / TypeError: if a non-empty value is not numeric.
        """
        return float(value or 0)

    @staticmethod
    def _parse_in_date(raw):
        """Parse the leading ``YYYY-MM-DD`` of *raw* into a ``date``.

        Only the first 10 characters are considered, so ISO datetime strings
        such as ``2024-03-05T10:20:30`` are accepted. Returns ``None`` when
        the text is not a valid date.
        """
        try:
            return datetime.strptime(str(raw)[:10], '%Y-%m-%d').date()
        except ValueError:
            return None

    @staticmethod
    def _format_time_range(start, end):
        """Build the stored ``"<start> ~ <end>"`` string.

        ``None`` on either side is treated as an empty string so that an
        explicit null in the payload never ends up as the literal text
        ``"None"``. Returns ``None`` when both sides are empty.
        """
        start = '' if start is None else start
        end = '' if end is None else end
        if not start and not end:
            return None
        return f"{start}{ProductInboundService._TIME_RANGE_SEP}{end}"

    @staticmethod
    def _split_time_range(stored):
        """Split a stored time-range string into ``(start, end)``.

        A missing/empty value yields ``('', '')``; a value without the
        separator yields ``(value, '')``.
        """
        sep = ProductInboundService._TIME_RANGE_SEP
        parts = (stored or sep).split(sep)
        start = parts[0] if parts else ''
        end = parts[1] if len(parts) > 1 else ''
        return start, end

    @staticmethod
    def search_base_material(keyword):
        """Return up to 20 enabled base materials, newest (highest id) first.

        Args:
            keyword: Optional text matched case-insensitively as a substring
                of the material name or spec/model. When falsy, the latest
                20 enabled materials are returned.

        Returns:
            A list of dicts with keys ``id``, ``name``, ``spec``,
            ``category``, ``unit`` and ``type``. On any error the traceback
            is printed and an empty list is returned.
        """
        try:
            # "== True" builds a SQL expression here, not a Python comparison.
            query = MaterialBase.query.filter(MaterialBase.is_enabled == True)  # noqa: E712
            if keyword:
                pattern = f'%{keyword}%'
                query = query.filter(or_(
                    MaterialBase.name.ilike(pattern),
                    MaterialBase.spec_model.ilike(pattern),
                ))
            # Always order before LIMIT so the 20 rows returned are
            # deterministic for both the keyword and the no-keyword case.
            rows = query.order_by(MaterialBase.id.desc()).limit(20).all()
            return [
                {
                    'id': item.id, 'name': item.name, 'spec': item.spec_model,
                    'category': item.category, 'unit': item.unit,
                    'type': item.material_type,
                }
                for item in rows
            ]
        except Exception:
            traceback.print_exc()
            return []

    @staticmethod
    def handle_inbound(data):
        """Create and commit a new ``StockProduct`` inbound record.

        Args:
            data: Mapping with the inbound payload. ``base_id`` is required;
                ``in_date`` (``YYYY-MM-DD`` prefix), ``in_quantity``,
                ``barcode``, the production start/end times, the cost/price
                fields and the remaining descriptive fields are optional.

        Returns:
            The committed ``StockProduct`` instance (callers may invoke
            ``to_dict()`` on it).

        Raises:
            ValueError: if ``base_id`` is missing, the material does not
                exist, or a numeric field cannot be converted.
            Exception: any database error, re-raised after rollback.
        """
        service = ProductInboundService

        # Validated before any DB work, so no rollback is needed here.
        base_id = data.get('base_id')
        if not base_id:
            raise ValueError("必须选择基础物料")

        try:
            material = MaterialBase.query.get(base_id)
            if not material:
                raise ValueError("物料不存在")

            # An unparseable or missing in_date falls back to today's date.
            # NOTE(review): the fallback is the UTC date, which may differ
            # from the warehouse's local date - confirm this is intended.
            in_date_val = None
            if data.get('in_date'):
                in_date_val = service._parse_in_date(data['in_date'])
            if in_date_val is None:
                in_date_val = datetime.utcnow().date()

            # Convert every numeric input up front so that invalid input
            # fails before a value is drawn from the print sequence.
            in_qty = service._to_float(data.get('in_quantity'))
            raw_material_cost = service._to_float(data.get('raw_material_cost'))
            manual_cost = service._to_float(data.get('manual_cost'))
            sale_price = service._to_float(data.get('sale_price'))

            # Production time range; explicit nulls count as empty.
            time_range = service._format_time_range(
                data.get('production_start_time'),
                data.get('production_end_time'),
            )

            # 1. Global print serial number (shared sequence, used by the
            #    printing logic).
            next_global_id = db.session.execute(
                text("SELECT nextval('global_print_seq')")
            ).scalar()

            # 2. Auto-generated SKU: the serial zero-padded to 10 digits.
            generated_sku = str(next_global_id).zfill(10)

            # 3. Barcode defaults to the SKU when the client sent none.
            final_barcode = data.get('barcode') or generated_sku

            new_stock = StockProduct(
                base_id=material.id,
                global_print_id=next_global_id,
                sku=generated_sku,
                production_date=in_date_val,
                barcode=final_barcode,
                serial_number=data.get('serial_number'),
                status='在库',
                warehouse_location=data.get('warehouse_location'),
                in_quantity=in_qty,
                stock_quantity=in_qty,
                available_quantity=in_qty,
                bom_code=data.get('bom_code'),
                bom_version=data.get('bom_version'),
                work_order_code=data.get('work_order_code'),
                production_manager=data.get('production_manager'),
                production_time_range=time_range,
                raw_material_cost=raw_material_cost,
                manual_cost=manual_cost,
                # Null/empty also falls back to the default status.
                quality_status=data.get('quality_status') or '合格',
                quality_report_link=data.get('quality_report_link'),
                detail_link=data.get('detail_link'),
                sale_price=sale_price,
                inspection_report_link=data.get('inspection_report_link'),
                order_id=data.get('order_id'),
            )
            db.session.add(new_stock)
            db.session.commit()
            # Return the instance so the caller can serialise it.
            return new_stock
        except Exception:
            db.session.rollback()
            raise

    @staticmethod
    def update_inbound(stock_id, data):
        """Apply a partial update to an existing inbound record and commit.

        Only keys present in *data* are touched. Changing ``in_quantity``
        shifts ``stock_quantity`` and ``available_quantity`` by the same
        difference. Supplying either production time updates the stored
        range while keeping the other side's previous value.

        Args:
            stock_id: Primary key of the ``StockProduct`` to update.
            data: Mapping of fields to change.

        Returns:
            The updated ``StockProduct`` instance.

        Raises:
            ValueError: if the record does not exist or a supplied numeric
                value is not a number.
            TypeError: if a supplied numeric value is ``None``.
            Exception: any database error, re-raised after rollback.
        """
        service = ProductInboundService
        try:
            stock = StockProduct.query.get(stock_id)
            if not stock:
                raise ValueError("记录不存在")

            # Fields copied as-is.
            for field in service._UPDATABLE_FIELDS:
                if field in data:
                    setattr(stock, field, data[field])

            # Numeric fields are coerced to float.
            for field in service._NUMERIC_FIELDS:
                if field in data:
                    setattr(stock, field, float(data[field]))

            # Quantity change: keep stock and available quantities in sync.
            # Stored values that are NULL are treated as 0.
            if 'in_quantity' in data:
                new_qty = float(data['in_quantity'])
                old_qty = service._to_float(stock.in_quantity)
                if new_qty != old_qty:
                    diff = new_qty - old_qty
                    stock.in_quantity = new_qty
                    stock.stock_quantity = service._to_float(stock.stock_quantity) + diff
                    stock.available_quantity = service._to_float(stock.available_quantity) + diff

            # Time range: merge the supplied side(s) with the stored ones.
            if 'production_start_time' in data or 'production_end_time' in data:
                old_start, old_end = service._split_time_range(stock.production_time_range)
                stock.production_time_range = service._format_time_range(
                    data.get('production_start_time', old_start),
                    data.get('production_end_time', old_end),
                )

            db.session.commit()
            return stock
        except Exception:
            db.session.rollback()
            raise

    @staticmethod
    def delete_inbound(stock_id):
        """Delete an inbound record if it exists and commit.

        Returns:
            ``True`` whether or not a record with *stock_id* was found.

        Raises:
            Exception: any database error, re-raised after rollback.
        """
        try:
            stock = StockProduct.query.get(stock_id)
            if stock:
                db.session.delete(stock)
                db.session.commit()
            return True
        except Exception:
            db.session.rollback()
            raise

    @staticmethod
    def get_list(page, limit, keyword=None):
        """Return one page of inbound records, newest first.

        Each item is the record's ``to_dict()`` extended with ``sum_stock``
        and ``sum_available``: the totals of ``stock_quantity`` and
        ``available_quantity`` over all records sharing its ``base_id``.

        Args:
            page: Page number passed to ``paginate``.
            limit: Page size passed to ``paginate``.
            keyword: Optional text matched case-insensitively as a substring
                of material name/spec, serial number, work order code,
                order id or SKU.

        Returns:
            ``{"total": <int>, "items": [<dict>, ...]}``. On any error the
            traceback is printed and ``{"total": 0, "items": []}`` is
            returned.
        """
        try:
            # Join the base material so its name/spec can be searched.
            query = db.session.query(StockProduct).outerjoin(
                MaterialBase, StockProduct.base_id == MaterialBase.id
            )
            if keyword:
                pattern = f'%{keyword}%'
                query = query.filter(or_(
                    MaterialBase.name.ilike(pattern),
                    MaterialBase.spec_model.ilike(pattern),
                    StockProduct.serial_number.ilike(pattern),
                    StockProduct.work_order_code.ilike(pattern),
                    StockProduct.order_id.ilike(pattern),
                    StockProduct.sku.ilike(pattern),
                ))
            pagination = query.order_by(StockProduct.id.desc()).paginate(
                page=page, per_page=limit, error_out=False
            )

            # Aggregate stock per base material, limited to this page's ids.
            current_items = pagination.items
            base_ids = list({row.base_id for row in current_items})
            stock_map = {}
            if base_ids:
                aggs = db.session.query(
                    StockProduct.base_id,
                    func.sum(StockProduct.stock_quantity).label('s'),
                    func.sum(StockProduct.available_quantity).label('a'),
                ).filter(
                    StockProduct.base_id.in_(base_ids)
                ).group_by(StockProduct.base_id).all()
                stock_map = {
                    agg.base_id: {'s': float(agg.s or 0), 'a': float(agg.a or 0)}
                    for agg in aggs
                }

            items = []
            for row in current_items:
                entry = row.to_dict()
                stats = stock_map.get(row.base_id, {'s': 0, 'a': 0})
                entry['sum_stock'] = stats['s']
                entry['sum_available'] = stats['a']
                items.append(entry)
            return {"total": pagination.total, "items": items}
        except Exception:
            traceback.print_exc()
            return {"total": 0, "items": []}