Files
KCGL/inventory-backend/app/services/inbound/repair_service.py

216 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# inventory-backend/app/services/inbound/repair_service.py
from app.extensions import db
from app.models.transaction import TransRepair
from app.models.base import MaterialBase
from datetime import datetime, timezone
import random
import string
class RepairInboundService:
    """Service layer for repair orders stored in ``TransRepair``.

    Every method is static, goes through the shared ``db.session`` and
    returns plain dicts built from ``TransRepair.to_dict()``.
    """

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_positive_int(value, default):
        """Coerce *value* to an ``int`` >= 1, returning *default* otherwise.

        Paging arguments may arrive as strings or be missing; anything that
        cannot be parsed, or is below 1, is replaced by *default*.
        """
        try:
            number = int(value)
        except (TypeError, ValueError):
            return default
        return number if number >= 1 else default

    @staticmethod
    def _append_log(existing, message, when):
        """Return *existing* with a timestamped *message* appended.

        The entry is formatted as ``[YYYY-MM-DD HH:MM:SS] message``. When
        *existing* is empty or ``None`` the entry alone is returned;
        otherwise it is added on a new line.
        """
        entry = f"[{when.strftime('%Y-%m-%d %H:%M:%S')}] {message}"
        if existing:
            return existing + '\n' + entry
        return entry

    @staticmethod
    def _apply_material(row, base):
        """Copy the material name and company name from *base* into *row*.

        Does nothing when *base* is ``None``. Returns *row* for convenience.
        """
        if base is not None:
            row['material_name'] = base.name
            row['company_name'] = base.company_name
        return row

    @staticmethod
    def _commit():
        """Commit the session, rolling back before re-raising on failure.

        Without the rollback a failed commit leaves the session unusable
        for any further work in the same request.
        """
        try:
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise

    @staticmethod
    def _generate_repair_no():
        """Generate a repair number that is not yet present in the database.

        Format: ``REP-YYYYMMDD-XXXX`` where each ``X`` is a random uppercase
        letter or digit.

        Strategy:
            1. Build a candidate with a 4-character random suffix.
            2. Look the candidate up in ``TransRepair``.
            3. Retry up to 10 times on collision.
            4. If all attempts collide, fall back to a 5-digit suffix taken
               from the current timestamp in milliseconds (this fallback is
               returned without a database check).
        """
        alphabet = string.ascii_uppercase + string.digits
        date_str = datetime.now().strftime('%Y%m%d')
        for _ in range(10):
            suffix = ''.join(random.choices(alphabet, k=4))
            candidate = f"REP-{date_str}-{suffix}"
            # Only hand out numbers that no existing repair order uses.
            if not TransRepair.query.filter_by(repair_no=candidate).first():
                return candidate

        # Fallback: derive the suffix from the clock. Take "now" once so the
        # date part and the millisecond part come from the same instant.
        now = datetime.now()
        millis = int(now.timestamp() * 1000) % 100000
        return f"REP-{now.strftime('%Y%m%d')}-{millis:05d}"

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    @staticmethod
    def get_list(params):
        """Return one page of repair orders matching the given filters.

        Recognised keys in *params*:
            page, page_size: paging arguments (defaults 1 and 20); values
                that are not integers >= 1 fall back to the defaults.
            repair_no, sku, serial_number: case-insensitive substring match.
            repair_status: exact match.
            material_name: case-insensitive substring match on the joined
                ``MaterialBase.name``.

        Returns:
            dict with ``list`` (row dicts, enriched with ``material_name``
            and ``company_name`` when the material exists), ``total``,
            ``page`` and ``page_size``.
        """
        page = RepairInboundService._to_positive_int(params.get('page'), 1)
        page_size = RepairInboundService._to_positive_int(params.get('page_size'), 20)

        query = TransRepair.query

        # Substring filters on columns of TransRepair itself.
        for field in ('repair_no', 'sku', 'serial_number'):
            value = params.get(field)
            if value:
                query = query.filter(getattr(TransRepair, field).ilike(f"%{value}%"))

        if params.get('repair_status'):
            query = query.filter(TransRepair.repair_status == params['repair_status'])

        # Filtering by material name requires joining MaterialBase.
        if params.get('material_name'):
            query = query.join(MaterialBase, TransRepair.base_id == MaterialBase.id).filter(
                MaterialBase.name.ilike(f"%{params['material_name']}%")
            )

        # Highest id first.
        query = query.order_by(db.desc(TransRepair.id))
        pagination = query.paginate(page=page, per_page=page_size, error_out=False)
        records = pagination.items

        # Load every referenced material in a single query instead of one
        # lookup per row.
        base_ids = list({record.base_id for record in records if record.base_id})
        bases = {}
        if base_ids:
            bases = {
                base.id: base
                for base in MaterialBase.query.filter(MaterialBase.id.in_(base_ids)).all()
            }

        items = [
            RepairInboundService._apply_material(
                record.to_dict(),
                bases.get(record.base_id) if record.base_id else None,
            )
            for record in records
        ]

        return {
            'list': items,
            'total': pagination.total,
            'page': page,
            'page_size': page_size
        }

    @staticmethod
    def create(data):
        """Create a repair order from *data* and return it as a dict.

        A fresh ``REP-`` number is generated for the order. When
        ``base_id`` refers to an existing material, its name and company
        name are added to the result, and its ``code`` is used as the SKU
        if *data* does not supply one.

        Returns:
            The new record's ``to_dict()`` plus ``material_name`` and
            ``company_name`` (both ``None`` when no material was found).
        """
        repair_no = RepairInboundService._generate_repair_no()

        base_id = data.get('base_id')
        base = MaterialBase.query.get(base_id) if base_id else None

        # An explicit SKU wins; otherwise fall back to the material's code
        # when it has one.
        sku = data.get('sku')
        if base is not None and not sku and base.code:
            sku = base.code

        repair = TransRepair(
            repair_no=repair_no,
            base_id=base_id,
            sku=sku,
            serial_number=data.get('serial_number'),
            arrival_date=data.get('arrival_date'),
            repair_status=data.get('repair_status', '待检测'),
            fault_description=data.get('fault_description'),
            expected_repair_time=data.get('expected_repair_time'),
            repair_result=data.get('repair_result'),
            repair_manager=data.get('repair_manager'),
            shipping_date=data.get('shipping_date'),
            related_contract_id=data.get('related_contract_id'),
            cost_price=data.get('cost_price'),
            sale_price=data.get('sale_price'),
            company_id=data.get('company_id'),
            source_table=data.get('source_table'),
            stock_id=data.get('stock_id'),
            is_self_made=data.get('is_self_made', False),
            related_product_id=data.get('related_product_id'),
        )
        db.session.add(repair)
        RepairInboundService._commit()

        result = repair.to_dict()
        result['material_name'] = base.name if base is not None else None
        result['company_name'] = base.company_name if base is not None else None
        return result

    @staticmethod
    def update(id, data):
        """Update the whitelisted fields of a repair order.

        Only keys that are present in *data* and listed as updatable are
        written. Returns the updated record as a dict, or ``None`` when no
        record with this id exists.
        """
        repair = TransRepair.query.get(id)
        if not repair:
            return None

        updatable_fields = (
            'base_id', 'sku', 'serial_number', 'arrival_date', 'repair_status',
            'fault_description', 'expected_repair_time', 'repair_result',
            'repair_manager', 'shipping_date', 'related_contract_id',
            'cost_price', 'sale_price', 'company_id',
        )
        for field in updatable_fields:
            if field in data:
                setattr(repair, field, data[field])

        RepairInboundService._commit()
        return repair.to_dict()

    @staticmethod
    def update_status(id, status, repair_log=None):
        """Set the repair status and optionally append a log entry.

        When *repair_log* is given, a timestamped line is appended to
        ``repair_result``. Returns the updated record as a dict, or
        ``None`` when no record with this id exists.
        """
        repair = TransRepair.query.get(id)
        if not repair:
            return None

        repair.repair_status = status
        if repair_log:
            repair.repair_result = RepairInboundService._append_log(
                repair.repair_result, repair_log, datetime.now()
            )

        RepairInboundService._commit()
        return repair.to_dict()

    @staticmethod
    def delete(id):
        """Delete a repair order.

        Returns ``True`` when the record was deleted, ``False`` when no
        record with this id exists.
        """
        repair = TransRepair.query.get(id)
        if not repair:
            return False

        db.session.delete(repair)
        RepairInboundService._commit()
        return True

    @staticmethod
    def get_by_id(id):
        """Return one repair order as a dict, or ``None`` if it is missing.

        When the order references an existing material, ``material_name``
        and ``company_name`` are added to the result.
        """
        repair = TransRepair.query.get(id)
        if not repair:
            return None

        base = MaterialBase.query.get(repair.base_id) if repair.base_id else None
        return RepairInboundService._apply_material(repair.to_dict(), base)