331 lines
13 KiB
Python
331 lines
13 KiB
Python
# app/services/inbound/semi_service.py
|
||
from app.extensions import db
|
||
from app.models.base import MaterialBase
|
||
# ---------------------------------------------------------------------
|
||
# ❌ 头部禁止导入 StockSemi,防止 Circular Import
|
||
# ---------------------------------------------------------------------
|
||
from datetime import datetime
|
||
from sqlalchemy import or_, func, text
|
||
import traceback
|
||
|
||
|
||
class SemiInboundService:
|
||
@staticmethod
|
||
def search_base_material(keyword):
|
||
try:
|
||
if not keyword:
|
||
return []
|
||
|
||
query = MaterialBase.query.filter(
|
||
MaterialBase.is_enabled == True,
|
||
or_(
|
||
MaterialBase.name.ilike(f'%{keyword}%'),
|
||
MaterialBase.spec_model.ilike(f'%{keyword}%')
|
||
)
|
||
).limit(20)
|
||
|
||
results = []
|
||
for item in query.all():
|
||
results.append({
|
||
'id': item.id,
|
||
'name': item.name,
|
||
'spec': item.spec_model,
|
||
'category': item.category,
|
||
'unit': item.unit,
|
||
'type': item.material_type,
|
||
'status': '启用'
|
||
})
|
||
return results
|
||
except Exception as e:
|
||
traceback.print_exc()
|
||
return []
|
||
|
||
@staticmethod
|
||
def handle_inbound(data):
|
||
# ✅ 【关键修复】局部导入 Model,解决循环引用
|
||
from app.models.inbound.semi import StockSemi
|
||
|
||
try:
|
||
base_id = data.get('base_id')
|
||
if not base_id:
|
||
raise ValueError("必须选择基础物料 (缺少 base_id)")
|
||
|
||
material = MaterialBase.query.get(base_id)
|
||
if not material:
|
||
raise ValueError(f"ID为 {base_id} 的基础物料不存在")
|
||
|
||
# 1. 处理入库日期
|
||
in_date_val = datetime.utcnow().date()
|
||
if data.get('in_date'):
|
||
try:
|
||
date_str = str(data['in_date'])
|
||
if len(date_str) > 10:
|
||
date_str = date_str[:10]
|
||
in_date_val = datetime.strptime(date_str, '%Y-%m-%d').date()
|
||
except ValueError:
|
||
pass
|
||
|
||
# 2. 处理生产时间
|
||
p_start = None
|
||
p_end = None
|
||
if data.get('production_start_time'):
|
||
try:
|
||
p_start = datetime.strptime(str(data['production_start_time']), '%Y-%m-%d %H:%M:%S')
|
||
except:
|
||
pass
|
||
if data.get('production_end_time'):
|
||
try:
|
||
p_end = datetime.strptime(str(data['production_end_time']), '%Y-%m-%d %H:%M:%S')
|
||
except:
|
||
pass
|
||
|
||
# ✅ 优化:处理 time_range 字符串,防止前端传数组导致存入 None
|
||
time_range_str = None
|
||
raw_range = data.get('production_time_range')
|
||
if isinstance(raw_range, list):
|
||
time_range_str = " ~ ".join([str(x) for x in raw_range])
|
||
elif isinstance(raw_range, str):
|
||
time_range_str = raw_range
|
||
|
||
# 3. 处理数值和成本
|
||
in_qty = float(data.get('in_quantity') or 0)
|
||
raw_cost = float(data.get('raw_material_cost') or 0)
|
||
manual_cost = float(data.get('manual_cost') or 0)
|
||
unit_total_cost = raw_cost + manual_cost
|
||
total_value = unit_total_cost * in_qty
|
||
|
||
# ------------------------------------------------------------------
|
||
# 4. 获取全局打印流水号 (跨表唯一)
|
||
# ------------------------------------------------------------------
|
||
next_global_id = 0
|
||
try:
|
||
seq_sql = text("SELECT nextval('global_print_seq')")
|
||
result = db.session.execute(seq_sql)
|
||
next_global_id = result.scalar()
|
||
except Exception as e:
|
||
print("❌ 数据库序列 global_print_seq 不存在,请执行SQL创建!")
|
||
raise e
|
||
|
||
# ------------------------------------------------------------------
|
||
# 5. 自动生成 SKU (格式: 00000001)
|
||
# ------------------------------------------------------------------
|
||
generated_sku = str(next_global_id).zfill(10)
|
||
final_sku = data.get('sku')
|
||
if not final_sku:
|
||
final_sku = generated_sku
|
||
|
||
# ------------------------------------------------------------------
|
||
# 6. 条码逻辑处理
|
||
# ------------------------------------------------------------------
|
||
final_barcode = data.get('barcode')
|
||
if not final_barcode:
|
||
final_barcode = final_sku
|
||
|
||
# 7. 创建记录
|
||
new_stock = StockSemi(
|
||
base_id=material.id,
|
||
global_print_id=next_global_id,
|
||
sku=final_sku,
|
||
production_date=in_date_val,
|
||
|
||
serial_number=data.get('serial_number'),
|
||
batch_number=data.get('batch_number'),
|
||
barcode=final_barcode,
|
||
|
||
status='在库',
|
||
quality_status=data.get('quality_status', '合格'),
|
||
in_quantity=in_qty,
|
||
stock_quantity=in_qty,
|
||
available_quantity=in_qty,
|
||
warehouse_location=data.get('warehouse_location'),
|
||
|
||
bom_code=data.get('bom_code'),
|
||
bom_version=data.get('bom_version'),
|
||
work_order_code=data.get('work_order_code'),
|
||
production_manager=data.get('production_manager'),
|
||
|
||
production_start_time=p_start,
|
||
production_end_time=p_end,
|
||
production_time_range=time_range_str,
|
||
|
||
raw_material_cost=raw_cost,
|
||
manual_cost=manual_cost,
|
||
total_price=total_value,
|
||
|
||
quality_report_link=data.get('quality_report_link'),
|
||
detail_link=data.get('detail_link'),
|
||
remark=data.get('remark')
|
||
)
|
||
|
||
db.session.add(new_stock)
|
||
db.session.commit()
|
||
return new_stock
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
print("----- SemiInboundService Error -----")
|
||
traceback.print_exc()
|
||
raise e
|
||
|
||
@staticmethod
|
||
def update_inbound(stock_id, data):
|
||
from app.models.inbound.semi import StockSemi
|
||
|
||
try:
|
||
print(f"----- UPDATE SEMI DEBUG: ID={stock_id} -----")
|
||
|
||
stock = StockSemi.query.get(stock_id)
|
||
if not stock:
|
||
raise ValueError("记录不存在")
|
||
|
||
field_mapping = {
|
||
'sku': 'sku',
|
||
'barcode': 'barcode',
|
||
'warehouse_location': 'warehouse_location',
|
||
'serial_number': 'serial_number',
|
||
'batch_number': 'batch_number',
|
||
'status': 'status',
|
||
'quality_status': 'quality_status',
|
||
'bom_code': 'bom_code',
|
||
'bom_version': 'bom_version',
|
||
'work_order_code': 'work_order_code',
|
||
'production_manager': 'production_manager',
|
||
'quality_report_link': 'quality_report_link',
|
||
'detail_link': 'detail_link',
|
||
'remark': 'remark'
|
||
}
|
||
|
||
for frontend_key, db_attr in field_mapping.items():
|
||
if frontend_key in data:
|
||
setattr(stock, db_attr, data[frontend_key])
|
||
|
||
# 时间处理
|
||
if 'production_start_time' in data:
|
||
try:
|
||
if data['production_start_time']:
|
||
stock.production_start_time = datetime.strptime(str(data['production_start_time']),
|
||
'%Y-%m-%d %H:%M:%S')
|
||
else:
|
||
stock.production_start_time = None
|
||
except:
|
||
pass
|
||
|
||
if 'production_end_time' in data:
|
||
try:
|
||
if data['production_end_time']:
|
||
stock.production_end_time = datetime.strptime(str(data['production_end_time']),
|
||
'%Y-%m-%d %H:%M:%S')
|
||
else:
|
||
stock.production_end_time = None
|
||
except:
|
||
pass
|
||
|
||
# 更新 production_time_range 字符串
|
||
if 'production_time_range' in data:
|
||
raw_range = data['production_time_range']
|
||
if isinstance(raw_range, list):
|
||
stock.production_time_range = " ~ ".join([str(x) for x in raw_range])
|
||
else:
|
||
stock.production_time_range = raw_range
|
||
|
||
qty_changed = False
|
||
cost_changed = False
|
||
|
||
if 'in_quantity' in data:
|
||
new_qty = float(data['in_quantity'])
|
||
old_qty = float(stock.in_quantity)
|
||
if new_qty != old_qty:
|
||
diff = new_qty - old_qty
|
||
stock.in_quantity = new_qty
|
||
stock.stock_quantity = float(stock.stock_quantity) + diff
|
||
stock.available_quantity = float(stock.available_quantity) + diff
|
||
qty_changed = True
|
||
|
||
if 'raw_material_cost' in data:
|
||
stock.raw_material_cost = float(data['raw_material_cost'])
|
||
cost_changed = True
|
||
|
||
if 'manual_cost' in data:
|
||
stock.manual_cost = float(data['manual_cost'])
|
||
cost_changed = True
|
||
|
||
if cost_changed or qty_changed:
|
||
unit_total = float(stock.raw_material_cost) + float(stock.manual_cost)
|
||
stock.total_price = float(stock.in_quantity) * unit_total
|
||
|
||
db.session.commit()
|
||
return stock
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
print(f"----- UPDATE SEMI FAILED: {str(e)} -----")
|
||
traceback.print_exc()
|
||
raise e
|
||
|
||
@staticmethod
|
||
def delete_inbound(stock_id):
|
||
from app.models.inbound.semi import StockSemi
|
||
try:
|
||
stock = StockSemi.query.get(stock_id)
|
||
if not stock:
|
||
raise ValueError("记录不存在")
|
||
db.session.delete(stock)
|
||
db.session.commit()
|
||
return True
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
raise e
|
||
|
||
@staticmethod
|
||
def get_list(page, limit, keyword=None):
|
||
from app.models.inbound.semi import StockSemi
|
||
try:
|
||
query = db.session.query(StockSemi).outerjoin(MaterialBase, StockSemi.base_id == MaterialBase.id)
|
||
|
||
if keyword:
|
||
query = query.filter(
|
||
or_(
|
||
MaterialBase.name.ilike(f'%{keyword}%'),
|
||
MaterialBase.spec_model.ilike(f'%{keyword}%'),
|
||
StockSemi.batch_number.ilike(f'%{keyword}%'),
|
||
StockSemi.serial_number.ilike(f'%{keyword}%'),
|
||
StockSemi.sku.ilike(f'%{keyword}%'),
|
||
StockSemi.work_order_code.ilike(f'%{keyword}%'),
|
||
StockSemi.bom_code.ilike(f'%{keyword}%')
|
||
)
|
||
)
|
||
|
||
pagination = query.order_by(StockSemi.id.desc()).paginate(page=page, per_page=limit, error_out=False)
|
||
|
||
current_items = pagination.items
|
||
base_ids = list(set([item.base_id for item in current_items if item.base_id]))
|
||
|
||
stock_map = {}
|
||
if base_ids:
|
||
aggregates = db.session.query(
|
||
StockSemi.base_id,
|
||
func.sum(StockSemi.stock_quantity).label('total_stock'),
|
||
func.sum(StockSemi.available_quantity).label('total_avail')
|
||
).filter(StockSemi.base_id.in_(base_ids)).group_by(StockSemi.base_id).all()
|
||
|
||
for agg in aggregates:
|
||
stock_map[agg.base_id] = {
|
||
'total_stock': float(agg.total_stock or 0),
|
||
'total_avail': float(agg.total_avail or 0)
|
||
}
|
||
|
||
items = []
|
||
for item in current_items:
|
||
stats = stock_map.get(item.base_id, {'total_stock': 0, 'total_avail': 0})
|
||
|
||
d = item.to_dict()
|
||
d['sum_stock'] = stats['total_stock']
|
||
d['sum_available'] = stats['total_avail']
|
||
|
||
items.append(d)
|
||
|
||
return {"total": pagination.total, "items": items}
|
||
except Exception as e:
|
||
print(f"List Error: {e}")
|
||
traceback.print_exc()
|
||
return {"total": 0, "items": []} |