Files
KCGL/inventory-backend/app/services/inbound/base_service.py

279 lines
10 KiB
Python

# 文件路径: app/services/inbound/base_service.py
from app.extensions import db
from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
# from app.models.inbound.product import StockProduct
# from app.models.inbound.service import StockService
from sqlalchemy import or_
import traceback
import json
class MaterialBaseService:
"""
基础物料服务层
负责处理 MaterialBase 的增删改查及搜索逻辑
"""
@staticmethod
def search_material(keyword):
"""
根据关键字搜索已启用的基础物料
(供 /api/v1/inbound/base/search 接口调用)
"""
try:
if not keyword:
return []
query = MaterialBase.query.filter(
MaterialBase.is_enabled == True,
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.common_name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%'),
# 支持搜索公司名
MaterialBase.company_name.ilike(f'%{keyword}%')
)
).limit(20)
results = []
for item in query.all():
results.append({
'id': item.id,
'companyName': item.company_name,
'name': item.name,
'commonName': item.common_name,
'spec': item.spec_model,
'category': item.category,
'unit': item.unit,
'type': item.material_type,
'status': '启用'
})
return results
except Exception as e:
traceback.print_exc()
return []
@staticmethod
def _get_stock_counts(stock_query):
"""
辅助函数:安全计算库存列表的总数量
"""
total_inv = 0
total_avail = 0
try:
items = list(stock_query) # 触发查询
except:
items = []
for x in items:
# 1. 获取库存数 (兼容不同字段名)
q = getattr(x, 'stock_quantity', getattr(x, 'actual_quantity', getattr(x, 'quantity', 0)))
# 2. 获取可用数
a = getattr(x, 'available_quantity', q)
try:
total_inv += float(q if q is not None else 0)
total_avail += float(a if a is not None else 0)
except:
pass
return total_inv, total_avail
@staticmethod
def get_list(page, limit, filters=None):
"""
获取基础信息列表 (带分页和筛选)
"""
try:
query = MaterialBase.query
if filters:
# 1. 关键词模糊搜索
if filters.get('keyword'):
kw = f"%{filters['keyword']}%"
query = query.filter(or_(
MaterialBase.name.ilike(kw),
MaterialBase.common_name.ilike(kw),
MaterialBase.spec_model.ilike(kw)
))
# 2. 精确筛选
# 公司筛选
if filters.get('company'):
query = query.filter_by(company_name=filters['company'])
if filters.get('category'):
query = query.filter_by(category=filters['category'])
if filters.get('type'):
query = query.filter_by(material_type=filters['type'])
if filters.get('isEnabled') is not None:
is_active = bool(int(filters['isEnabled']))
query = query.filter_by(is_enabled=is_active)
# 按 ID 倒序排列
pagination = query.order_by(MaterialBase.id.desc()).paginate(page=page, per_page=limit, error_out=False)
items_list = []
for item in pagination.items:
item_dict = item.to_dict()
# 聚合库存
buy_inv, buy_avail = MaterialBaseService._get_stock_counts(item.stock_buys)
semi_inv, semi_avail = MaterialBaseService._get_stock_counts(item.stock_semis)
prod_inv, prod_avail = MaterialBaseService._get_stock_counts(item.stock_products)
serv_inv, serv_avail = MaterialBaseService._get_stock_counts(getattr(item, 'stock_services', []))
item_dict['inventoryCount'] = buy_inv + semi_inv + prod_inv + serv_inv
item_dict['availableCount'] = buy_avail + semi_avail + prod_avail + serv_avail
items_list.append(item_dict)
return {"total": pagination.total, "items": items_list}
except Exception as e:
traceback.print_exc()
print(f"查询基础信息列表失败: {e}")
return {"total": 0, "items": []}
@staticmethod
def get_distinct_options():
"""
获取所有已存在的类别、类型、公司 (去重且排序)
"""
try:
# 1. 类别 (获取后在内存或前端做层级处理,这里先按字母序返回扁平列表)
categories = db.session.query(MaterialBase.category) \
.filter(MaterialBase.category != None, MaterialBase.category != '') \
.distinct().all()
# 对类别进行排序
sorted_categories = sorted([c[0] for c in categories])
# 2. 类型
types = db.session.query(MaterialBase.material_type) \
.filter(MaterialBase.material_type != None, MaterialBase.material_type != '') \
.distinct().all()
sorted_types = sorted([t[0] for t in types])
# 3. 公司
companies = db.session.query(MaterialBase.company_name) \
.filter(MaterialBase.company_name != None, MaterialBase.company_name != '') \
.distinct().all()
sorted_companies = sorted([c[0] for c in companies])
return {
"categories": sorted_categories,
"types": sorted_types,
"companies": sorted_companies
}
except Exception as e:
traceback.print_exc()
return {"categories": [], "types": [], "companies": []}
@staticmethod
def create_material(data):
"""新增基础信息"""
try:
if not data.get('name') or not data.get('spec'):
raise ValueError("名称和规格型号不能为空")
exist = MaterialBase.query.filter_by(
name=data['name'],
spec_model=data['spec']
).first()
if exist:
raise ValueError(f"已存在相同名称和规格的数据 (ID: {exist.id})")
new_material = MaterialBase(
# [修改] 移除了 'IRIS' 默认值
company_name=data.get('companyName'),
name=data['name'],
common_name=data.get('commonName'),
spec_model=data['spec'],
category=data.get('category'),
material_type=data.get('type'),
unit=data.get('unit'),
visibility_level=data.get('visibilityLevel'),
manual_link=json.dumps(data.get('generalManual', [])),
product_image=json.dumps(data.get('generalImage', [])),
is_enabled=True if data.get('isEnabled', 1) == 1 else False
)
db.session.add(new_material)
db.session.commit()
return new_material
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def update_material(m_id, data):
"""修改基础信息"""
try:
material = MaterialBase.query.get(m_id)
if not material:
raise ValueError("数据不存在")
# 更新字段
if 'companyName' in data: material.company_name = data['companyName']
if 'name' in data: material.name = data['name']
if 'commonName' in data: material.common_name = data['commonName']
if 'spec' in data: material.spec_model = data['spec']
if 'category' in data: material.category = data['category']
if 'type' in data: material.material_type = data['type']
if 'unit' in data: material.unit = data['unit']
if 'visibilityLevel' in data: material.visibility_level = data['visibilityLevel']
if 'generalManual' in data:
material.manual_link = json.dumps(data['generalManual'])
if 'generalImage' in data:
material.product_image = json.dumps(data['generalImage'])
if 'isEnabled' in data:
material.is_enabled = bool(int(data['isEnabled']))
db.session.commit()
return material
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def delete_material(m_id):
"""
删除基础信息 (带依赖检查)
"""
try:
material = MaterialBase.query.get(m_id)
if not material:
raise ValueError("数据不存在")
buy_usage_count = StockBuy.query.filter_by(base_id=m_id).count()
semi_usage_count = StockSemi.query.filter_by(base_id=m_id).count()
total_usage = buy_usage_count + semi_usage_count
if total_usage > 0:
raise ValueError(
f"无法删除:该基础物料正被使用中。\n"
f"- 采购库存记录: {buy_usage_count}\n"
f"- 半成品库存记录: {semi_usage_count}\n"
f"请先清理相关库存或仅‘禁用’此条目。"
)
db.session.delete(material)
db.session.commit()
return True
except Exception as e:
db.session.rollback()
print(f"删除基础信息失败: {e}")
raise e