feat: add BOM cost calculation for product inbound service

This commit is contained in:
dxc
2026-03-02 11:44:50 +08:00
committed by dxc (aider)
parent b08196c479
commit 7b0082c6e0

View File

@ -7,11 +7,9 @@ from sqlalchemy import or_, func, text, and_
import traceback import traceback
import json import json
class ProductInboundService: class ProductInboundService:
# ============================================================
# 0. 辅助:唯一性校验
# ============================================================
@staticmethod @staticmethod
def _check_unique(serial_number, exclude_id=None): def _check_unique(serial_number, exclude_id=None):
from app.models.inbound.product import StockProduct from app.models.inbound.product import StockProduct
@ -24,9 +22,6 @@ class ProductInboundService:
occupied_name = exists.base.name if (hasattr(exists, 'base') and exists.base) else "未知物料" occupied_name = exists.base.name if (hasattr(exists, 'base') and exists.base) else "未知物料"
raise ValueError(f"序列号【{serial_number}】已存在!被成品 [{occupied_name}] 占用,请核查。") raise ValueError(f"序列号【{serial_number}】已存在!被成品 [{occupied_name}] 占用,请核查。")
# ============================================================
# 1. 基础物料搜索 (已修改支持分页)
# ============================================================
@staticmethod @staticmethod
def search_base_material(keyword, page=1, limit=50): def search_base_material(keyword, page=1, limit=50):
try: try:
@ -37,7 +32,7 @@ class ProductInboundService:
or_( or_(
MaterialBase.name.ilike(kw), MaterialBase.name.ilike(kw),
MaterialBase.spec_model.ilike(kw), MaterialBase.spec_model.ilike(kw),
MaterialBase.company_name.ilike(kw) # [新增] MaterialBase.company_name.ilike(kw)
) )
) )
query = query.order_by(MaterialBase.id.desc()) query = query.order_by(MaterialBase.id.desc())
@ -46,7 +41,7 @@ class ProductInboundService:
for item in pagination.items: for item in pagination.items:
results.append({ results.append({
'id': item.id, 'id': item.id,
'company_name': item.company_name, # [新增] 'company_name': item.company_name,
'name': item.name, 'name': item.name,
'spec': item.spec_model, 'spec': item.spec_model,
'category': item.category, 'category': item.category,
@ -64,9 +59,6 @@ class ProductInboundService:
traceback.print_exc() traceback.print_exc()
return {"items": [], "total": 0, "page": 1, "has_next": False} return {"items": [], "total": 0, "page": 1, "has_next": False}
# ============================================================
# 1.5 BOM 搜索逻辑
# ============================================================
@staticmethod @staticmethod
def search_bom_options(keyword): def search_bom_options(keyword):
from app.models.bom import BomTable from app.models.bom import BomTable
@ -103,9 +95,6 @@ class ProductInboundService:
traceback.print_exc() traceback.print_exc()
return [] return []
# ============================================================
# 2. 新增入库逻辑
# ============================================================
@staticmethod @staticmethod
def handle_inbound(data): def handle_inbound(data):
from app.models.inbound.product import StockProduct from app.models.inbound.product import StockProduct
@ -200,9 +189,6 @@ class ProductInboundService:
db.session.rollback() db.session.rollback()
raise e raise e
# ============================================================
# 3. 更新逻辑
# ============================================================
@staticmethod @staticmethod
def update_inbound(stock_id, data): def update_inbound(stock_id, data):
from app.models.inbound.product import StockProduct from app.models.inbound.product import StockProduct
@ -265,9 +251,6 @@ class ProductInboundService:
db.session.rollback() db.session.rollback()
raise e raise e
# ============================================================
# 4. 删除逻辑
# ============================================================
@staticmethod @staticmethod
def delete_inbound(stock_id): def delete_inbound(stock_id):
from app.models.inbound.product import StockProduct from app.models.inbound.product import StockProduct
@ -281,9 +264,6 @@ class ProductInboundService:
db.session.rollback() db.session.rollback()
raise e raise e
# ============================================================
# 5. 出库历史
# ============================================================
@staticmethod @staticmethod
def get_outbound_history(stock_id): def get_outbound_history(stock_id):
try: try:
@ -294,9 +274,6 @@ class ProductInboundService:
except: except:
return [] return []
# ============================================================
# 6. 获取列表
# ============================================================
@staticmethod @staticmethod
def get_list(page, limit, keyword=None, statuses=None, category=None, material_type=None, company=None): def get_list(page, limit, keyword=None, statuses=None, category=None, material_type=None, company=None):
from app.models.inbound.product import StockProduct from app.models.inbound.product import StockProduct
@ -307,7 +284,7 @@ class ProductInboundService:
query = query.filter(or_( query = query.filter(or_(
MaterialBase.name.ilike(kw), MaterialBase.name.ilike(kw),
MaterialBase.spec_model.ilike(kw), MaterialBase.spec_model.ilike(kw),
MaterialBase.company_name.ilike(kw), # [新增] MaterialBase.company_name.ilike(kw),
StockProduct.serial_number.ilike(kw), StockProduct.serial_number.ilike(kw),
StockProduct.work_order_code.ilike(kw), StockProduct.work_order_code.ilike(kw),
StockProduct.order_id.ilike(kw), StockProduct.order_id.ilike(kw),
@ -318,7 +295,6 @@ class ProductInboundService:
if material_type and material_type.strip(): if material_type and material_type.strip():
query = query.filter(MaterialBase.material_type == material_type.strip()) query = query.filter(MaterialBase.material_type == material_type.strip())
# [新增]
if company and company.strip(): if company and company.strip():
query = query.filter(MaterialBase.company_name == company.strip()) query = query.filter(MaterialBase.company_name == company.strip())
@ -346,7 +322,7 @@ class ProductInboundService:
items = [] items = []
for item in current_items: for item in current_items:
items.append(item.to_dict()) # 使用 Model to_dict items.append(item.to_dict())
return {"total": pagination.total, "items": items} return {"total": pagination.total, "items": items}
except: except:
traceback.print_exc() traceback.print_exc()
@ -374,29 +350,20 @@ class ProductInboundService:
except Exception: except Exception:
return [] return []
# ============================================================
# 7. 获取筛选项
# ============================================================
@staticmethod @staticmethod
def get_filter_options(): def get_filter_options():
try: try:
from app.models.base import MaterialBase from app.models.base import MaterialBase
# 类别 categories = db.session.query(MaterialBase.category).filter(MaterialBase.category != None,
categories = db.session.query(MaterialBase.category) \ MaterialBase.category != '').distinct().all()
.filter(MaterialBase.category != None, MaterialBase.category != '') \
.distinct().all()
sorted_categories = sorted([r[0] for r in categories]) sorted_categories = sorted([r[0] for r in categories])
# 类型 types = db.session.query(MaterialBase.material_type).filter(MaterialBase.material_type != None,
types = db.session.query(MaterialBase.material_type) \ MaterialBase.material_type != '').distinct().all()
.filter(MaterialBase.material_type != None, MaterialBase.material_type != '') \
.distinct().all()
sorted_types = sorted([r[0] for r in types]) sorted_types = sorted([r[0] for r in types])
# [新增] 公司 companies = db.session.query(MaterialBase.company_name).filter(MaterialBase.company_name != None,
companies = db.session.query(MaterialBase.company_name) \ MaterialBase.company_name != '').distinct().all()
.filter(MaterialBase.company_name != None, MaterialBase.company_name != '') \
.distinct().all()
sorted_companies = sorted([r[0] for r in companies]) sorted_companies = sorted([r[0] for r in companies])
return { return {
@ -409,9 +376,6 @@ class ProductInboundService:
traceback.print_exc() traceback.print_exc()
return {"categories": [], "types": [], "companies": []} return {"categories": [], "types": [], "companies": []}
# ============================================================
# 8. 获取历史负责人建议 (修改为全局查询)
# ============================================================
@staticmethod @staticmethod
def get_history_managers(keyword=None): def get_history_managers(keyword=None):
from app.models.inbound.product import StockProduct from app.models.inbound.product import StockProduct
@ -427,3 +391,42 @@ class ProductInboundService:
except Exception: except Exception:
traceback.print_exc() traceback.print_exc()
return [] return []
# ============================================================
# 9. BOM 原材料成本自动核算 (新增)
# ============================================================
@staticmethod
def calculate_bom_cost(bom_no, bom_version):
"""
根据 BOM 编号和版本计算原材料总成本
遍历 BOM 子件,取每个子件在采购、半成品、成品三个表中的最高单价,乘以用量后累加
"""
from app.models.bom import BomLine
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
from sqlalchemy import func
try:
bom_lines = BomLine.query.filter(
BomLine.bom_no == bom_no,
BomLine.bom_version == bom_version
).all()
total_cost = 0.0
for line in bom_lines:
component_base_id = line.component_id
usage_qty = float(line.usage_quantity or 1.0)
buy_price = db.session.query(func.max(StockBuy.pre_tax_unit_price)).filter(
StockBuy.base_id == component_base_id).scalar() or 0.0
semi_price = db.session.query(func.max(StockSemi.unit_total_cost)).filter(
StockSemi.base_id == component_base_id).scalar() or 0.0
product_price = db.session.query(func.max(StockProduct.unit_total_cost)).filter(
StockProduct.base_id == component_base_id).scalar() or 0.0
max_price = max(buy_price, semi_price, product_price)
total_cost += max_price * usage_qty
return round(total_cost, 2)
except Exception as e:
traceback.print_exc()
raise e