Files
KCGL/inventory-backend/app/services/inbound/semi_service.py

340 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/inbound/semi_service.py
from app.extensions import db
from app.models.base import MaterialBase
from datetime import datetime
from sqlalchemy import or_, func, text
import traceback
import json
class SemiInboundService:
@staticmethod
def search_base_material(keyword):
    """Search enabled base materials by name or spec model.

    Args:
        keyword: Text matched case-insensitively as a substring of
            ``MaterialBase.name`` or ``MaterialBase.spec_model``.

    Returns:
        A list of at most 20 dicts with the keys ``id``, ``name``, ``spec``,
        ``category``, ``unit``, ``type`` and ``status``. An empty list is
        returned when ``keyword`` is falsy or when any exception is raised
        while querying (the traceback is printed, not propagated).
    """
    try:
        if not keyword:
            return []

        pattern = f'%{keyword}%'
        name_or_spec_matches = or_(
            MaterialBase.name.ilike(pattern),
            MaterialBase.spec_model.ilike(pattern),
        )
        # `== True` is intentional: it builds a SQL expression, not a Python bool.
        matches = (
            MaterialBase.query
            .filter(MaterialBase.is_enabled == True, name_or_spec_matches)  # noqa: E712
            .limit(20)
            .all()
        )
        return [
            {
                'id': material.id,
                'name': material.name,
                'spec': material.spec_model,
                'category': material.category,
                'unit': material.unit,
                'type': material.material_type,
                # The query only selects enabled rows, so the label is constant.
                'status': '启用',
            }
            for material in matches
        ]
    except Exception:
        # Best-effort lookup: report the failure and fall back to "no results".
        traceback.print_exc()
        return []
@staticmethod
def handle_inbound(data):
    """Create and commit a new semi-finished stock record from a payload.

    Args:
        data: Mapping of request fields. ``base_id`` is required and must
            reference an existing ``MaterialBase`` row. Optional fields read
            here include ``in_date`` (``YYYY-MM-DD``; longer strings are
            truncated to their first 10 characters), ``production_start_time``
            / ``production_end_time`` (``YYYY-MM-DD HH:MM:SS``),
            ``production_time_range`` (list or string), ``in_quantity``,
            ``raw_material_cost``, ``manual_cost``, ``sku``, ``barcode``,
            ``arrival_photo`` / ``quality_report_link`` (lists) and the
            remaining descriptive fields passed straight to ``StockSemi``.

    Returns:
        The newly created ``StockSemi`` instance, after commit.

    Raises:
        ValueError: If ``base_id`` is missing, the referenced material does
            not exist, or a numeric field cannot be converted to ``float``.
        Exception: Any other error (e.g. database failures) is re-raised
            after the session has been rolled back.
    """
    # Imported locally to avoid a circular import with the model module.
    from datetime import timezone
    from app.models.inbound.semi import StockSemi

    def _parse_datetime(value):
        """Parse 'YYYY-MM-DD HH:MM:SS'; return None when empty or malformed."""
        if not value:
            return None
        try:
            return datetime.strptime(str(value), '%Y-%m-%d %H:%M:%S')
        except ValueError:
            # Malformed timestamps are deliberately ignored (best effort).
            return None

    try:
        base_id = data.get('base_id')
        if not base_id:
            raise ValueError("必须选择基础物料 (缺少 base_id)")
        material = MaterialBase.query.get(base_id)
        if not material:
            raise ValueError(f"ID为 {base_id} 的基础物料不存在")

        # 1. Inbound date: default to today's UTC date; an unparseable
        #    `in_date` silently keeps that default.
        in_date_val = datetime.now(timezone.utc).date()
        if data.get('in_date'):
            try:
                # Keep only the date part of longer timestamp strings.
                date_str = str(data['in_date'])[:10]
                in_date_val = datetime.strptime(date_str, '%Y-%m-%d').date()
            except ValueError:
                pass

        # 2. Production times.
        p_start = _parse_datetime(data.get('production_start_time'))
        p_end = _parse_datetime(data.get('production_end_time'))

        time_range_str = None
        raw_range = data.get('production_time_range')
        if isinstance(raw_range, list):
            time_range_str = " ~ ".join(str(x) for x in raw_range)
        elif isinstance(raw_range, str):
            time_range_str = raw_range

        # 3. Quantities and costs; missing or empty values count as 0.
        in_qty = float(data.get('in_quantity') or 0)
        raw_cost = float(data.get('raw_material_cost') or 0)
        manual_cost = float(data.get('manual_cost') or 0)
        unit_total_cost = raw_cost + manual_cost
        total_value = unit_total_cost * in_qty

        # 4. Fetch the next global print serial number from the DB sequence.
        try:
            seq_sql = text("SELECT nextval('global_print_seq')")
            next_global_id = db.session.execute(seq_sql).scalar()
        except Exception:
            print("❌ 数据库序列 global_print_seq 不存在请执行SQL创建")
            raise

        # 5. SKU: use the supplied one, otherwise the zero-padded serial.
        generated_sku = str(next_global_id).zfill(10)
        final_sku = data.get('sku') or generated_sku

        # 6. Barcode: falls back to the SKU when not supplied.
        final_barcode = data.get('barcode') or final_sku

        # 7. Image lists are stored as JSON strings; non-list input becomes [].
        arrival_list = data.get('arrival_photo', [])
        quality_report_list = data.get('quality_report_link', [])
        if not isinstance(arrival_list, list):
            arrival_list = []
        if not isinstance(quality_report_list, list):
            quality_report_list = []

        # 8. Create the record; stock and available quantities start equal
        #    to the inbound quantity.
        new_stock = StockSemi(
            base_id=material.id,
            global_print_id=next_global_id,
            sku=final_sku,
            production_date=in_date_val,
            serial_number=data.get('serial_number'),
            batch_number=data.get('batch_number'),
            barcode=final_barcode,
            status='在库',
            quality_status=data.get('quality_status', '合格'),
            in_quantity=in_qty,
            stock_quantity=in_qty,
            available_quantity=in_qty,
            warehouse_location=data.get('warehouse_location'),
            bom_code=data.get('bom_code'),
            bom_version=data.get('bom_version'),
            work_order_code=data.get('work_order_code'),
            production_manager=data.get('production_manager'),
            production_start_time=p_start,
            production_end_time=p_end,
            production_time_range=time_range_str,
            raw_material_cost=raw_cost,
            manual_cost=manual_cost,
            total_price=total_value,
            # Lists are serialized to JSON strings for storage.
            arrival_photo=json.dumps(arrival_list),
            quality_report_link=json.dumps(quality_report_list),
            detail_link=data.get('detail_link'),
            remark=data.get('remark'),
        )
        db.session.add(new_stock)
        db.session.commit()
        return new_stock
    except Exception:
        db.session.rollback()
        print("----- SemiInboundService Error -----")
        traceback.print_exc()
        raise
