Files
KCGL/inventory-backend/app/services/inbound/service_service.py

239 lines
8.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# inventory-backend/app/services/inbound/service_service.py
from app.extensions import db
from app.models.inbound.service import StockService
from app.models.base import MaterialBase
from datetime import datetime, timedelta
import re
import traceback
class ServiceService:
"""服务权益库存业务逻辑"""
SKU_PREFIX = 'SRV'
SKU_DATE_FORMAT = '%Y%m%d'
SKU_SUFFIX_LEN = 4
@classmethod
def _generate_sku(cls):
"""生成唯一SKU格式 SRV-YYYYMMDD-XXXX"""
today_str = datetime.now().strftime(cls.SKU_DATE_FORMAT)
prefix = f'{cls.SKU_PREFIX}-{today_str}-'
# 查找今天已有的最大后缀
max_sku = db.session.query(db.func.max(StockService.sku)).filter(
StockService.sku.like(f'{prefix}%')
).scalar()
if not max_sku:
suffix_num = 1
else:
# 提取后缀数字
suffix_part = max_sku.replace(prefix, '')
try:
match = re.search(r'(\d+)$', suffix_part)
suffix_num = int(match.group(1)) if match else 0
except:
suffix_num = 0
suffix_num += 1
# 格式化为4位数字左侧补零
suffix = str(suffix_num).zfill(cls.SKU_SUFFIX_LEN)
return f'{prefix}{suffix}'
@classmethod
def search_base_material(cls, keyword):
"""搜索基础物料,供前端远程选择"""
try:
# 只查询已启用的物料
query = MaterialBase.query.filter(MaterialBase.is_enabled == True)
if keyword:
query = query.filter(
db.or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%'),
)
)
query = query.order_by(MaterialBase.id.desc()).limit(20)
results = []
for item in query.all():
results.append({
'id': item.id,
'name': item.name,
'spec': item.spec_model,
'category': item.category,
'unit': item.unit,
'type': item.material_type,
})
return results
except Exception:
traceback.print_exc()
return []
@classmethod
def create_service(cls, data):
"""创建服务权益记录"""
# 1. 检查基础物料
base_id = data.get('base_id')
base = MaterialBase.query.get(base_id)
if not base:
raise ValueError('基础物料不存在')
if not base.is_enabled:
raise ValueError(f"物料【{base.name}】已停用,无法创建新的服务权益。")
# 2. 生成SKU
sku = cls._generate_sku()
# 3. 创建对象 (不包含库存数量字段)
service = StockService(
base_id=data['base_id'],
sku=sku,
sale_price=data.get('sale_price', 0),
provider_name=data.get('provider_name', ''),
description=data.get('description', ''),
# 可选字段映射
service_category=data.get('service_category', ''),
contract_id=data.get('contract_id', ''),
contact_person=data.get('contact_person', ''),
valid_period=data.get('valid_period', ''),
cost_price=data.get('cost_price', 0)
)
db.session.add(service)
db.session.commit()
return service
@classmethod
def get_service(cls, service_id):
"""获取单个详情"""
service = StockService.query.filter_by(id=service_id, is_deleted=False).first()
if not service:
raise ValueError('服务权益记录不存在')
return service
@classmethod
def update_service(cls, service_id, data):
"""更新服务权益"""
service = cls.get_service(service_id)
# 允许更新的字段
if 'sale_price' in data:
service.sale_price = data['sale_price']
if 'provider_name' in data:
service.provider_name = data['provider_name']
if 'description' in data:
service.description = data.get('description', '')
if 'cost_price' in data:
service.cost_price = data.get('cost_price', 0)
if 'contract_id' in data:
service.contract_id = data.get('contract_id', '')
if 'contact_person' in data:
service.contact_person = data.get('contact_person', '')
if 'valid_period' in data:
service.valid_period = data.get('valid_period', '')
service.updated_at = datetime.now()
db.session.commit()
return service
@classmethod
def delete_service(cls, service_id):
"""软删除"""
service = cls.get_service(service_id)
service.is_deleted = True
service.updated_at = datetime.now()
db.session.commit()
return True
@classmethod
def get_service_list(cls, page=1, per_page=20, keyword=None,
start_date=None, end_date=None, provider_name=None):
"""分页查询列表"""
try:
query = StockService.query.filter_by(is_deleted=False)
# 关键词联表搜索
if keyword:
query = query.join(StockService.base).filter(
db.or_(
StockService.sku.ilike(f'%{keyword}%'),
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
)
# 日期过滤
if start_date:
try:
start = datetime.strptime(start_date, '%Y-%m-%d')
query = query.filter(StockService.created_at >= start)
except ValueError:
pass
if end_date:
try:
end = datetime.strptime(end_date, '%Y-%m-%d')
# 包含当天结束
end = end + timedelta(days=1) - timedelta(seconds=1)
query = query.filter(StockService.created_at <= end)
except ValueError:
pass
# 服务商过滤
if provider_name:
query = query.filter(StockService.provider_name.ilike(f'%{provider_name}%'))
total = query.count()
items = query.order_by(StockService.created_at.desc()) \
.offset((page - 1) * per_page) \
.limit(per_page).all()
return {
'items': [item.to_dict() for item in items],
'total': total,
'page': page,
'per_page': per_page
}
except Exception as e:
traceback.print_exc()
raise e
@classmethod
def get_history_providers(cls, base_id):
"""获取历史供应商"""
try:
query = db.session.query(StockService.provider_name).filter(
StockService.base_id == base_id,
StockService.provider_name.isnot(None),
StockService.provider_name != ''
).distinct().order_by(StockService.provider_name)
return [row[0] for row in query.all()]
except Exception:
return []
@classmethod
def search_system_users(cls, keyword):
"""搜索系统用户(占位)"""
return []
@classmethod
def get_filter_options(cls):
"""获取筛选下拉选项"""
try:
categories = db.session.query(MaterialBase.category) \
.filter(MaterialBase.category != None, MaterialBase.category != '') \
.distinct().all()
types = db.session.query(MaterialBase.material_type) \
.filter(MaterialBase.material_type != None, MaterialBase.material_type != '') \
.distinct().all()
return {
"categories": [r[0] for r in categories],
"types": [r[0] for r in types]
}
except Exception:
return {"categories": [], "types": []}