Files
KCGL/inventory-backend/app/services/inbound/semi_service.py

339 lines
13 KiB
Python

import logging
import math
import traceback
from datetime import datetime

from sqlalchemy import or_, func

from app.extensions import db
from app.models.base import MaterialBase
from app.models.inbound.semi import StockSemi
logger = logging.getLogger(__name__)


class SemiInboundService:
    """Create, update, delete and list semi-finished goods stock records.

    Every public method is a ``@staticmethod`` working on the shared
    ``db.session`` and the ``StockSemi`` / ``MaterialBase`` models.
    Write operations roll the session back and re-raise on failure; read
    operations log the error and return an empty result instead.
    """

    # Formats used to parse dates/timestamps found in request payloads.
    _DATE_FORMAT = '%Y-%m-%d'
    _DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    # Escape character for LIKE patterns built from user-supplied keywords.
    _LIKE_ESCAPE = '/'

    # Float noise below this magnitude is treated as exactly zero when
    # stock quantities are shifted by an in-quantity correction.
    _QTY_TOLERANCE = 1e-9

    # Attributes that update_inbound copies verbatim from the payload.
    # 'remark' is included because handle_inbound stores it and get_list
    # returns it, so it has to be editable as well.
    _UPDATABLE_FIELDS = (
        'sku',
        'barcode',
        'warehouse_location',
        'serial_number',
        'batch_number',
        'status',
        'quality_status',
        'bom_code',
        'bom_version',
        'work_order_code',
        'production_manager',
        'quality_report_link',
        'detail_link',
        'remark',
    )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _like_pattern(keyword):
        """Return a ``%keyword%`` LIKE pattern with wildcards in *keyword* escaped.

        ``%`` and ``_`` typed by the user are matched literally; the pattern
        must be used together with ``escape=SemiInboundService._LIKE_ESCAPE``.
        """
        esc = SemiInboundService._LIKE_ESCAPE
        escaped = (
            str(keyword)
            .replace(esc, esc + esc)  # escape the escape character first
            .replace('%', esc + '%')
            .replace('_', esc + '_')
        )
        return f'%{escaped}%'

    @staticmethod
    def _parse_datetime(raw, fmt):
        """Parse ``str(raw)`` with *fmt*; return ``None`` (and log) on mismatch."""
        try:
            return datetime.strptime(str(raw), fmt)
        except (ValueError, TypeError):
            logger.warning("Ignoring unparseable date/time value %r (expected format %s)", raw, fmt)
            return None

    @staticmethod
    def _parse_amount(raw, field):
        """Convert *raw* to a finite, non-negative ``float``.

        Raises:
            ValueError: if *raw* is not numeric, is NaN/infinite, or is negative.
                *field* is included in the message to identify the payload key.
        """
        try:
            value = float(raw)
        except (TypeError, ValueError, OverflowError):
            raise ValueError(f"{field} 必须是有效数字 (收到 {raw!r})") from None
        if not math.isfinite(value) or value < 0:
            raise ValueError(f"{field} 不能为负数或非有限值 (收到 {raw!r})")
        return value

    @staticmethod
    def _shifted_quantity(current, delta):
        """Return ``current + delta`` for a stored quantity (``None`` counts as 0).

        Raises:
            ValueError: if a decrease (``delta < 0``) would push the quantity
                below zero.
        """
        shifted = float(current or 0) + delta
        if abs(shifted) <= SemiInboundService._QTY_TOLERANCE:
            # Snap float noise such as 0.3 + (0.7 - 1.0) to an exact zero.
            return 0.0
        if delta < 0 and shifted < 0:
            raise ValueError("入库数量调整后库存数量或可用数量将小于 0")
        return shifted

    @staticmethod
    def _serialize_stock(item, totals):
        """Build the list-view dict for one ``StockSemi`` row.

        *totals* maps ``base_id`` to ``{'total_stock', 'total_avail'}`` sums.
        """
        material = item.material
        stats = totals.get(item.base_id, {'total_stock': 0, 'total_avail': 0})
        return {
            'id': item.id,
            'base_id': item.base_id,
            'material_name': material.name if material else '未知物料',
            'spec_model': material.spec_model if material else '',
            'category': material.category if material else '',
            'unit': material.unit if material else '',
            'material_type': material.material_type if material else '',
            'sku': item.sku,
            'inbound_date': str(item.in_date) if item.in_date else '',
            'barcode': item.barcode,
            'serial_number': item.serial_number,
            'batch_number': item.batch_number,
            'status': item.status,
            'quality_status': item.quality_status,
            'qty_inbound': float(item.in_quantity or 0),
            'qty_stock': float(item.stock_quantity or 0),
            'qty_available': float(item.available_quantity or 0),
            'sum_stock': stats['total_stock'],
            'sum_available': stats['total_avail'],
            'warehouse_loc': item.warehouse_location,
            # Fields specific to semi-finished goods.
            'bom_code': item.bom_code,
            'bom_version': item.bom_version,
            'work_order_code': item.work_order_code,
            'raw_material_cost': float(item.raw_material_cost or 0),
            'manual_cost': float(item.manual_cost or 0),
            'unit_total_cost': float(item.unit_total_cost or 0),
            'production_manager': item.production_manager,
            'production_start_time': str(item.production_start_time) if item.production_start_time else '',
            'production_end_time': str(item.production_end_time) if item.production_end_time else '',
            'quality_report_link': item.quality_report_link,
            'detail_link': item.detail_link,
            'remark': item.remark,
        }

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    @staticmethod
    def search_base_material(keyword):
        """Search enabled base materials by name or spec/model.

        The match is a case-insensitive substring match; ``%`` and ``_`` in
        *keyword* are matched literally. To restrict the search to a single
        material type, add a condition to the filter below.

        Returns:
            A list of at most 20 dicts with keys ``id``, ``name``, ``spec``,
            ``category``, ``unit``, ``type`` and ``status``. An empty list is
            returned for an empty keyword or when the query fails (the error
            is logged, not raised).
        """
        if not keyword:
            return []
        try:
            pattern = SemiInboundService._like_pattern(keyword)
            esc = SemiInboundService._LIKE_ESCAPE
            query = MaterialBase.query.filter(
                MaterialBase.is_enabled == True,  # noqa: E712 - builds a SQL expression
                or_(
                    MaterialBase.name.ilike(pattern, escape=esc),
                    MaterialBase.spec_model.ilike(pattern, escape=esc),
                ),
            ).limit(20)
            return [
                {
                    'id': item.id,
                    'name': item.name,
                    'spec': item.spec_model,
                    'category': item.category,
                    'unit': item.unit,
                    'type': item.material_type,
                    'status': '启用',  # only enabled materials are selected above
                }
                for item in query.all()
            ]
        except Exception:
            logger.exception("Base material search failed for keyword %r", keyword)
            return []

    @staticmethod
    def handle_inbound(data):
        """Create a ``StockSemi`` inbound record from the payload *data*.

        The payload is validated before the database is touched. Quantity and
        cost values default to 0 when missing or blank and must be finite and
        non-negative. An unparseable ``in_date`` falls back to today's UTC
        date; unparseable production times are stored as ``None``.

        Returns:
            The committed ``StockSemi`` instance.

        Raises:
            ValueError: if ``base_id`` is missing, a numeric field is invalid,
                or the referenced base material does not exist.
            Exception: any database error, re-raised after a rollback.
        """
        base_id = data.get('base_id')
        if not base_id:
            raise ValueError("必须选择基础物料 (缺少 base_id)")

        # Quantities and costs: fail fast on bad input, before any DB work.
        parse_amount = SemiInboundService._parse_amount
        in_qty = parse_amount(data.get('in_quantity') or 0, 'in_quantity')
        raw_cost = parse_amount(data.get('raw_material_cost') or 0, 'raw_material_cost')
        manual_cost = parse_amount(data.get('manual_cost') or 0, 'manual_cost')
        unit_total_cost = raw_cost + manual_cost  # per-unit cost = raw material + labour
        total_value = unit_total_cost * in_qty    # total value = per-unit cost * quantity

        # Inbound date: only the leading YYYY-MM-DD part of the value is used.
        # NOTE(review): utcnow() is deprecated since Python 3.12 and the UTC
        # date can differ from the local calendar date; confirm which is wanted.
        in_date_val = datetime.utcnow().date()
        if data.get('in_date'):
            parsed_date = SemiInboundService._parse_datetime(
                str(data['in_date'])[:10], SemiInboundService._DATE_FORMAT
            )
            if parsed_date is not None:
                in_date_val = parsed_date.date()

        # Production times arrive as "YYYY-MM-DD HH:mm:ss" strings.
        p_start = None
        p_end = None
        if data.get('production_start_time'):
            p_start = SemiInboundService._parse_datetime(
                data['production_start_time'], SemiInboundService._DATETIME_FORMAT
            )
        if data.get('production_end_time'):
            p_end = SemiInboundService._parse_datetime(
                data['production_end_time'], SemiInboundService._DATETIME_FORMAT
            )

        try:
            material = MaterialBase.query.get(base_id)
            if not material:
                raise ValueError(f"ID为 {base_id} 的基础物料不存在")

            new_stock = StockSemi(
                base_id=material.id,
                sku=data.get('sku'),
                in_date=in_date_val,
                # Identification
                serial_number=data.get('serial_number'),
                batch_number=data.get('batch_number'),
                barcode=data.get('barcode'),
                # Status and quantities
                status='在库',
                # `or` so that an explicit null/blank also gets the default.
                quality_status=data.get('quality_status') or '合格',
                in_quantity=in_qty,
                stock_quantity=in_qty,
                available_quantity=in_qty,
                warehouse_location=data.get('warehouse_location'),
                # Production task information
                bom_code=data.get('bom_code'),
                bom_version=data.get('bom_version'),
                work_order_code=data.get('work_order_code'),
                production_manager=data.get('production_manager'),
                production_start_time=p_start,
                production_end_time=p_end,
                # Cost information
                raw_material_cost=raw_cost,
                manual_cost=manual_cost,
                unit_total_cost=unit_total_cost,
                total_price=total_value,
                # Links and notes
                quality_report_link=data.get('quality_report_link'),
                detail_link=data.get('detail_link'),
                remark=data.get('remark'),
            )
            db.session.add(new_stock)
            db.session.commit()
            return new_stock
        except Exception:
            db.session.rollback()
            raise

    @staticmethod
    def update_inbound(stock_id, data):
        """Apply the changes in *data* to the ``StockSemi`` row *stock_id*.

        Only keys present in *data* are applied. Changing ``in_quantity``
        shifts ``stock_quantity`` and ``available_quantity`` by the same
        delta; changing either cost recomputes ``unit_total_cost``; and
        ``total_price`` is recomputed whenever quantity or cost changed.
        A production time that is present but empty clears the column, while
        an unparseable one leaves the stored value untouched.

        Returns:
            The committed ``StockSemi`` instance.

        Raises:
            ValueError: if the record does not exist, a numeric field is
                invalid, or lowering ``in_quantity`` would make the stock or
                available quantity negative.
            Exception: any database error, re-raised after a rollback.
        """
        # Validate numeric input up front so a bad payload never reaches the DB.
        parse_amount = SemiInboundService._parse_amount
        new_qty = parse_amount(data['in_quantity'], 'in_quantity') if 'in_quantity' in data else None
        new_raw_cost = (
            parse_amount(data['raw_material_cost'], 'raw_material_cost')
            if 'raw_material_cost' in data else None
        )
        new_manual_cost = parse_amount(data['manual_cost'], 'manual_cost') if 'manual_cost' in data else None

        try:
            logger.debug("Updating semi stock record id=%s", stock_id)
            stock = StockSemi.query.get(stock_id)
            if not stock:
                raise ValueError("记录不存在")

            # 1. Plain attributes copied as-is.
            for field in SemiInboundService._UPDATABLE_FIELDS:
                if field in data:
                    setattr(stock, field, data[field])

            # 2. Production timestamps.
            for field in ('production_start_time', 'production_end_time'):
                if field not in data:
                    continue
                raw_value = data[field]
                if not raw_value:
                    setattr(stock, field, None)
                    continue
                parsed = SemiInboundService._parse_datetime(raw_value, SemiInboundService._DATETIME_FORMAT)
                if parsed is not None:
                    setattr(stock, field, parsed)

            # 3. Quantity change: shift stock/available by the same delta.
            qty_changed = False
            if new_qty is not None:
                old_qty = float(stock.in_quantity or 0)
                if new_qty != old_qty:
                    delta = new_qty - old_qty
                    shifted_stock = SemiInboundService._shifted_quantity(stock.stock_quantity, delta)
                    shifted_available = SemiInboundService._shifted_quantity(stock.available_quantity, delta)
                    stock.in_quantity = new_qty
                    stock.stock_quantity = shifted_stock
                    stock.available_quantity = shifted_available
                    qty_changed = True

            # 4. Cost change (raw material and/or labour).
            cost_changed = False
            if new_raw_cost is not None:
                stock.raw_material_cost = new_raw_cost
                cost_changed = True
            if new_manual_cost is not None:
                stock.manual_cost = new_manual_cost
                cost_changed = True

            # 5. Derived values; stored columns may be NULL, hence `or 0`.
            if cost_changed:
                stock.unit_total_cost = float(stock.raw_material_cost or 0) + float(stock.manual_cost or 0)
            if cost_changed or qty_changed:
                stock.total_price = float(stock.in_quantity or 0) * float(stock.unit_total_cost or 0)

            db.session.commit()
            logger.debug("Updated semi stock record id=%s", stock_id)
            return stock
        except Exception:
            db.session.rollback()
            logger.exception("Failed to update semi stock record id=%s", stock_id)
            raise

    @staticmethod
    def delete_inbound(stock_id):
        """Delete the ``StockSemi`` row *stock_id*.

        Returns:
            ``True`` once the deletion has been committed.

        Raises:
            ValueError: if no record with that id exists.
            Exception: any database error, re-raised after a rollback.
        """
        try:
            stock = StockSemi.query.get(stock_id)
            if not stock:
                raise ValueError("记录不存在")
            db.session.delete(stock)
            db.session.commit()
            return True
        except Exception:
            db.session.rollback()
            raise

    @staticmethod
    def get_list(page, limit, keyword=None):
        """Return one page of stock records, newest id first.

        When *keyword* is given, rows are kept if it occurs (case-insensitive,
        wildcards matched literally) in the material name or spec/model, or in
        the record's batch number, serial number, SKU, work order code or BOM
        code. Each item also carries ``sum_stock`` / ``sum_available``: the
        totals over all ``StockSemi`` rows sharing the item's ``base_id``.

        Returns:
            ``{"total": <matching row count>, "items": [<dict>, ...]}``.
            On any error the exception is logged and
            ``{"total": 0, "items": []}`` is returned.
        """
        try:
            # 1. Page of records, optionally filtered by keyword.
            query = db.session.query(StockSemi).outerjoin(
                MaterialBase, StockSemi.base_id == MaterialBase.id
            )
            if keyword:
                pattern = SemiInboundService._like_pattern(keyword)
                esc = SemiInboundService._LIKE_ESCAPE
                searchable = (
                    MaterialBase.name,
                    MaterialBase.spec_model,
                    StockSemi.batch_number,
                    StockSemi.serial_number,
                    StockSemi.sku,
                    StockSemi.work_order_code,
                    StockSemi.bom_code,
                )
                query = query.filter(
                    or_(*[column.ilike(pattern, escape=esc) for column in searchable])
                )
            pagination = query.order_by(StockSemi.id.desc()).paginate(
                page=page, per_page=limit, error_out=False
            )
            rows = pagination.items

            # 2. Per-material totals, computed once for the materials on this page.
            base_ids = list({row.base_id for row in rows if row.base_id})
            totals = {}
            if base_ids:
                aggregates = db.session.query(
                    StockSemi.base_id,
                    func.sum(StockSemi.stock_quantity).label('total_stock'),
                    func.sum(StockSemi.available_quantity).label('total_avail'),
                ).filter(StockSemi.base_id.in_(base_ids)).group_by(StockSemi.base_id).all()
                for agg in aggregates:
                    totals[agg.base_id] = {
                        'total_stock': float(agg.total_stock or 0),
                        'total_avail': float(agg.total_avail or 0),
                    }

            items = [SemiInboundService._serialize_stock(row, totals) for row in rows]
            return {"total": pagination.total, "items": items}
        except Exception:
            logger.exception("Failed to list semi stock records")
            return {"total": 0, "items": []}