@staticmethod
def update_inbound(stock_id, data):
    """Apply a partial update to an existing semi-finished stock record.

    Only keys present in ``data`` are applied.

    Args:
        stock_id: Primary key of the ``StockSemi`` row to update.
        data: Mapping of fields to change. Plain fields are copied as-is.
            ``arrival_photo`` / ``quality_report_link`` are stored as JSON
            strings and ignored unless they are lists.
            ``production_start_time`` / ``production_end_time`` expect
            ``YYYY-MM-DD HH:MM:SS``; a falsy value clears the column and an
            unparseable value leaves it unchanged. Changing ``in_quantity``
            shifts ``stock_quantity`` and ``available_quantity`` by the same
            difference. ``total_price`` is recomputed when the quantity or
            either cost field changes.

    Returns:
        The updated ``StockSemi`` instance, after commit.

    Raises:
        ValueError: If the record does not exist or a numeric field cannot
            be converted to ``float``.
        Exception: Any other error is re-raised after the session has been
            rolled back.
    """
    # Imported locally to avoid a circular import with the model module.
    from app.models.inbound.semi import StockSemi

    def _stored_float(value):
        """Read a stored numeric column as float, treating NULL as 0."""
        return float(value or 0)

    # Fields copied verbatim from the payload onto the same-named attribute.
    plain_fields = (
        'sku',
        'barcode',
        'warehouse_location',
        'serial_number',
        'batch_number',
        'status',
        'quality_status',
        'bom_code',
        'bom_version',
        'work_order_code',
        'production_manager',
        'detail_link',
        'remark',
    )

    try:
        print(f"----- UPDATE SEMI DEBUG: ID={stock_id} -----")
        stock = StockSemi.query.get(stock_id)
        if not stock:
            raise ValueError("记录不存在")

        for field in plain_fields:
            if field in data:
                setattr(stock, field, data[field])

        # Image fields: list -> JSON string; non-list values are ignored.
        for photo_field in ('arrival_photo', 'quality_report_link'):
            if photo_field in data and isinstance(data[photo_field], list):
                setattr(stock, photo_field, json.dumps(data[photo_field]))

        # Production timestamps.
        for time_field in ('production_start_time', 'production_end_time'):
            if time_field not in data:
                continue
            raw_time = data[time_field]
            if not raw_time:
                setattr(stock, time_field, None)
                continue
            try:
                parsed = datetime.strptime(str(raw_time), '%Y-%m-%d %H:%M:%S')
            except ValueError:
                # Malformed timestamp: keep the stored value (best effort).
                continue
            setattr(stock, time_field, parsed)

        # production_time_range is stored as a display string.
        if 'production_time_range' in data:
            raw_range = data['production_time_range']
            if isinstance(raw_range, list):
                stock.production_time_range = " ~ ".join(str(x) for x in raw_range)
            else:
                stock.production_time_range = raw_range

        qty_changed = False
        cost_changed = False

        if 'in_quantity' in data:
            new_qty = float(data['in_quantity'])
            old_qty = _stored_float(stock.in_quantity)
            if new_qty != old_qty:
                # Shift stock and available quantities by the same delta.
                diff = new_qty - old_qty
                stock.in_quantity = new_qty
                stock.stock_quantity = _stored_float(stock.stock_quantity) + diff
                stock.available_quantity = _stored_float(stock.available_quantity) + diff
                qty_changed = True

        if 'raw_material_cost' in data:
            stock.raw_material_cost = float(data['raw_material_cost'])
            cost_changed = True
        if 'manual_cost' in data:
            stock.manual_cost = float(data['manual_cost'])
            cost_changed = True

        if cost_changed or qty_changed:
            # NULL-safe reads: a cost column that was never set counts as 0
            # instead of raising TypeError.
            unit_total = _stored_float(stock.raw_material_cost) + _stored_float(stock.manual_cost)
            stock.total_price = _stored_float(stock.in_quantity) * unit_total

        db.session.commit()
        return stock
    except Exception as e:
        db.session.rollback()
        print(f"----- UPDATE SEMI FAILED: {str(e)} -----")
        traceback.print_exc()
        raise
