239 lines
8.1 KiB
Python
239 lines
8.1 KiB
Python
# inventory-backend/app/services/inbound/service_service.py
|
||
from app.extensions import db
|
||
from app.models.inbound.service import StockService
|
||
from app.models.base import MaterialBase
|
||
from datetime import datetime, timedelta
|
||
import re
|
||
import traceback
|
||
|
||
|
||
class ServiceService:
|
||
"""服务权益库存业务逻辑"""
|
||
|
||
SKU_PREFIX = 'SRV'
|
||
SKU_DATE_FORMAT = '%Y%m%d'
|
||
SKU_SUFFIX_LEN = 4
|
||
|
||
@classmethod
|
||
def _generate_sku(cls):
|
||
"""生成唯一SKU,格式 SRV-YYYYMMDD-XXXX"""
|
||
today_str = datetime.now().strftime(cls.SKU_DATE_FORMAT)
|
||
prefix = f'{cls.SKU_PREFIX}-{today_str}-'
|
||
|
||
# 查找今天已有的最大后缀
|
||
max_sku = db.session.query(db.func.max(StockService.sku)).filter(
|
||
StockService.sku.like(f'{prefix}%')
|
||
).scalar()
|
||
|
||
if not max_sku:
|
||
suffix_num = 1
|
||
else:
|
||
# 提取后缀数字
|
||
suffix_part = max_sku.replace(prefix, '')
|
||
try:
|
||
match = re.search(r'(\d+)$', suffix_part)
|
||
suffix_num = int(match.group(1)) if match else 0
|
||
except:
|
||
suffix_num = 0
|
||
suffix_num += 1
|
||
|
||
# 格式化为4位数字,左侧补零
|
||
suffix = str(suffix_num).zfill(cls.SKU_SUFFIX_LEN)
|
||
return f'{prefix}{suffix}'
|
||
|
||
@classmethod
|
||
def search_base_material(cls, keyword):
|
||
"""搜索基础物料,供前端远程选择"""
|
||
try:
|
||
# 只查询已启用的物料
|
||
query = MaterialBase.query.filter(MaterialBase.is_enabled == True)
|
||
|
||
if keyword:
|
||
query = query.filter(
|
||
db.or_(
|
||
MaterialBase.name.ilike(f'%{keyword}%'),
|
||
MaterialBase.spec_model.ilike(f'%{keyword}%'),
|
||
)
|
||
)
|
||
query = query.order_by(MaterialBase.id.desc()).limit(20)
|
||
|
||
results = []
|
||
for item in query.all():
|
||
results.append({
|
||
'id': item.id,
|
||
'name': item.name,
|
||
'spec': item.spec_model,
|
||
'category': item.category,
|
||
'unit': item.unit,
|
||
'type': item.material_type,
|
||
})
|
||
return results
|
||
except Exception:
|
||
traceback.print_exc()
|
||
return []
|
||
|
||
@classmethod
|
||
def create_service(cls, data):
|
||
"""创建服务权益记录"""
|
||
# 1. 检查基础物料
|
||
base_id = data.get('base_id')
|
||
base = MaterialBase.query.get(base_id)
|
||
if not base:
|
||
raise ValueError('基础物料不存在')
|
||
|
||
if not base.is_enabled:
|
||
raise ValueError(f"物料【{base.name}】已停用,无法创建新的服务权益。")
|
||
|
||
# 2. 生成SKU
|
||
sku = cls._generate_sku()
|
||
|
||
# 3. 创建对象 (不包含库存数量字段)
|
||
service = StockService(
|
||
base_id=data['base_id'],
|
||
sku=sku,
|
||
sale_price=data.get('sale_price', 0),
|
||
provider_name=data.get('provider_name', ''),
|
||
description=data.get('description', ''),
|
||
|
||
# 可选字段映射
|
||
service_category=data.get('service_category', ''),
|
||
contract_id=data.get('contract_id', ''),
|
||
contact_person=data.get('contact_person', ''),
|
||
valid_period=data.get('valid_period', ''),
|
||
cost_price=data.get('cost_price', 0)
|
||
)
|
||
|
||
db.session.add(service)
|
||
db.session.commit()
|
||
return service
|
||
|
||
@classmethod
|
||
def get_service(cls, service_id):
|
||
"""获取单个详情"""
|
||
service = StockService.query.filter_by(id=service_id, is_deleted=False).first()
|
||
if not service:
|
||
raise ValueError('服务权益记录不存在')
|
||
return service
|
||
|
||
@classmethod
|
||
def update_service(cls, service_id, data):
|
||
"""更新服务权益"""
|
||
service = cls.get_service(service_id)
|
||
|
||
# 允许更新的字段
|
||
if 'sale_price' in data:
|
||
service.sale_price = data['sale_price']
|
||
if 'provider_name' in data:
|
||
service.provider_name = data['provider_name']
|
||
if 'description' in data:
|
||
service.description = data.get('description', '')
|
||
if 'cost_price' in data:
|
||
service.cost_price = data.get('cost_price', 0)
|
||
if 'contract_id' in data:
|
||
service.contract_id = data.get('contract_id', '')
|
||
if 'contact_person' in data:
|
||
service.contact_person = data.get('contact_person', '')
|
||
if 'valid_period' in data:
|
||
service.valid_period = data.get('valid_period', '')
|
||
|
||
service.updated_at = datetime.now()
|
||
db.session.commit()
|
||
return service
|
||
|
||
@classmethod
|
||
def delete_service(cls, service_id):
|
||
"""软删除"""
|
||
service = cls.get_service(service_id)
|
||
service.is_deleted = True
|
||
service.updated_at = datetime.now()
|
||
db.session.commit()
|
||
return True
|
||
|
||
@classmethod
|
||
def get_service_list(cls, page=1, per_page=20, keyword=None,
|
||
start_date=None, end_date=None, provider_name=None):
|
||
"""分页查询列表"""
|
||
try:
|
||
query = StockService.query.filter_by(is_deleted=False)
|
||
|
||
# 关键词联表搜索
|
||
if keyword:
|
||
query = query.join(StockService.base).filter(
|
||
db.or_(
|
||
StockService.sku.ilike(f'%{keyword}%'),
|
||
MaterialBase.name.ilike(f'%{keyword}%'),
|
||
MaterialBase.spec_model.ilike(f'%{keyword}%')
|
||
)
|
||
)
|
||
|
||
# 日期过滤
|
||
if start_date:
|
||
try:
|
||
start = datetime.strptime(start_date, '%Y-%m-%d')
|
||
query = query.filter(StockService.created_at >= start)
|
||
except ValueError:
|
||
pass
|
||
|
||
if end_date:
|
||
try:
|
||
end = datetime.strptime(end_date, '%Y-%m-%d')
|
||
# 包含当天结束
|
||
end = end + timedelta(days=1) - timedelta(seconds=1)
|
||
query = query.filter(StockService.created_at <= end)
|
||
except ValueError:
|
||
pass
|
||
|
||
# 服务商过滤
|
||
if provider_name:
|
||
query = query.filter(StockService.provider_name.ilike(f'%{provider_name}%'))
|
||
|
||
total = query.count()
|
||
|
||
items = query.order_by(StockService.created_at.desc()) \
|
||
.offset((page - 1) * per_page) \
|
||
.limit(per_page).all()
|
||
|
||
return {
|
||
'items': [item.to_dict() for item in items],
|
||
'total': total,
|
||
'page': page,
|
||
'per_page': per_page
|
||
}
|
||
except Exception as e:
|
||
traceback.print_exc()
|
||
raise e
|
||
|
||
@classmethod
|
||
def get_history_providers(cls, base_id):
|
||
"""获取历史供应商"""
|
||
try:
|
||
query = db.session.query(StockService.provider_name).filter(
|
||
StockService.base_id == base_id,
|
||
StockService.provider_name.isnot(None),
|
||
StockService.provider_name != ''
|
||
).distinct().order_by(StockService.provider_name)
|
||
return [row[0] for row in query.all()]
|
||
except Exception:
|
||
return []
|
||
|
||
@classmethod
|
||
def search_system_users(cls, keyword):
|
||
"""搜索系统用户(占位)"""
|
||
return []
|
||
|
||
@classmethod
|
||
def get_filter_options(cls):
|
||
"""获取筛选下拉选项"""
|
||
try:
|
||
categories = db.session.query(MaterialBase.category) \
|
||
.filter(MaterialBase.category != None, MaterialBase.category != '') \
|
||
.distinct().all()
|
||
types = db.session.query(MaterialBase.material_type) \
|
||
.filter(MaterialBase.material_type != None, MaterialBase.material_type != '') \
|
||
.distinct().all()
|
||
return {
|
||
"categories": [r[0] for r in categories],
|
||
"types": [r[0] for r in types]
|
||
}
|
||
except Exception:
|
||
return {"categories": [], "types": []} |