265 lines
9.6 KiB
Python
265 lines
9.6 KiB
Python
# 文件路径: app/services/inbound/base_service.py
|
||
|
||
from app.extensions import db
|
||
from app.models.base import MaterialBase
|
||
from app.models.inbound.buy import StockBuy
|
||
from app.models.inbound.semi import StockSemi
|
||
# 假设您有 StockProduct 和 StockService 的模型定义
|
||
# from app.models.inbound.product import StockProduct
|
||
# from app.models.inbound.service import StockService
|
||
from sqlalchemy import or_
|
||
import traceback
|
||
import json
|
||
|
||
|
||
class MaterialBaseService:
|
||
"""
|
||
基础物料服务层
|
||
负责处理 MaterialBase 的增删改查及搜索逻辑
|
||
"""
|
||
|
||
@staticmethod
|
||
def search_material(keyword):
|
||
"""
|
||
根据关键字搜索已启用的基础物料
|
||
(供 /api/v1/inbound/base/search 接口调用)
|
||
"""
|
||
try:
|
||
if not keyword:
|
||
return []
|
||
|
||
query = MaterialBase.query.filter(
|
||
MaterialBase.is_enabled == True,
|
||
or_(
|
||
MaterialBase.name.ilike(f'%{keyword}%'),
|
||
MaterialBase.common_name.ilike(f'%{keyword}%'),
|
||
MaterialBase.spec_model.ilike(f'%{keyword}%')
|
||
)
|
||
).limit(20)
|
||
|
||
results = []
|
||
for item in query.all():
|
||
results.append({
|
||
'id': item.id,
|
||
'name': item.name,
|
||
'commonName': item.common_name,
|
||
'spec': item.spec_model,
|
||
'category': item.category,
|
||
'unit': item.unit,
|
||
'type': item.material_type,
|
||
'status': '启用'
|
||
})
|
||
return results
|
||
except Exception as e:
|
||
traceback.print_exc()
|
||
return []
|
||
|
||
@staticmethod
|
||
def _get_stock_counts(stock_query):
|
||
"""
|
||
辅助函数:安全计算库存列表的总数量
|
||
修复逻辑:优先查找 'stock_quantity' (Buy/Semi/Product表中实际使用的字段)
|
||
"""
|
||
total_inv = 0
|
||
total_avail = 0
|
||
|
||
# 如果 stock_query 是动态加载的查询对象 (AppenderQuery),需要迭代它
|
||
# 如果是列表,直接迭代
|
||
try:
|
||
items = list(stock_query) # 触发查询
|
||
except:
|
||
items = []
|
||
|
||
for x in items:
|
||
# 1. 获取库存数
|
||
# 【修复点】根据你提供的 Service 代码,Buy/Semi/Product 均使用 stock_quantity
|
||
# Service 使用 actual_quantity,这里做兼容查找
|
||
q = getattr(x, 'stock_quantity', getattr(x, 'actual_quantity', getattr(x, 'quantity', 0)))
|
||
|
||
# 2. 获取可用数
|
||
# 这里的字段名通常都是 available_quantity
|
||
a = getattr(x, 'available_quantity', q)
|
||
|
||
# 累加 (转 float 防止 None 或 Decimal 计算报错)
|
||
try:
|
||
total_inv += float(q if q is not None else 0)
|
||
total_avail += float(a if a is not None else 0)
|
||
except:
|
||
pass
|
||
|
||
return total_inv, total_avail
|
||
|
||
@staticmethod
|
||
def get_list(page, limit, filters=None):
|
||
"""
|
||
获取基础信息列表 (带分页和筛选)
|
||
并聚合库存总数和可用总数
|
||
"""
|
||
try:
|
||
query = MaterialBase.query
|
||
|
||
if filters:
|
||
# 1. 关键词模糊搜索 (名称 或 俗名 或 规格型号)
|
||
if filters.get('keyword'):
|
||
kw = f"%{filters['keyword']}%"
|
||
query = query.filter(or_(
|
||
MaterialBase.name.ilike(kw),
|
||
MaterialBase.common_name.ilike(kw),
|
||
MaterialBase.spec_model.ilike(kw)
|
||
))
|
||
|
||
# 2. 精确筛选
|
||
if filters.get('category'):
|
||
query = query.filter_by(category=filters['category'])
|
||
|
||
if filters.get('type'):
|
||
query = query.filter_by(material_type=filters['type'])
|
||
|
||
if filters.get('isEnabled') is not None:
|
||
# 前端传 1/0,转为 Boolean
|
||
is_active = bool(int(filters['isEnabled']))
|
||
query = query.filter_by(is_enabled=is_active)
|
||
|
||
# 按 ID 倒序排列
|
||
pagination = query.order_by(MaterialBase.id.desc()).paginate(page=page, per_page=limit, error_out=False)
|
||
|
||
items_list = []
|
||
for item in pagination.items:
|
||
# 获取基础字典
|
||
item_dict = item.to_dict()
|
||
|
||
# [调用修复后的辅助函数]
|
||
|
||
# 1. 采购库存 (StockBuy)
|
||
buy_inv, buy_avail = MaterialBaseService._get_stock_counts(item.stock_buys)
|
||
|
||
# 2. 半成品库存 (StockSemi)
|
||
semi_inv, semi_avail = MaterialBaseService._get_stock_counts(item.stock_semis)
|
||
|
||
# 3. 成品库存 (StockProduct)
|
||
prod_inv, prod_avail = MaterialBaseService._get_stock_counts(item.stock_products)
|
||
|
||
# 4. 服务库存 (StockService)
|
||
# 使用 getattr 防止关联不存在时报错
|
||
serv_inv, serv_avail = MaterialBaseService._get_stock_counts(getattr(item, 'stock_services', []))
|
||
|
||
# 合并总数
|
||
item_dict['inventoryCount'] = buy_inv + semi_inv + prod_inv + serv_inv
|
||
item_dict['availableCount'] = buy_avail + semi_avail + prod_avail + serv_avail
|
||
|
||
items_list.append(item_dict)
|
||
|
||
return {"total": pagination.total, "items": items_list}
|
||
|
||
except Exception as e:
|
||
traceback.print_exc()
|
||
print(f"查询基础信息列表失败: {e}")
|
||
return {"total": 0, "items": []}
|
||
|
||
@staticmethod
|
||
def create_material(data):
|
||
"""新增基础信息"""
|
||
try:
|
||
# 0. 基础校验
|
||
if not data.get('name') or not data.get('spec'):
|
||
raise ValueError("名称和规格型号不能为空")
|
||
|
||
# 1. 查重
|
||
exist = MaterialBase.query.filter_by(
|
||
name=data['name'],
|
||
spec_model=data['spec']
|
||
).first()
|
||
if exist:
|
||
raise ValueError(f"已存在相同名称和规格的数据 (ID: {exist.id})")
|
||
|
||
# 2. 创建对象 (列表转JSON字符串)
|
||
new_material = MaterialBase(
|
||
name=data['name'],
|
||
common_name=data.get('commonName'),
|
||
spec_model=data['spec'],
|
||
category=data.get('category'),
|
||
material_type=data.get('type'),
|
||
unit=data.get('unit'),
|
||
visibility_level=data.get('visibilityLevel'),
|
||
# 修改:将列表 dumps 为字符串
|
||
manual_link=json.dumps(data.get('generalManual', [])),
|
||
product_image=json.dumps(data.get('generalImage', [])),
|
||
is_enabled=True if data.get('isEnabled', 1) == 1 else False
|
||
)
|
||
|
||
db.session.add(new_material)
|
||
db.session.commit()
|
||
return new_material
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
raise e
|
||
|
||
@staticmethod
|
||
def update_material(m_id, data):
|
||
"""修改基础信息"""
|
||
try:
|
||
material = MaterialBase.query.get(m_id)
|
||
if not material:
|
||
raise ValueError("数据不存在")
|
||
|
||
# 更新字段
|
||
if 'name' in data: material.name = data['name']
|
||
if 'commonName' in data: material.common_name = data['commonName']
|
||
if 'spec' in data: material.spec_model = data['spec']
|
||
if 'category' in data: material.category = data['category']
|
||
if 'type' in data: material.material_type = data['type']
|
||
if 'unit' in data: material.unit = data['unit']
|
||
if 'visibilityLevel' in data: material.visibility_level = data['visibilityLevel']
|
||
|
||
# 修改:将列表 dumps 为字符串
|
||
if 'generalManual' in data:
|
||
material.manual_link = json.dumps(data['generalManual'])
|
||
if 'generalImage' in data:
|
||
material.product_image = json.dumps(data['generalImage'])
|
||
|
||
if 'isEnabled' in data:
|
||
material.is_enabled = bool(int(data['isEnabled']))
|
||
|
||
db.session.commit()
|
||
return material
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
raise e
|
||
|
||
@staticmethod
|
||
def delete_material(m_id):
|
||
"""
|
||
删除基础信息 (带依赖检查)
|
||
"""
|
||
try:
|
||
material = MaterialBase.query.get(m_id)
|
||
if not material:
|
||
raise ValueError("数据不存在")
|
||
|
||
buy_usage_count = StockBuy.query.filter_by(base_id=m_id).count()
|
||
semi_usage_count = StockSemi.query.filter_by(base_id=m_id).count()
|
||
|
||
# 如果需要检查成品和服务,可以解开注释
|
||
# from app.models.inbound.product import StockProduct
|
||
# prod_usage_count = StockProduct.query.filter_by(base_id=m_id).count()
|
||
|
||
total_usage = buy_usage_count + semi_usage_count
|
||
|
||
if total_usage > 0:
|
||
raise ValueError(
|
||
f"无法删除:该基础物料正被使用中。\n"
|
||
f"- 采购库存记录: {buy_usage_count} 条\n"
|
||
f"- 半成品库存记录: {semi_usage_count} 条\n"
|
||
f"请先清理相关库存或仅‘禁用’此条目。"
|
||
)
|
||
|
||
db.session.delete(material)
|
||
db.session.commit()
|
||
return True
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
print(f"删除基础信息失败: {e}")
|
||
raise e |