Files
KCGL/inventory-backend/app/services/inbound/service_service.py
dxc 6131b474a1 (no commit message provided)
Co-authored-by: aider (openai/DeepSeek-V3.2-Thinking) <aider@aider.chat>
2026-02-09 12:11:01 +08:00

156 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import re
from datetime import datetime, timedelta

from app import db
from app.models.inbound.service import StockService
from app.models.base import MaterialBase
class ServiceService:
    """Business logic for service-entitlement stock records (``StockService``).

    All methods are classmethods/staticmethods; the class only serves as a
    namespace and holds the SKU format constants.
    """

    # SKU layout: <SKU_PREFIX>-<date in SKU_DATE_FORMAT>-<zero-padded counter>
    SKU_PREFIX = 'SRV'
    SKU_DATE_FORMAT = '%Y%m%d'
    SKU_SUFFIX_LEN = 4

    @staticmethod
    def _commit():
        """Commit the current session; roll back and re-raise on failure.

        Rolling back keeps the session usable for the caller's error handling
        instead of leaving it stuck in a failed transaction.
        """
        try:
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise

    @classmethod
    def _next_sku(cls, prefix, max_sku):
        """Return the SKU that follows ``max_sku`` for the given ``prefix``.

        Args:
            prefix: The ``SRV-YYYYMMDD-`` part shared by today's SKUs.
            max_sku: The highest existing SKU for that prefix, or a falsy
                value when none exists yet.

        Returns:
            ``prefix`` followed by the next counter, left-padded with zeros
            to ``SKU_SUFFIX_LEN`` digits (longer counters are not truncated).
        """
        last_num = 0
        if max_sku:
            # Only the leading digits after the prefix count as the counter;
            # anything unparsable restarts the sequence at 1.
            match = re.match(r'^(\d+)', max_sku[len(prefix):])
            if match:
                last_num = int(match.group(1))
        return f'{prefix}{str(last_num + 1).zfill(cls.SKU_SUFFIX_LEN)}'

    @classmethod
    def _generate_sku(cls):
        """Generate the next SKU for today, formatted ``SRV-YYYYMMDD-XXXX``.

        Returns:
            The next SKU string based on the largest SKU stored for today.
        """
        today_str = datetime.now().strftime(cls.SKU_DATE_FORMAT)
        prefix = f'{cls.SKU_PREFIX}-{today_str}-'
        # Largest SKU already issued today (soft-deleted rows included, so
        # their SKUs are never reused).
        # NOTE(review): MAX() on a string column compares lexicographically,
        # so this stops advancing once a day exceeds 9999 SKUs. It is also
        # not safe against concurrent creators; presumably a unique
        # constraint on ``sku`` catches collisions -- confirm.
        max_sku = db.session.query(db.func.max(StockService.sku)).filter(
            StockService.sku.like(f'{prefix}%')
        ).scalar()
        return cls._next_sku(prefix, max_sku)

    @classmethod
    def search_base_material(cls, keyword):
        """Search enabled base materials for the front-end remote selector.

        Args:
            keyword: Optional text matched case-insensitively against the
                material name and spec/model. Falsy means "no filter".

        Returns:
            Up to 20 dicts (newest id first) with keys ``id``, ``name``,
            ``spec``, ``category``, ``unit`` and ``type``. Returns an empty
            list if anything fails; the failure is logged, not raised.
        """
        try:
            query = MaterialBase.query.filter(MaterialBase.is_enabled == True)  # noqa: E712
            if keyword:
                pattern = f'%{keyword}%'
                query = query.filter(
                    db.or_(
                        MaterialBase.name.ilike(pattern),
                        MaterialBase.spec_model.ilike(pattern),
                    )
                )
            query = query.order_by(MaterialBase.id.desc()).limit(20)
            return [
                {
                    'id': item.id,
                    'name': item.name,
                    'spec': item.spec_model,
                    'category': item.category,
                    'unit': item.unit,
                    'type': item.material_type,
                }
                for item in query.all()
            ]
        except Exception:
            # Best-effort lookup: the selector should degrade to "no results"
            # rather than fail, but the error must still reach the logs.
            logging.getLogger(__name__).exception(
                'search_base_material failed (keyword=%r)', keyword
            )
            return []

    @classmethod
    def create_service(cls, data):
        """Create a service-entitlement record.

        Args:
            data: Mapping with required keys ``base_id``, ``sale_price`` and
                ``provider_name``, and optional ``description``.

        Returns:
            The newly persisted ``StockService`` instance.

        Raises:
            ValueError: If ``base_id`` is missing or matches no base material.
            KeyError: If ``sale_price`` or ``provider_name`` is missing.
        """
        base_id = data.get('base_id')
        # Make sure the referenced base material exists before creating.
        base = MaterialBase.query.get(base_id) if base_id is not None else None
        if not base:
            raise ValueError('基础物料不存在')
        service = StockService(
            base_id=data['base_id'],
            sku=cls._generate_sku(),
            sale_price=data['sale_price'],
            provider_name=data['provider_name'],
            description=data.get('description', ''),
        )
        db.session.add(service)
        cls._commit()
        return service

    @classmethod
    def get_service(cls, service_id):
        """Fetch a single non-deleted service-entitlement record.

        Raises:
            ValueError: If no non-deleted record has this id.
        """
        service = StockService.query.filter_by(
            id=service_id, is_deleted=False
        ).first()
        if not service:
            raise ValueError('服务权益记录不存在')
        return service

    @classmethod
    def update_service(cls, service_id, data):
        """Update the editable fields of a service-entitlement record.

        Only ``sale_price``, ``provider_name`` and ``description`` are
        applied, and only when present in ``data``. ``base_id`` and ``sku``
        are intentionally never changed here.

        Returns:
            The updated ``StockService`` instance.

        Raises:
            ValueError: If the record does not exist (via ``get_service``).
        """
        service = cls.get_service(service_id)
        for field in ('sale_price', 'provider_name', 'description'):
            if field in data:
                setattr(service, field, data[field])
        service.updated_at = datetime.now()
        cls._commit()
        return service

    @classmethod
    def delete_service(cls, service_id):
        """Soft-delete a service-entitlement record.

        Returns:
            ``True`` once the deletion flag has been committed.

        Raises:
            ValueError: If the record does not exist (via ``get_service``).
        """
        service = cls.get_service(service_id)
        service.is_deleted = True
        service.updated_at = datetime.now()
        cls._commit()
        return True

    @staticmethod
    def _date_bounds(start_date, end_date):
        """Parse ``YYYY-MM-DD`` strings into a half-open datetime range.

        Returns:
            ``(start, end_exclusive)``. ``start`` is midnight of
            ``start_date``; ``end_exclusive`` is midnight of the day *after*
            ``end_date`` so the whole end day is covered, including
            sub-second timestamps. Either element is ``None`` when the
            corresponding input is falsy.

        Raises:
            ValueError: If a given date string is not in ``%Y-%m-%d`` format.
        """
        start = datetime.strptime(start_date, '%Y-%m-%d') if start_date else None
        end_exclusive = None
        if end_date:
            end_exclusive = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=1)
        return start, end_exclusive

    @classmethod
    def get_service_list(cls, page=1, per_page=20, keyword=None,
                         start_date=None, end_date=None, provider_name=None):
        """Return one page of non-deleted service-entitlement records.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            per_page: Page size; values below 1 are treated as 1.
            keyword: Matches the SKU or the linked base material's name
                (case-insensitive substring).
            start_date: Inclusive lower bound on ``created_at``, ``YYYY-MM-DD``.
            end_date: Inclusive upper bound (whole day), ``YYYY-MM-DD``.
            provider_name: Case-insensitive substring of the provider name.

        Returns:
            Dict with ``items`` (list of ``to_dict()`` results, newest first),
            ``total`` (count before paging), ``page`` and ``per_page``.

        Raises:
            ValueError: If a date string is not in ``%Y-%m-%d`` format.
        """
        # Guard against a negative OFFSET / LIMIT reaching the database.
        page = max(page, 1)
        per_page = max(per_page, 1)

        query = StockService.query.filter_by(is_deleted=False)

        if keyword:
            pattern = f'%{keyword}%'
            # Keep the material-name match inside the database as a subquery
            # instead of pulling every matching id into Python first.
            matching_base_ids = db.session.query(MaterialBase.id).filter(
                MaterialBase.name.ilike(pattern)
            )
            query = query.filter(
                db.or_(
                    StockService.sku.ilike(pattern),
                    StockService.base_id.in_(matching_base_ids),
                )
            )

        start, end_exclusive = cls._date_bounds(start_date, end_date)
        if start is not None:
            query = query.filter(StockService.created_at >= start)
        if end_exclusive is not None:
            query = query.filter(StockService.created_at < end_exclusive)

        if provider_name:
            query = query.filter(
                StockService.provider_name.ilike(f'%{provider_name}%')
            )

        # Count before paging so ``total`` reflects the full filtered set.
        total = query.count()
        items = (
            query.order_by(StockService.created_at.desc())
            .offset((page - 1) * per_page)
            .limit(per_page)
            .all()
        )
        return {
            'items': [item.to_dict() for item in items],
            'total': total,
            'page': page,
            'per_page': per_page,
        }