449 lines
17 KiB
Python
449 lines
17 KiB
Python
# app/services/inbound/semi_service.py
|
||
from app.extensions import db
|
||
from app.models.base import MaterialBase
|
||
from app.models.outbound import TransOutbound
|
||
from datetime import datetime, timedelta, timezone
|
||
from sqlalchemy import or_, func, text, and_
|
||
import traceback
|
||
import json
|
||
|
||
|
||
class SemiInboundService:
|
||
|
||
# ============================================================
|
||
# 0. 辅助:唯一性校验 (新增核心逻辑)
|
||
# ============================================================
|
||
@staticmethod
|
||
def _check_unique(base_id, serial_number, batch_number, exclude_id=None):
|
||
"""
|
||
校验半成品的唯一性
|
||
:param base_id: 基础物料ID
|
||
:param serial_number: 序列号
|
||
:param batch_number: 批号
|
||
:param exclude_id: 排除的ID
|
||
"""
|
||
from app.models.inbound.semi import StockSemi
|
||
|
||
# 1. 序列号 (SN) 校验 - 全局唯一
|
||
if serial_number:
|
||
query = StockSemi.query.filter(StockSemi.serial_number == serial_number)
|
||
if exclude_id:
|
||
query = query.filter(StockSemi.id != exclude_id)
|
||
|
||
exists = query.first()
|
||
if exists:
|
||
occupied_name = exists.material.name if (hasattr(exists, 'material') and exists.material) else "未知物料"
|
||
raise ValueError(f"序列号【{serial_number}】已存在!被半成品 [{occupied_name}] 占用,请核查。")
|
||
|
||
# 2. 批号 (BN) 校验 - 同物料下不能重复开单
|
||
if batch_number and base_id:
|
||
query = StockSemi.query.filter(
|
||
StockSemi.base_id == base_id,
|
||
StockSemi.batch_number == batch_number
|
||
)
|
||
if exclude_id:
|
||
query = query.filter(StockSemi.id != exclude_id)
|
||
|
||
if query.first():
|
||
raise ValueError(f"该物料已存在批号【{batch_number}】,请勿重复建单,建议在原批次上追加库存。")
|
||
|
||
# ============================================================
|
||
# 1. 基础物料搜索
|
||
# ============================================================
|
||
@staticmethod
|
||
def search_base_material(keyword):
|
||
try:
|
||
# 基础查询:必须是已启用的物料
|
||
query = MaterialBase.query.filter(MaterialBase.is_enabled == True)
|
||
|
||
# 如果有关键词,进行模糊匹配
|
||
if keyword:
|
||
query = query.filter(
|
||
or_(
|
||
MaterialBase.name.ilike(f'%{keyword}%'),
|
||
MaterialBase.spec_model.ilike(f'%{keyword}%')
|
||
)
|
||
)
|
||
|
||
# 统一逻辑:按ID倒序,限制20条
|
||
query = query.order_by(MaterialBase.id.desc()).limit(20)
|
||
|
||
results = []
|
||
for item in query.all():
|
||
results.append({
|
||
'id': item.id,
|
||
'name': item.name,
|
||
'spec': item.spec_model, # 对应前端 item.spec
|
||
'category': item.category,
|
||
'unit': item.unit,
|
||
'type': item.material_type, # 对应前端 item.type
|
||
'status': '启用'
|
||
})
|
||
return results
|
||
except Exception as e:
|
||
traceback.print_exc()
|
||
return []
|
||
|
||
# ============================================================
|
||
# 2. 新增入库逻辑
|
||
# ============================================================
|
||
@staticmethod
|
||
def handle_inbound(data):
|
||
from app.models.inbound.semi import StockSemi
|
||
|
||
try:
|
||
base_id = data.get('base_id')
|
||
if not base_id:
|
||
raise ValueError("必须选择基础物料 (缺少 base_id)")
|
||
|
||
material = MaterialBase.query.get(base_id)
|
||
if not material:
|
||
raise ValueError(f"ID为 {base_id} 的基础物料不存在")
|
||
|
||
# --- [核心修改] 执行唯一性校验 ---
|
||
SemiInboundService._check_unique(
|
||
base_id=base_id,
|
||
serial_number=data.get('serial_number'),
|
||
batch_number=data.get('batch_number')
|
||
)
|
||
|
||
# [核心修改] 强制北京时间
|
||
beijing_tz = timezone(timedelta(hours=8))
|
||
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
|
||
|
||
in_date_val = current_time
|
||
|
||
if data.get('in_date'):
|
||
try:
|
||
date_str = str(data['in_date'])
|
||
if len(date_str) > 10:
|
||
in_date_val = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
|
||
else:
|
||
d_temp = datetime.strptime(date_str, '%Y-%m-%d')
|
||
in_date_val = datetime(d_temp.year, d_temp.month, d_temp.day,
|
||
current_time.hour, current_time.minute, current_time.second)
|
||
except ValueError:
|
||
in_date_val = current_time
|
||
|
||
# 2. 处理生产时间
|
||
p_start = None
|
||
p_end = None
|
||
if data.get('production_start_time'):
|
||
try:
|
||
p_start = datetime.strptime(str(data['production_start_time']), '%Y-%m-%d %H:%M:%S')
|
||
except:
|
||
pass
|
||
if data.get('production_end_time'):
|
||
try:
|
||
p_end = datetime.strptime(str(data['production_end_time']), '%Y-%m-%d %H:%M:%S')
|
||
except:
|
||
pass
|
||
|
||
time_range_str = None
|
||
raw_range = data.get('production_time_range')
|
||
if isinstance(raw_range, list):
|
||
time_range_str = " ~ ".join([str(x) for x in raw_range])
|
||
elif isinstance(raw_range, str):
|
||
time_range_str = raw_range
|
||
|
||
# 3. 处理数值和成本
|
||
in_qty = float(data.get('in_quantity') or 0)
|
||
raw_cost = float(data.get('raw_material_cost') or 0)
|
||
manual_cost = float(data.get('manual_cost') or 0)
|
||
unit_total_cost = raw_cost + manual_cost
|
||
total_value = unit_total_cost * in_qty
|
||
|
||
# 4. 获取全局打印流水号
|
||
next_global_id = 0
|
||
try:
|
||
seq_sql = text("SELECT nextval('global_print_seq')")
|
||
result = db.session.execute(seq_sql)
|
||
next_global_id = result.scalar()
|
||
except Exception as e:
|
||
print("❌ 数据库序列 global_print_seq 不存在,请执行SQL创建!")
|
||
raise e
|
||
|
||
generated_sku = str(next_global_id).zfill(10)
|
||
final_sku = data.get('sku')
|
||
if not final_sku:
|
||
final_sku = generated_sku
|
||
|
||
final_barcode = data.get('barcode')
|
||
if not final_barcode:
|
||
final_barcode = final_sku
|
||
|
||
arrival_list = data.get('arrival_photo', [])
|
||
quality_report_list = data.get('quality_report_link', [])
|
||
|
||
if not isinstance(arrival_list, list): arrival_list = []
|
||
if not isinstance(quality_report_list, list): quality_report_list = []
|
||
|
||
# 8. 创建记录
|
||
new_stock = StockSemi(
|
||
base_id=material.id,
|
||
global_print_id=next_global_id,
|
||
sku=final_sku,
|
||
production_date=in_date_val, # 存入 DateTime
|
||
|
||
serial_number=data.get('serial_number'),
|
||
batch_number=data.get('batch_number'),
|
||
barcode=final_barcode,
|
||
|
||
status='在库',
|
||
quality_status=data.get('quality_status', '合格'),
|
||
in_quantity=in_qty,
|
||
stock_quantity=in_qty,
|
||
available_quantity=in_qty,
|
||
warehouse_location=data.get('warehouse_location'),
|
||
|
||
bom_code=data.get('bom_code'),
|
||
bom_version=data.get('bom_version'),
|
||
work_order_code=data.get('work_order_code'),
|
||
production_manager=data.get('production_manager'),
|
||
|
||
production_start_time=p_start,
|
||
production_end_time=p_end,
|
||
production_time_range=time_range_str,
|
||
|
||
raw_material_cost=raw_cost,
|
||
manual_cost=manual_cost,
|
||
total_price=total_value,
|
||
|
||
arrival_photo=json.dumps(arrival_list),
|
||
quality_report_link=json.dumps(quality_report_list),
|
||
|
||
detail_link=data.get('detail_link'),
|
||
remark=data.get('remark')
|
||
)
|
||
|
||
db.session.add(new_stock)
|
||
db.session.commit()
|
||
return new_stock
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
print("----- SemiInboundService Error -----")
|
||
traceback.print_exc()
|
||
raise e
|
||
|
||
# ============================================================
|
||
# 3. 更新逻辑
|
||
# ============================================================
|
||
@staticmethod
|
||
def update_inbound(stock_id, data):
|
||
from app.models.inbound.semi import StockSemi
|
||
|
||
try:
|
||
stock = StockSemi.query.get(stock_id)
|
||
if not stock:
|
||
raise ValueError("记录不存在")
|
||
|
||
# --- [核心修改] 编辑时也要校验唯一性 ---
|
||
new_base_id = data.get('base_id', stock.base_id)
|
||
new_sn = data.get('serial_number', stock.serial_number)
|
||
new_bn = data.get('batch_number', stock.batch_number)
|
||
|
||
SemiInboundService._check_unique(
|
||
base_id=new_base_id,
|
||
serial_number=new_sn,
|
||
batch_number=new_bn,
|
||
exclude_id=stock_id
|
||
)
|
||
|
||
field_mapping = {
|
||
'sku': 'sku',
|
||
'barcode': 'barcode',
|
||
'warehouse_location': 'warehouse_location',
|
||
'serial_number': 'serial_number',
|
||
'batch_number': 'batch_number',
|
||
'status': 'status',
|
||
'quality_status': 'quality_status',
|
||
'bom_code': 'bom_code',
|
||
'bom_version': 'bom_version',
|
||
'work_order_code': 'work_order_code',
|
||
'production_manager': 'production_manager',
|
||
'detail_link': 'detail_link',
|
||
'remark': 'remark'
|
||
}
|
||
|
||
for frontend_key, db_attr in field_mapping.items():
|
||
if frontend_key in data:
|
||
setattr(stock, db_attr, data[frontend_key])
|
||
|
||
if 'arrival_photo' in data:
|
||
imgs = data['arrival_photo']
|
||
if isinstance(imgs, list):
|
||
stock.arrival_photo = json.dumps(imgs)
|
||
|
||
if 'quality_report_link' in data:
|
||
imgs = data['quality_report_link']
|
||
if isinstance(imgs, list):
|
||
stock.quality_report_link = json.dumps(imgs)
|
||
|
||
if 'production_start_time' in data:
|
||
try:
|
||
if data['production_start_time']:
|
||
stock.production_start_time = datetime.strptime(str(data['production_start_time']),
|
||
'%Y-%m-%d %H:%M:%S')
|
||
else:
|
||
stock.production_start_time = None
|
||
except:
|
||
pass
|
||
|
||
if 'production_end_time' in data:
|
||
try:
|
||
if data['production_end_time']:
|
||
stock.production_end_time = datetime.strptime(str(data['production_end_time']),
|
||
'%Y-%m-%d %H:%M:%S')
|
||
else:
|
||
stock.production_end_time = None
|
||
except:
|
||
pass
|
||
|
||
if 'production_time_range' in data:
|
||
raw_range = data['production_time_range']
|
||
if isinstance(raw_range, list):
|
||
stock.production_time_range = " ~ ".join([str(x) for x in raw_range])
|
||
else:
|
||
stock.production_time_range = raw_range
|
||
|
||
qty_changed = False
|
||
cost_changed = False
|
||
|
||
if 'in_quantity' in data:
|
||
new_qty = float(data['in_quantity'])
|
||
diff = new_qty - float(stock.in_quantity)
|
||
if diff != 0:
|
||
stock.in_quantity = new_qty
|
||
stock.stock_quantity = float(stock.stock_quantity) + diff
|
||
stock.available_quantity = float(stock.available_quantity) + diff
|
||
qty_changed = True
|
||
|
||
if 'raw_material_cost' in data:
|
||
stock.raw_material_cost = float(data['raw_material_cost'])
|
||
cost_changed = True
|
||
|
||
if 'manual_cost' in data:
|
||
stock.manual_cost = float(data['manual_cost'])
|
||
cost_changed = True
|
||
|
||
if cost_changed or qty_changed:
|
||
unit_total = float(stock.raw_material_cost) + float(stock.manual_cost)
|
||
stock.total_price = float(stock.in_quantity) * unit_total
|
||
|
||
db.session.commit()
|
||
return stock
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
raise e
|
||
|
||
# ============================================================
|
||
# 4. 删除逻辑
|
||
# ============================================================
|
||
@staticmethod
|
||
def delete_inbound(stock_id):
|
||
from app.models.inbound.semi import StockSemi
|
||
try:
|
||
stock = StockSemi.query.get(stock_id)
|
||
if not stock:
|
||
raise ValueError("记录不存在")
|
||
db.session.delete(stock)
|
||
db.session.commit()
|
||
return True
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
raise e
|
||
|
||
# ============================================================
|
||
# 5. 出库历史
|
||
# ============================================================
|
||
@staticmethod
|
||
def get_outbound_history(stock_id):
|
||
"""获取出库历史"""
|
||
try:
|
||
records = TransOutbound.query.filter_by(
|
||
source_table='stock_semi', stock_id=stock_id
|
||
).order_by(TransOutbound.outbound_time.desc()).all()
|
||
return [r.to_dict() for r in records]
|
||
except:
|
||
return []
|
||
|
||
# ============================================================
|
||
# 6. 获取列表
|
||
# ============================================================
|
||
@staticmethod
|
||
def get_list(page, limit, keyword=None, statuses=None):
|
||
from app.models.inbound.semi import StockSemi
|
||
try:
|
||
query = db.session.query(StockSemi).outerjoin(MaterialBase, StockSemi.base_id == MaterialBase.id)
|
||
|
||
if keyword:
|
||
kw = f'%{keyword}%'
|
||
query = query.filter(
|
||
or_(
|
||
MaterialBase.name.ilike(kw),
|
||
MaterialBase.spec_model.ilike(kw),
|
||
StockSemi.batch_number.ilike(kw),
|
||
StockSemi.serial_number.ilike(kw),
|
||
StockSemi.sku.ilike(kw),
|
||
StockSemi.work_order_code.ilike(kw),
|
||
StockSemi.bom_code.ilike(kw)
|
||
)
|
||
)
|
||
|
||
if not statuses:
|
||
statuses = ['在库', '借库']
|
||
|
||
if '已出库' in statuses:
|
||
query = query.filter(StockSemi.status.in_(statuses))
|
||
else:
|
||
query = query.filter(
|
||
and_(
|
||
StockSemi.status.in_(statuses),
|
||
StockSemi.stock_quantity > 0
|
||
)
|
||
)
|
||
|
||
# 按照 production_date (入库日期) 倒序排序
|
||
pagination = query.order_by(StockSemi.production_date.desc()).paginate(page=page, per_page=limit,
|
||
error_out=False)
|
||
|
||
current_items = pagination.items
|
||
|
||
def parse_img(json_str):
|
||
if not json_str: return []
|
||
try:
|
||
return json.loads(json_str) if json_str.startswith('[') else [json_str]
|
||
except:
|
||
return []
|
||
|
||
items = []
|
||
for item in current_items:
|
||
d = item.to_dict()
|
||
|
||
# 格式化展示日期
|
||
date_display = ''
|
||
if item.production_date:
|
||
try:
|
||
date_display = item.production_date.strftime('%Y-%m-%d')
|
||
except:
|
||
date_display = str(item.production_date)[:10]
|
||
d['inbound_date'] = date_display
|
||
|
||
d['qty_stock'] = float(item.stock_quantity or 0)
|
||
d['qty_available'] = float(item.available_quantity or 0)
|
||
d['sum_stock'] = d['qty_stock']
|
||
d['sum_available'] = d['qty_available']
|
||
|
||
d['arrival_photo'] = parse_img(item.arrival_photo)
|
||
d['quality_report_link'] = parse_img(item.quality_report_link)
|
||
d['global_print_id'] = item.global_print_id
|
||
|
||
items.append(d)
|
||
|
||
return {"total": pagination.total, "items": items}
|
||
except Exception as e:
|
||
print(f"List Error: {e}")
|
||
traceback.print_exc()
|
||
return {"total": 0, "items": []} |