Files
KCGL/inventory-backend/app/services/inbound/service_service.py
dxc 9f0134b2e4 feat: add material search filters to semi, product, service modules
Co-authored-by: aider (openai/DeepSeek-V3.2-Thinking) <aider@aider.chat>
2026-02-11 14:42:16 +08:00

229 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/inbound/service_service.py
from app import db
from app.models.inbound.service import StockService
from app.models.base import MaterialBase
from datetime import datetime, timedelta
import re
class ServiceService:
"""服务权益库存业务逻辑"""
SKU_PREFIX = 'SRV'
SKU_DATE_FORMAT = '%Y%m%d'
SKU_SUFFIX_LEN = 4
@classmethod
def _generate_sku(cls):
"""生成唯一SKU格式 SRV-YYYYMMDD-XXXX"""
today_str = datetime.now().strftime(cls.SKU_DATE_FORMAT)
prefix = f'{cls.SKU_PREFIX}-{today_str}-'
# 查找今天已有的最大后缀
max_sku = db.session.query(db.func.max(StockService.sku)).filter(
StockService.sku.like(f'{prefix}%')
).scalar()
if not max_sku:
suffix_num = 1
else:
# 提取后缀数字
suffix_part = max_sku.replace(prefix, '')
match = re.match(r'^(\d+)', suffix_part)
suffix_num = int(match.group(1)) if match else 0
suffix_num += 1
# 格式化为4位数字左侧补零
suffix = str(suffix_num).zfill(cls.SKU_SUFFIX_LEN)
return f'{prefix}{suffix}'
@classmethod
def search_base_material(cls, keyword):
"""搜索基础物料,供前端远程选择"""
try:
# [核心修改] 只查询已启用的物料
query = MaterialBase.query.filter(MaterialBase.is_enabled == True)
if keyword:
query = query.filter(
db.or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%'),
)
)
query = query.order_by(MaterialBase.id.desc()).limit(20)
results = []
for item in query.all():
results.append({
'id': item.id,
'name': item.name,
'spec': item.spec_model,
'category': item.category,
'unit': item.unit,
'type': item.material_type,
})
return results
except Exception as e:
import traceback
traceback.print_exc()
return []
@classmethod
def create_service(cls, data):
"""创建服务权益记录"""
# 检查基础物料是否存在
base_id = data.get('base_id')
base = MaterialBase.query.get(base_id)
if not base:
raise ValueError('基础物料不存在')
# [核心修改] 后端二次校验:如果物料已停用,禁止创建服务权益
if not base.is_enabled:
raise ValueError(f"物料【{base.name}】已停用,无法创建新的服务权益。")
# 生成SKU
sku = cls._generate_sku()
service = StockService(
base_id=data['base_id'],
sku=sku,
sale_price=data['sale_price'],
provider_name=data['provider_name'],
description=data.get('description', '')
)
db.session.add(service)
db.session.commit()
return service
@classmethod
def get_service(cls, service_id):
"""获取单个服务权益"""
service = StockService.query.filter_by(id=service_id, is_deleted=False).first()
if not service:
raise ValueError('服务权益记录不存在')
return service
@classmethod
def update_service(cls, service_id, data):
"""更新服务权益记录"""
service = cls.get_service(service_id)
# 不允许修改 base_id 和 sku业务上不允许变更基础物料
if 'sale_price' in data:
service.sale_price = data['sale_price']
if 'provider_name' in data:
service.provider_name = data['provider_name']
if 'description' in data:
service.description = data.get('description', '')
service.updated_at = datetime.now()
db.session.commit()
return service
@classmethod
def delete_service(cls, service_id):
"""软删除服务权益"""
service = cls.get_service(service_id)
service.is_deleted = True
service.updated_at = datetime.now()
db.session.commit()
return True
@classmethod
def get_service_list(cls, page=1, per_page=20, keyword=None,
start_date=None, end_date=None, provider_name=None):
"""分页查询服务权益列表"""
query = StockService.query.filter_by(is_deleted=False)
# 关键词搜索:可搜索 SKU 或 关联物料名称
if keyword:
# 子查询查找物料名称匹配的 base_id
subquery = MaterialBase.query.filter(
MaterialBase.name.ilike(f'%{keyword}%')
).subquery()
query = query.filter(
db.or_(
StockService.sku.ilike(f'%{keyword}%'),
StockService.base_id.in_([row.id for row in db.session.query(subquery.c.id)])
)
)
if start_date:
start = datetime.strptime(start_date, '%Y-%m-%d')
query = query.filter(StockService.created_at >= start)
if end_date:
end = datetime.strptime(end_date, '%Y-%m-%d')
# 包含当天
end = end + timedelta(days=1) - timedelta(seconds=1)
query = query.filter(StockService.created_at <= end)
if provider_name:
query = query.filter(StockService.provider_name.ilike(f'%{provider_name}%'))
# 总数
total = query.count()
# 分页
items = query.order_by(StockService.created_at.desc()) \
.offset((page - 1) * per_page) \
.limit(per_page).all()
return {
'items': [item.to_dict() for item in items],
'total': total,
'page': page,
'per_page': per_page
}
# ============================================================
# 供应商历史查询
# ============================================================
@classmethod
def get_history_providers(cls, base_id):
"""返回该物料关联的服务商列表(去重)"""
try:
query = db.session.query(StockService.provider_name).filter(
StockService.base_id == base_id,
StockService.provider_name.isnot(None)
).distinct().order_by(StockService.provider_name)
providers = [row[0] for row in query.all()]
return providers
except Exception:
return []
# ============================================================
# 系统用户搜索
# ============================================================
@classmethod
def search_system_users(cls, keyword):
"""搜索系统用户(活跃状态)"""
from app.models.system import SysUser
try:
query = SysUser.query.filter(SysUser.status == 'active')
if keyword:
kw = f'%{keyword}%'
query = query.filter(db.or_(
SysUser.username.ilike(kw),
SysUser.email.ilike(kw)
))
query = query.order_by(SysUser.username)
users = []
for u in query.limit(20).all():
users.append({
'value': u.username,
'email': u.email
})
return users
except Exception:
return []
# ============================================================
# 获取筛选选项(类别、类型)
# ============================================================
@classmethod
def get_filter_options(cls):
try:
from app.models.base import MaterialBase
categories = db.session.query(MaterialBase.category) \
.filter(MaterialBase.category != None, MaterialBase.category != '') \
.distinct().all()
types = db.session.query(MaterialBase.material_type) \
.filter(MaterialBase.material_type != None, MaterialBase.material_type != '') \
.distinct().all()
return {
"categories": [r[0] for r in categories],
"types": [r[0] for r in types]
}
except Exception:
import traceback
traceback.print_exc()
return {"categories": [], "types": []}