347 lines
14 KiB
Python
347 lines
14 KiB
Python
# app/services/inbound/product_service.py
|
||
from app.extensions import db
|
||
from app.models.base import MaterialBase
|
||
from app.models.outbound import TransOutbound
|
||
from datetime import datetime, timedelta, timezone
|
||
from sqlalchemy import or_, func, text, and_
|
||
import traceback
|
||
import json
|
||
|
||
|
||
class ProductInboundService:
|
||
|
||
# ============================================================
|
||
# 0. 辅助:唯一性校验 (新增核心逻辑)
|
||
# ============================================================
|
||
@staticmethod
|
||
def _check_unique(serial_number, exclude_id=None):
|
||
"""
|
||
校验成品的唯一性
|
||
:param serial_number: 序列号
|
||
:param exclude_id: 排除的ID (编辑模式用)
|
||
"""
|
||
from app.models.inbound.product import StockProduct
|
||
|
||
# 成品强校验序列号 (SN) - SN应该是全局唯一的
|
||
if serial_number:
|
||
query = StockProduct.query.filter(StockProduct.serial_number == serial_number)
|
||
if exclude_id:
|
||
query = query.filter(StockProduct.id != exclude_id)
|
||
|
||
exists = query.first()
|
||
if exists:
|
||
occupied_name = exists.material.name if (hasattr(exists, 'material') and exists.material) else "未知物料"
|
||
raise ValueError(f"序列号【{serial_number}】已存在!被成品 [{occupied_name}] 占用,请核查。")
|
||
|
||
# ============================================================
|
||
# 1. 基础物料搜索
|
||
# ============================================================
|
||
@staticmethod
|
||
def search_base_material(keyword):
|
||
try:
|
||
# 1. 基础查询:必须是已启用的物料
|
||
query = MaterialBase.query.filter(MaterialBase.is_enabled == True)
|
||
|
||
# 2. 动态条件:如果传入了关键词,则增加模糊匹配条件
|
||
if keyword:
|
||
query = query.filter(
|
||
or_(
|
||
MaterialBase.name.ilike(f'%{keyword}%'),
|
||
MaterialBase.spec_model.ilike(f'%{keyword}%')
|
||
)
|
||
)
|
||
|
||
# 3. 排序与限制:按ID倒序,取最新20条
|
||
query = query.order_by(MaterialBase.id.desc()).limit(20)
|
||
|
||
# 4. 结果封装
|
||
results = []
|
||
for item in query.all():
|
||
results.append({
|
||
'id': item.id,
|
||
'name': item.name,
|
||
'spec': item.spec_model,
|
||
'category': item.category,
|
||
'unit': item.unit,
|
||
'type': item.material_type,
|
||
'status': '启用'
|
||
})
|
||
return results
|
||
except Exception:
|
||
traceback.print_exc()
|
||
return []
|
||
|
||
# ============================================================
|
||
# 2. 新增入库逻辑 (强制北京时间 + 唯一性校验)
|
||
# ============================================================
|
||
@staticmethod
|
||
def handle_inbound(data):
|
||
from app.models.inbound.product import StockProduct
|
||
|
||
try:
|
||
base_id = data.get('base_id')
|
||
if not base_id: raise ValueError("必须选择基础物料")
|
||
material = MaterialBase.query.get(base_id)
|
||
if not material: raise ValueError("物料不存在")
|
||
|
||
# --- [核心修改] 执行唯一性校验 ---
|
||
ProductInboundService._check_unique(
|
||
serial_number=data.get('serial_number')
|
||
)
|
||
|
||
# [核心修改] 强制北京时间
|
||
beijing_tz = timezone(timedelta(hours=8))
|
||
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
|
||
|
||
in_date_val = current_time
|
||
|
||
if data.get('in_date'):
|
||
try:
|
||
date_str = str(data['in_date'])
|
||
if len(date_str) > 10:
|
||
in_date_val = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
|
||
else:
|
||
d_temp = datetime.strptime(date_str, '%Y-%m-%d')
|
||
in_date_val = datetime(d_temp.year, d_temp.month, d_temp.day,
|
||
current_time.hour, current_time.minute, current_time.second)
|
||
except:
|
||
in_date_val = current_time
|
||
|
||
in_qty = float(data.get('in_quantity') or 0)
|
||
|
||
p_start = data.get('production_start_time', '')
|
||
p_end = data.get('production_end_time', '')
|
||
time_range = f"{p_start} ~ {p_end}" if p_start or p_end else None
|
||
|
||
# 全局流水号
|
||
try:
|
||
seq_sql = text("SELECT nextval('global_print_seq')")
|
||
result = db.session.execute(seq_sql)
|
||
next_global_id = result.scalar()
|
||
except:
|
||
next_global_id = None
|
||
|
||
generated_sku = str(next_global_id).zfill(10) if next_global_id else datetime.now().strftime('%Y%m%d%H%M%S')
|
||
final_barcode = data.get('barcode') or generated_sku
|
||
|
||
photo_list = data.get('product_photo', [])
|
||
quality_list = data.get('quality_report_link', [])
|
||
inspection_list = data.get('inspection_report_link', [])
|
||
|
||
if not isinstance(photo_list, list): photo_list = []
|
||
if not isinstance(quality_list, list): quality_list = []
|
||
if not isinstance(inspection_list, list): inspection_list = []
|
||
|
||
new_stock = StockProduct(
|
||
base_id=material.id,
|
||
global_print_id=next_global_id,
|
||
sku=generated_sku,
|
||
production_date=in_date_val, # 存入 DateTime
|
||
barcode=final_barcode,
|
||
serial_number=data.get('serial_number'),
|
||
|
||
status=data.get('status', '在库'),
|
||
warehouse_location=data.get('warehouse_location'),
|
||
|
||
in_quantity=in_qty,
|
||
stock_quantity=in_qty,
|
||
available_quantity=in_qty,
|
||
|
||
bom_code=data.get('bom_code'),
|
||
bom_version=data.get('bom_version'),
|
||
work_order_code=data.get('work_order_code'),
|
||
production_manager=data.get('production_manager'),
|
||
production_time_range=time_range,
|
||
|
||
raw_material_cost=float(data.get('raw_material_cost') or 0),
|
||
manual_cost=float(data.get('manual_cost') or 0),
|
||
|
||
quality_status=data.get('quality_status', '合格'),
|
||
|
||
product_photo=json.dumps(photo_list),
|
||
quality_report_link=json.dumps(quality_list),
|
||
inspection_report_link=json.dumps(inspection_list),
|
||
|
||
detail_link=data.get('detail_link'),
|
||
remark=data.get('remark'),
|
||
|
||
sale_price=float(data.get('sale_price') or 0),
|
||
order_id=data.get('order_id')
|
||
)
|
||
|
||
db.session.add(new_stock)
|
||
db.session.commit()
|
||
return new_stock
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
raise e
|
||
|
||
# ============================================================
|
||
# 3. 更新逻辑
|
||
# ============================================================
|
||
@staticmethod
|
||
def update_inbound(stock_id, data):
|
||
from app.models.inbound.product import StockProduct
|
||
|
||
try:
|
||
stock = StockProduct.query.get(stock_id)
|
||
if not stock: raise ValueError("记录不存在")
|
||
|
||
# --- [核心修改] 编辑时也要校验唯一性 ---
|
||
if 'serial_number' in data:
|
||
ProductInboundService._check_unique(
|
||
serial_number=data['serial_number'],
|
||
exclude_id=stock_id
|
||
)
|
||
|
||
fields = [
|
||
'barcode', 'serial_number', 'warehouse_location',
|
||
'status', 'quality_status', 'bom_code', 'bom_version',
|
||
'work_order_code', 'production_manager',
|
||
'detail_link', 'order_id', 'remark'
|
||
]
|
||
for f in fields:
|
||
if f in data: setattr(stock, f, data[f])
|
||
|
||
if 'product_photo' in data:
|
||
imgs = data['product_photo']
|
||
if isinstance(imgs, list): stock.product_photo = json.dumps(imgs)
|
||
|
||
if 'quality_report_link' in data:
|
||
imgs = data['quality_report_link']
|
||
if isinstance(imgs, list): stock.quality_report_link = json.dumps(imgs)
|
||
|
||
if 'inspection_report_link' in data:
|
||
imgs = data['inspection_report_link']
|
||
if isinstance(imgs, list): stock.inspection_report_link = json.dumps(imgs)
|
||
|
||
if 'sale_price' in data: stock.sale_price = float(data['sale_price'])
|
||
if 'raw_material_cost' in data: stock.raw_material_cost = float(data['raw_material_cost'])
|
||
if 'manual_cost' in data: stock.manual_cost = float(data['manual_cost'])
|
||
|
||
if 'in_quantity' in data:
|
||
new_qty = float(data['in_quantity'])
|
||
diff = new_qty - float(stock.in_quantity)
|
||
stock.in_quantity = new_qty
|
||
stock.stock_quantity = float(stock.stock_quantity) + diff
|
||
stock.available_quantity = float(stock.available_quantity) + diff
|
||
|
||
if 'production_start_time' in data or 'production_end_time' in data:
|
||
old_range = stock.production_time_range or " ~ "
|
||
parts = old_range.split(' ~ ')
|
||
old_start = parts[0] if len(parts) > 0 else ''
|
||
old_end = parts[1] if len(parts) > 1 else ''
|
||
start = data.get('production_start_time', old_start)
|
||
end = data.get('production_end_time', old_end)
|
||
stock.production_time_range = f"{start} ~ {end}"
|
||
|
||
db.session.commit()
|
||
return stock
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
raise e
|
||
|
||
# ============================================================
|
||
# 4. 删除逻辑
|
||
# ============================================================
|
||
@staticmethod
|
||
def delete_inbound(stock_id):
|
||
from app.models.inbound.product import StockProduct
|
||
try:
|
||
stock = StockProduct.query.get(stock_id)
|
||
if stock:
|
||
db.session.delete(stock)
|
||
db.session.commit()
|
||
return True
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
raise e
|
||
|
||
# ============================================================
|
||
# 5. 出库历史
|
||
# ============================================================
|
||
@staticmethod
|
||
def get_outbound_history(stock_id):
|
||
"""获取出库历史"""
|
||
try:
|
||
records = TransOutbound.query.filter_by(
|
||
source_table='stock_product', stock_id=stock_id
|
||
).order_by(TransOutbound.outbound_time.desc()).all()
|
||
return [r.to_dict() for r in records]
|
||
except:
|
||
return []
|
||
|
||
# ============================================================
|
||
# 6. 获取列表
|
||
# ============================================================
|
||
@staticmethod
|
||
def get_list(page, limit, keyword=None, statuses=None):
|
||
from app.models.inbound.product import StockProduct
|
||
try:
|
||
query = db.session.query(StockProduct).outerjoin(MaterialBase, StockProduct.base_id == MaterialBase.id)
|
||
|
||
if keyword:
|
||
query = query.filter(or_(
|
||
MaterialBase.name.ilike(f'%{keyword}%'),
|
||
MaterialBase.spec_model.ilike(f'%{keyword}%'),
|
||
StockProduct.serial_number.ilike(f'%{keyword}%'),
|
||
StockProduct.work_order_code.ilike(f'%{keyword}%'),
|
||
StockProduct.order_id.ilike(f'%{keyword}%'),
|
||
StockProduct.sku.ilike(f'%{keyword}%')
|
||
))
|
||
|
||
if not statuses:
|
||
statuses = ['在库', '借库']
|
||
|
||
if '已出库' in statuses:
|
||
query = query.filter(StockProduct.status.in_(statuses))
|
||
else:
|
||
query = query.filter(
|
||
and_(
|
||
StockProduct.status.in_(statuses),
|
||
StockProduct.stock_quantity > 0
|
||
)
|
||
)
|
||
|
||
# 按照 production_date (入库日期) 倒序排序
|
||
pagination = query.order_by(StockProduct.production_date.desc()).paginate(page=page, per_page=limit,
|
||
error_out=False)
|
||
|
||
current_items = pagination.items
|
||
|
||
def parse_img(json_str):
|
||
if not json_str: return []
|
||
try:
|
||
return json.loads(json_str) if json_str.startswith('[') else [json_str]
|
||
except:
|
||
return []
|
||
|
||
items = []
|
||
for item in current_items:
|
||
d = item.to_dict()
|
||
|
||
# 格式化日期
|
||
date_display = ''
|
||
if item.production_date:
|
||
try:
|
||
date_display = item.production_date.strftime('%Y-%m-%d')
|
||
except:
|
||
date_display = str(item.production_date)[:10]
|
||
d['inbound_date'] = date_display
|
||
|
||
d['qty_stock'] = float(item.stock_quantity or 0)
|
||
d['qty_available'] = float(item.available_quantity or 0)
|
||
d['sum_stock'] = d['qty_stock']
|
||
d['sum_available'] = d['qty_available']
|
||
|
||
d['product_photo'] = parse_img(item.product_photo)
|
||
d['quality_report_link'] = parse_img(item.quality_report_link)
|
||
d['inspection_report_link'] = parse_img(item.inspection_report_link)
|
||
d['global_print_id'] = item.global_print_id
|
||
|
||
items.append(d)
|
||
|
||
return {"total": pagination.total, "items": items}
|
||
except:
|
||
traceback.print_exc()
|
||
return {"total": 0, "items": []} |