@staticmethod
def delete_inbound(stock_id):
    """Delete the semi-finished stock record with the given id.

    Args:
        stock_id: Primary key of the ``StockSemi`` row to delete.

    Returns:
        ``True`` once the deletion has been committed.

    Raises:
        ValueError: If no record with ``stock_id`` exists.
        Exception: Any other error is re-raised after the session has been
            rolled back.
    """
    # Imported locally to avoid a circular import with the model module.
    from app.models.inbound.semi import StockSemi

    try:
        record = StockSemi.query.get(stock_id)
        if not record:
            raise ValueError("记录不存在")
        db.session.delete(record)
        db.session.commit()
    except Exception:
        # Undo any pending work, then let the caller see the original error.
        db.session.rollback()
        raise
    return True
@staticmethod
def get_list(page, limit, keyword=None):
    """Return one page of semi-finished stock records with per-material totals.

    Args:
        page: Page number passed to ``paginate``.
        limit: Page size passed to ``paginate`` as ``per_page``.
        keyword: Optional text matched case-insensitively as a substring of
            the material name / spec model or the stock row's batch number,
            serial number, SKU, work order code or BOM code.

    Returns:
        ``{"total": <matching row count>, "items": [<dict>, ...]}``. Each
        item is ``to_dict()`` of the row plus ``sum_stock`` and
        ``sum_available``: the summed stock / available quantity over all
        ``StockSemi`` rows sharing that row's ``base_id``. On any error the
        traceback is printed and ``{"total": 0, "items": []}`` is returned.
    """
    # Imported locally to avoid a circular import with the model module.
    from app.models.inbound.semi import StockSemi

    try:
        stock_query = db.session.query(StockSemi).outerjoin(
            MaterialBase, StockSemi.base_id == MaterialBase.id
        )

        if keyword:
            pattern = f'%{keyword}%'
            searchable_columns = (
                MaterialBase.name,
                MaterialBase.spec_model,
                StockSemi.batch_number,
                StockSemi.serial_number,
                StockSemi.sku,
                StockSemi.work_order_code,
                StockSemi.bom_code,
            )
            stock_query = stock_query.filter(
                or_(*[column.ilike(pattern) for column in searchable_columns])
            )

        page_result = stock_query.order_by(StockSemi.id.desc()).paginate(
            page=page, per_page=limit, error_out=False
        )
        rows = page_result.items

        # Aggregate totals only for the materials that appear on this page.
        material_ids = list({row.base_id for row in rows if row.base_id})
        totals_by_material = {}
        if material_ids:
            grouped = (
                db.session.query(
                    StockSemi.base_id,
                    func.sum(StockSemi.stock_quantity).label('total_stock'),
                    func.sum(StockSemi.available_quantity).label('total_avail'),
                )
                .filter(StockSemi.base_id.in_(material_ids))
                .group_by(StockSemi.base_id)
                .all()
            )
            totals_by_material = {
                agg.base_id: {
                    'total_stock': float(agg.total_stock or 0),
                    'total_avail': float(agg.total_avail or 0),
                }
                for agg in grouped
            }

        # Rows without a base_id (or without aggregates) report zero totals.
        zero_totals = {'total_stock': 0, 'total_avail': 0}
        serialized = []
        for row in rows:
            totals = totals_by_material.get(row.base_id, zero_totals)
            entry = row.to_dict()
            entry['sum_stock'] = totals['total_stock']
            entry['sum_available'] = totals['total_avail']
            serialized.append(entry)

        return {"total": page_result.total, "items": serialized}
    except Exception as e:
        # Best-effort listing: report the failure and return an empty page.
        print(f"List Error: {e}")
        traceback.print_exc()
        return {"total": 0, "items": []}