feat: refactor cost handling and add BOM cost calculation

This commit is contained in:
dxc
2026-03-02 11:51:24 +08:00
committed by dxc (aider)
parent d3510b0261
commit 9cfbdc7d13

View File

@ -7,11 +7,9 @@ from sqlalchemy import or_, func, text, and_
import traceback
import json
class SemiInboundService:
# ============================================================
# 0. 辅助:唯一性校验
# ============================================================
@staticmethod
def _check_unique(base_id, serial_number, batch_number, exclude_id=None):
from app.models.inbound.semi import StockSemi
@ -34,9 +32,6 @@ class SemiInboundService:
if query.first():
raise ValueError(f"该物料已存在批号【{batch_number}】,请勿重复建单,建议在原批次上追加库存。")
# ============================================================
# 1. 基础物料搜索 (已修改支持分页)
# ============================================================
@staticmethod
def search_base_material(keyword, page=1, limit=50):
try:
@ -47,7 +42,7 @@ class SemiInboundService:
or_(
MaterialBase.name.ilike(kw),
MaterialBase.spec_model.ilike(kw),
MaterialBase.company_name.ilike(kw) # [新增] 支持搜公司
MaterialBase.company_name.ilike(kw)
)
)
query = query.order_by(MaterialBase.id.desc())
@ -56,7 +51,7 @@ class SemiInboundService:
for item in pagination.items:
results.append({
'id': item.id,
'company_name': item.company_name, # [新增]
'company_name': item.company_name,
'name': item.name,
'spec': item.spec_model,
'category': item.category,
@ -64,19 +59,11 @@ class SemiInboundService:
'type': item.material_type,
'status': '启用'
})
return {
"items": results,
"total": pagination.total,
"page": page,
"has_next": pagination.has_next
}
return {"items": results, "total": pagination.total, "page": page, "has_next": pagination.has_next}
except Exception as e:
traceback.print_exc()
return {"items": [], "total": 0, "page": 1, "has_next": False}
# ============================================================
# 1.5 BOM 搜索逻辑
# ============================================================
@staticmethod
def search_bom_options(keyword):
from app.models.bom import BomTable
@ -113,9 +100,6 @@ class SemiInboundService:
traceback.print_exc()
return []
# ============================================================
# 2. 新增入库逻辑
# ============================================================
@staticmethod
def handle_inbound(data):
from app.models.inbound.semi import StockSemi
@ -172,9 +156,9 @@ class SemiInboundService:
in_qty = float(data.get('in_quantity') or 0)
raw_cost = float(data.get('raw_material_cost') or 0)
manual_cost = 0.0 # 字段已弃用,保持向后兼容
unit_total_cost = float(data.get('unit_total_cost') or raw_cost or 0)
total_value = unit_total_cost * in_qty
# 【重要修改】:把前端的 unit_total_cost单件成本存入原数据库的 manual_cost 字段中
unit_cost = float(data.get('unit_total_cost') or raw_cost)
total_value = unit_cost * in_qty
next_global_id = 0
try:
@ -220,8 +204,7 @@ class SemiInboundService:
production_end_time=p_end,
production_time_range=time_range_str,
raw_material_cost=raw_cost,
manual_cost=manual_cost, # 保持 0.0
unit_total_cost=unit_total_cost,
manual_cost=unit_cost, # 映射到 manual_cost 物理字段
total_price=total_value,
arrival_photo=json.dumps(arrival_list),
quality_report_link=json.dumps(quality_report_list),
@ -237,9 +220,6 @@ class SemiInboundService:
traceback.print_exc()
raise e
# ============================================================
# 3. 更新逻辑
# ============================================================
@staticmethod
def update_inbound(stock_id, data):
from app.models.inbound.semi import StockSemi
@ -325,11 +305,12 @@ class SemiInboundService:
if 'raw_material_cost' in data:
stock.raw_material_cost = float(data['raw_material_cost'])
if 'unit_total_cost' in data:
stock.unit_total_cost = float(data['unit_total_cost'])
stock.manual_cost = float(data['unit_total_cost']) # 映射到 manual_cost 物理字段
if 'unit_total_cost' in data or qty_changed:
qty = float(stock.in_quantity or 1)
stock.total_price = float(stock.unit_total_cost or 0) * qty
# 使用存入 manual_cost 的单价计算总价
stock.total_price = float(stock.manual_cost or 0) * qty
db.session.commit()
return stock
@ -337,9 +318,6 @@ class SemiInboundService:
db.session.rollback()
raise e
# ============================================================
# 4. 删除逻辑
# ============================================================
@staticmethod
def delete_inbound(stock_id):
from app.models.inbound.semi import StockSemi
@ -354,9 +332,6 @@ class SemiInboundService:
db.session.rollback()
raise e
# ============================================================
# 5. 出库历史
# ============================================================
@staticmethod
def get_outbound_history(stock_id):
try:
@ -367,9 +342,6 @@ class SemiInboundService:
except:
return []
# ============================================================
# 6. 获取列表
# ============================================================
@staticmethod
def get_list(page, limit, keyword=None, statuses=None, category=None, material_type=None, company=None):
from app.models.inbound.semi import StockSemi
@ -381,7 +353,7 @@ class SemiInboundService:
or_(
MaterialBase.name.ilike(kw),
MaterialBase.spec_model.ilike(kw),
MaterialBase.company_name.ilike(kw), # [新增]
MaterialBase.company_name.ilike(kw),
StockSemi.batch_number.ilike(kw),
StockSemi.serial_number.ilike(kw),
StockSemi.sku.ilike(kw),
@ -394,7 +366,6 @@ class SemiInboundService:
if material_type and material_type.strip():
query = query.filter(MaterialBase.material_type == material_type.strip())
# [新增] 公司筛选
if company and company.strip():
query = query.filter(MaterialBase.company_name == company.strip())
@ -411,18 +382,12 @@ class SemiInboundService:
)
pagination = query.order_by(StockSemi.production_date.desc()).paginate(page=page, per_page=limit,
error_out=False)
current_items = pagination.items
def parse_img(json_str):
if not json_str: return []
try:
return json.loads(json_str) if json_str.startswith('[') else [json_str]
except:
return []
items = []
for item in current_items:
items.append(item.to_dict()) # 直接使用 Model 的 to_dict (已包含 company_name)
for item in pagination.items:
# 把 manual_cost 伪装成 unit_total_cost 返回给前端
item_dict = item.to_dict()
item_dict['unit_total_cost'] = float(item.manual_cost or 0)
items.append(item_dict)
return {"total": pagination.total, "items": items}
except Exception as e:
print(f"List Error: {e}")
@ -451,29 +416,18 @@ class SemiInboundService:
except Exception:
return []
# ============================================================
# 7. 获取筛选项 (排序)
# ============================================================
@staticmethod
def get_filter_options():
try:
from app.models.base import MaterialBase
# 类别
categories = db.session.query(MaterialBase.category) \
.filter(MaterialBase.category != None, MaterialBase.category != '') \
.distinct().all()
categories = db.session.query(MaterialBase.category).filter(MaterialBase.category != None,
MaterialBase.category != '').distinct().all()
sorted_categories = sorted([r[0] for r in categories])
# 类型
types = db.session.query(MaterialBase.material_type) \
.filter(MaterialBase.material_type != None, MaterialBase.material_type != '') \
.distinct().all()
types = db.session.query(MaterialBase.material_type).filter(MaterialBase.material_type != None,
MaterialBase.material_type != '').distinct().all()
sorted_types = sorted([r[0] for r in types])
# [新增] 公司
companies = db.session.query(MaterialBase.company_name) \
.filter(MaterialBase.company_name != None, MaterialBase.company_name != '') \
.distinct().all()
companies = db.session.query(MaterialBase.company_name).filter(MaterialBase.company_name != None,
MaterialBase.company_name != '').distinct().all()
sorted_companies = sorted([r[0] for r in companies])
return {
@ -482,13 +436,9 @@ class SemiInboundService:
"companies": sorted_companies
}
except Exception:
import traceback
traceback.print_exc()
return {"categories": [], "types": [], "companies": []}
# ============================================================
# 8. 获取历史生产负责人 (修改为全局查询)
# ============================================================
@staticmethod
def get_history_managers(keyword=None):
from app.models.inbound.semi import StockSemi
@ -504,3 +454,49 @@ class SemiInboundService:
except Exception:
traceback.print_exc()
return []
@staticmethod
def calculate_bom_cost(bom_no, bom_version):
"""
【防崩修正版】查询物理字段:采购的税前价 / 半成品和成品的 manual_cost(映射后的单件成本)
"""
from app.models.bom import bom_table # 假设这可能是 SQLAlchemy 表对象或类
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
from sqlalchemy import func
try:
# 兼容你的表名
bom_lines = db.session.execute(
text("SELECT child_id, dosage FROM bom_table WHERE bom_no = :bom_no AND version = :version"),
{'bom_no': bom_no, 'version': bom_version}
).fetchall()
total_cost = 0.0
for line in bom_lines:
component_base_id = line[0]
usage_qty = float(line[1] or 1.0)
# 查采购表最高价
buy_price = db.session.query(func.max(StockBuy.pre_tax_unit_price)).filter(
StockBuy.base_id == component_base_id
).scalar() or 0.0
# 查半成品表最高价 (直接查 manual_cost因为它存的就是单件成本)
semi_price = db.session.query(func.max(StockSemi.manual_cost)).filter(
StockSemi.base_id == component_base_id
).scalar() or 0.0
# 查成品表最高价
product_price = db.session.query(func.max(StockProduct.manual_cost)).filter(
StockProduct.base_id == component_base_id
).scalar() or 0.0
max_price = max(buy_price, semi_price, product_price)
total_cost += max_price * usage_qty
return round(total_cost, 2)
except Exception as e:
traceback.print_exc()
raise e