将半成品和成品跟bom表进行相关联

This commit is contained in:
dxc
2026-02-12 17:16:24 +08:00
parent b682d4b02f
commit 853374de5d
8 changed files with 292 additions and 192 deletions

View File

@ -11,26 +11,17 @@ import json
class ProductInboundService:
# ============================================================
# 0. 辅助:唯一性校验 (新增核心逻辑)
# 0. 辅助:唯一性校验
# ============================================================
@staticmethod
def _check_unique(serial_number, exclude_id=None):
"""
校验成品的唯一性
:param serial_number: 序列号
:param exclude_id: 排除的ID (编辑模式用)
"""
from app.models.inbound.product import StockProduct
# 成品强校验序列号 (SN) - SN应该是全局唯一的
if serial_number:
query = StockProduct.query.filter(StockProduct.serial_number == serial_number)
if exclude_id:
query = query.filter(StockProduct.id != exclude_id)
exists = query.first()
if exists:
# [修改] material -> base
occupied_name = exists.base.name if (hasattr(exists, 'base') and exists.base) else "未知物料"
raise ValueError(f"序列号【{serial_number}】已存在!被成品 [{occupied_name}] 占用,请核查。")
@ -40,10 +31,7 @@ class ProductInboundService:
@staticmethod
def search_base_material(keyword):
try:
# [核心修改] 只查询已启用的物料
query = MaterialBase.query.filter(MaterialBase.is_enabled == True)
# 2. 动态条件:如果传入了关键词,则增加模糊匹配条件
if keyword:
query = query.filter(
or_(
@ -51,11 +39,7 @@ class ProductInboundService:
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
)
# 3. 排序与限制按ID倒序取最新20条
query = query.order_by(MaterialBase.id.desc()).limit(20)
# 4. 结果封装
results = []
for item in query.all():
results.append({
@ -73,33 +57,69 @@ class ProductInboundService:
return []
# ============================================================
# 2. 新增入库逻辑 (强制北京时间 + 唯一性校验)
# 1.5 [新增] BOM 搜索逻辑
# ============================================================
@staticmethod
def search_bom_options(keyword):
from app.models.bom import BomTable
try:
# 关联查询BOM表 + 父件基础信息表
query = db.session.query(
BomTable.bom_no,
BomTable.version,
MaterialBase.name.label('parent_name'),
MaterialBase.spec_model.label('parent_spec')
).join(MaterialBase, BomTable.parent_id == MaterialBase.id)
# 只查询启用的BOM
if hasattr(BomTable, 'is_enabled'):
query = query.filter(BomTable.is_enabled == True)
if keyword:
kw = f'%{keyword}%'
# 支持搜索BOM编号、父件名称、父件规格
query = query.filter(
or_(
BomTable.bom_no.ilike(kw),
MaterialBase.name.ilike(kw),
MaterialBase.spec_model.ilike(kw)
)
)
# 去重并限制数量
results = query.distinct().limit(20).all()
return [{
'bom_no': r.bom_no,
'version': r.version,
'parent_name': r.parent_name,
'parent_spec': r.parent_spec or ''
} for r in results]
except Exception:
traceback.print_exc()
return []
# ============================================================
# 2. 新增入库逻辑
# ============================================================
@staticmethod
def handle_inbound(data):
from app.models.inbound.product import StockProduct
try:
base_id = data.get('base_id')
if not base_id: raise ValueError("必须选择基础物料")
material = MaterialBase.query.get(base_id)
if not material: raise ValueError("物料不存在")
# [核心修改] 后端二次校验:如果物料已停用,禁止入库
if not material.is_enabled:
raise ValueError(f"物料【{material.name}】已停用,无法办理新入库。")
# --- [核心修改] 执行唯一性校验 ---
ProductInboundService._check_unique(
serial_number=data.get('serial_number')
)
# [核心修改] 强制北京时间
beijing_tz = timezone(timedelta(hours=8))
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
in_date_val = current_time
if data.get('in_date'):
try:
date_str = str(data['in_date'])
@ -113,12 +133,10 @@ class ProductInboundService:
in_date_val = current_time
in_qty = float(data.get('in_quantity') or 0)
p_start = data.get('production_start_time', '')
p_end = data.get('production_end_time', '')
time_range = f"{p_start} ~ {p_end}" if p_start or p_end else None
# 全局流水号
try:
seq_sql = text("SELECT nextval('global_print_seq')")
result = db.session.execute(seq_sql)
@ -132,7 +150,6 @@ class ProductInboundService:
photo_list = data.get('product_photo', [])
quality_list = data.get('quality_report_link', [])
inspection_list = data.get('inspection_report_link', [])
if not isinstance(photo_list, list): photo_list = []
if not isinstance(quality_list, list): quality_list = []
if not isinstance(inspection_list, list): inspection_list = []
@ -141,39 +158,30 @@ class ProductInboundService:
base_id=material.id,
global_print_id=next_global_id,
sku=generated_sku,
production_date=in_date_val, # 存入 DateTime
production_date=in_date_val,
barcode=final_barcode,
serial_number=data.get('serial_number'),
status=data.get('status', '在库'),
warehouse_location=data.get('warehouse_location'),
in_quantity=in_qty,
stock_quantity=in_qty,
available_quantity=in_qty,
bom_code=data.get('bom_code'),
bom_version=data.get('bom_version'),
work_order_code=data.get('work_order_code'),
production_manager=data.get('production_manager'),
production_time_range=time_range,
raw_material_cost=float(data.get('raw_material_cost') or 0),
manual_cost=float(data.get('manual_cost') or 0),
quality_status=data.get('quality_status', '合格'),
product_photo=json.dumps(photo_list),
quality_report_link=json.dumps(quality_list),
inspection_report_link=json.dumps(inspection_list),
detail_link=data.get('detail_link'),
remark=data.get('remark'),
sale_price=float(data.get('sale_price') or 0),
order_id=data.get('order_id')
)
db.session.add(new_stock)
db.session.commit()
return new_stock
@ -187,12 +195,10 @@ class ProductInboundService:
@staticmethod
def update_inbound(stock_id, data):
from app.models.inbound.product import StockProduct
try:
stock = StockProduct.query.get(stock_id)
if not stock: raise ValueError("记录不存在")
# --- [核心修改] 编辑时也要校验唯一性 ---
if 'serial_number' in data:
ProductInboundService._check_unique(
serial_number=data['serial_number'],
@ -211,11 +217,9 @@ class ProductInboundService:
if 'product_photo' in data:
imgs = data['product_photo']
if isinstance(imgs, list): stock.product_photo = json.dumps(imgs)
if 'quality_report_link' in data:
imgs = data['quality_report_link']
if isinstance(imgs, list): stock.quality_report_link = json.dumps(imgs)
if 'inspection_report_link' in data:
imgs = data['inspection_report_link']
if isinstance(imgs, list): stock.inspection_report_link = json.dumps(imgs)
@ -267,7 +271,6 @@ class ProductInboundService:
# ============================================================
@staticmethod
def get_outbound_history(stock_id):
"""获取出库历史"""
try:
records = TransOutbound.query.filter_by(
source_table='stock_product', stock_id=stock_id
@ -284,7 +287,6 @@ class ProductInboundService:
from app.models.inbound.product import StockProduct
try:
query = db.session.query(StockProduct).outerjoin(MaterialBase, StockProduct.base_id == MaterialBase.id)
if keyword:
query = query.filter(or_(
MaterialBase.name.ilike(f'%{keyword}%'),
@ -294,17 +296,12 @@ class ProductInboundService:
StockProduct.order_id.ilike(f'%{keyword}%'),
StockProduct.sku.ilike(f'%{keyword}%')
))
# 类别筛选
if category and category.strip():
query = query.filter(MaterialBase.category == category.strip())
# 类型筛选
if material_type and material_type.strip():
query = query.filter(MaterialBase.material_type == material_type.strip())
if not statuses:
statuses = ['在库', '借库']
if '已出库' in statuses:
query = query.filter(StockProduct.status.in_(statuses))
else:
@ -314,11 +311,8 @@ class ProductInboundService:
StockProduct.stock_quantity > 0
)
)
# 按照 production_date (入库日期) 倒序排序
pagination = query.order_by(StockProduct.production_date.desc()).paginate(page=page, per_page=limit,
error_out=False)
current_items = pagination.items
def parse_img(json_str):
@ -330,10 +324,7 @@ class ProductInboundService:
items = []
for item in current_items:
# [注意] 因为Model层已经修改了 to_dict 内部的 material -> base所以这里直接调 to_dict 即可
d = item.to_dict()
# 格式化日期
date_display = ''
if item.production_date:
try:
@ -341,30 +332,22 @@ class ProductInboundService:
except:
date_display = str(item.production_date)[:10]
d['inbound_date'] = date_display
d['qty_stock'] = float(item.stock_quantity or 0)
d['qty_available'] = float(item.available_quantity or 0)
d['sum_stock'] = d['qty_stock']
d['sum_available'] = d['qty_available']
d['product_photo'] = parse_img(item.product_photo)
d['quality_report_link'] = parse_img(item.quality_report_link)
d['inspection_report_link'] = parse_img(item.inspection_report_link)
d['global_print_id'] = item.global_print_id
items.append(d)
return {"total": pagination.total, "items": items}
except:
traceback.print_exc()
return {"total": 0, "items": []}
# ============================================================
# 7. 系统用户搜索
# ============================================================
@staticmethod
def search_system_users(keyword):
"""搜索系统用户(活跃状态)"""
from app.models.system import SysUser
try:
query = SysUser.query.filter(SysUser.status == 'active')
@ -385,9 +368,6 @@ class ProductInboundService:
except Exception:
return []
# ============================================================
# 8. 获取筛选选项(类别、类型)
# ============================================================
@staticmethod
def get_filter_options():
try:
@ -405,4 +385,4 @@ class ProductInboundService:
except Exception:
import traceback
traceback.print_exc()
return {"categories": [], "types": []}
return {"categories": [], "types": []}