Files
KCGL/inventory-backend/app/services/inbound/repair_service.py

245 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# inventory-backend/app/services/inbound/repair_service.py
import logging
from datetime import datetime, timezone

from sqlalchemy import text

from app.extensions import db
from app.models.base import MaterialBase
from app.models.transaction import TransRepair
class RepairInboundService:
    """Service layer for repair orders stored in ``TransRepair``.

    Every operation is a static method working on the shared ``db.session``.
    Write operations commit before returning and roll the session back if
    the commit fails.
    """

    @staticmethod
    def _coerce_positive_int(value, default):
        """Return *value* as an int >= 1, or *default* if that is not possible.

        Paging values may arrive as strings or ``None``; anything that cannot
        be converted, or that is smaller than 1, falls back to *default*.
        """
        try:
            number = int(value)
        except (TypeError, ValueError):
            return default
        return number if number >= 1 else default

    @staticmethod
    def _overlay_base_info(item_dict, base):
        """Overlay material/company names from a ``MaterialBase`` row.

        The base name wins when it is set; otherwise the name already present
        in *item_dict* is kept, which mirrors the precedence used by
        ``create``. A missing *base* leaves *item_dict* untouched.

        Returns:
            The same *item_dict* object, for convenience.
        """
        if base is None:
            return item_dict
        item_dict['material_name'] = base.name or item_dict.get('material_name')
        item_dict['company_name'] = base.company_name
        return item_dict

    @staticmethod
    def _commit():
        """Commit the session; on failure roll back and re-raise."""
        try:
            db.session.commit()
        except Exception:
            # Leave the session usable for whoever handles the error.
            db.session.rollback()
            raise

    @staticmethod
    def _generate_repair_no():
        """Build the next repair number for today.

        Format: ``REP-YYYYMMDD-0001`` (the counter restarts each day).
        Strategy: look up today's highest ``repair_no`` and add one to its
        trailing sequence number.

        Returns:
            The new repair number as a string.
        """
        today = datetime.now().strftime('%Y%m%d')
        prefix = f"REP-{today}-"

        # Order by length first so that a 5-digit sequence ("10000") ranks
        # above "9999"; a plain string sort would keep returning "9999" and
        # hand out a duplicate number once the 4-digit padding overflows.
        latest = (
            TransRepair.query
            .filter(TransRepair.repair_no.like(f"{prefix}%"))
            .order_by(
                db.func.length(TransRepair.repair_no).desc(),
                TransRepair.repair_no.desc(),
            )
            .first()
        )

        new_seq = 1
        if latest and latest.repair_no:
            try:
                # The sequence is the part after the last dash.
                new_seq = int(latest.repair_no.rsplit('-', 1)[-1]) + 1
            except ValueError:
                new_seq = 1
        return f"{prefix}{new_seq:04d}"

    @staticmethod
    def _generate_sku():
        """Draw the next value of ``global_print_seq`` as a 10-character SKU.

        Returns:
            The sequence value left-padded with zeros to 10 characters, or
            ``None`` when the sequence cannot be read (best effort).
        """
        try:
            next_global_id = db.session.execute(
                text("SELECT nextval('global_print_seq')")
            ).scalar()
        except Exception:
            logging.getLogger(__name__).exception(
                "Could not read next value of global_print_seq"
            )
            # On databases such as PostgreSQL a failed statement aborts the
            # surrounding transaction; roll back so later queries on this
            # session still work.
            db.session.rollback()
            return None
        return str(next_global_id).zfill(10) if next_global_id else None

    @staticmethod
    def get_list(params):
        """Return one page of repair orders.

        Supported keys in *params*:
            repair_no, sku, serial_number: case-insensitive substring match.
            repair_status: exact match.
            material_name: substring match against the stored name or the
                name of the linked ``MaterialBase`` row.
            page, page_size: paging (defaults 1 and 20; invalid values fall
                back to the defaults).

        Returns:
            A dict with keys ``list``, ``total``, ``page`` and ``page_size``.
        """
        page = RepairInboundService._coerce_positive_int(params.get('page', 1), 1)
        page_size = RepairInboundService._coerce_positive_int(
            params.get('page_size', 20), 20
        )

        query = TransRepair.query

        # Substring filters.
        like_filters = (
            ('repair_no', TransRepair.repair_no),
            ('sku', TransRepair.sku),
            ('serial_number', TransRepair.serial_number),
        )
        for key, column in like_filters:
            needle = params.get(key)
            if needle:
                query = query.filter(column.ilike(f"%{needle}%"))

        status = params.get('repair_status')
        if status:
            query = query.filter(TransRepair.repair_status == status)

        # Match either the name stored on the repair row or the name of the
        # linked base material.
        material_name_filter = params.get('material_name')
        if material_name_filter:
            pattern = f"%{material_name_filter}%"
            query = query.outerjoin(
                MaterialBase, TransRepair.base_id == MaterialBase.id
            ).filter(
                db.or_(
                    TransRepair.material_name.ilike(pattern),
                    MaterialBase.name.ilike(pattern),
                )
            )

        # Highest id first.
        query = query.order_by(db.desc(TransRepair.id))
        pagination = query.paginate(page=page, per_page=page_size, error_out=False)
        rows = pagination.items

        # Load every referenced base material in one query instead of one
        # query per row.
        base_ids = {row.base_id for row in rows if row.base_id}
        bases_by_id = {}
        if base_ids:
            linked = MaterialBase.query.filter(
                MaterialBase.id.in_(list(base_ids))
            ).all()
            bases_by_id = {base.id: base for base in linked}

        items = []
        for row in rows:
            item_dict = row.to_dict()
            if row.base_id:
                RepairInboundService._overlay_base_info(
                    item_dict, bases_by_id.get(row.base_id)
                )
            items.append(item_dict)

        return {
            'list': items,
            'total': pagination.total,
            'page': page,
            'page_size': page_size
        }

    @staticmethod
    def create(data):
        """Create a repair order from *data* and commit it.

        Behaviour:
            1. A ``REP-`` repair number is generated (daily counter).
            2. ``sku`` comes from *data*; if absent, from the global
               sequence; if that also fails and a base material is linked,
               from the base material's ``code``.
            3. ``base_id`` is optional (stand-alone entry is supported).
            4. Customer name and location are stored when supplied.

        Returns:
            The new row as a dict, with ``material_name`` and
            ``company_name`` filled from the linked base material if any.
        """
        repair_no = RepairInboundService._generate_repair_no()

        sku = data.get('sku') or RepairInboundService._generate_sku()

        # Optional base material lookup.
        material_name = data.get('material_name')
        company_name = None
        if data.get('base_id'):
            base = MaterialBase.query.get(data['base_id'])
            if base:
                material_name = base.name or material_name
                company_name = base.company_name
                if not sku:
                    sku = base.code

        repair = TransRepair(
            repair_no=repair_no,
            base_id=data.get('base_id'),
            sku=sku,
            material_name=material_name,
            serial_number=data.get('serial_number'),
            arrival_date=data.get('arrival_date'),
            repair_status=data.get('repair_status', '待检测'),
            fault_description=data.get('fault_description'),
            expected_repair_time=data.get('expected_repair_time'),
            repair_result=data.get('repair_result'),
            repair_manager=data.get('repair_manager'),
            shipping_date=data.get('shipping_date'),
            related_contract_id=data.get('related_contract_id'),
            # Customer fields.
            customer_name=data.get('customer_name'),
            customer_location=data.get('customer_location'),
            cost_price=data.get('cost_price'),
            sale_price=data.get('sale_price'),
            company_id=data.get('company_id'),
            source_table=data.get('source_table'),
            stock_id=data.get('stock_id'),
            is_self_made=data.get('is_self_made', False),
            related_product_id=data.get('related_product_id'),
        )
        db.session.add(repair)
        RepairInboundService._commit()

        result = repair.to_dict()
        result['material_name'] = material_name
        result['company_name'] = company_name
        return result

    @staticmethod
    def update(id, data):
        """Update the whitelisted fields of a repair order.

        Only keys present in *data* are written; other keys are ignored.

        Returns:
            The updated row as a dict, or ``None`` if no row has this id.
        """
        repair = TransRepair.query.get(id)
        if not repair:
            return None

        # Fields callers are allowed to change.
        updatable_fields = [
            'base_id', 'sku', 'material_name', 'serial_number', 'arrival_date', 'repair_status',
            'fault_description', 'expected_repair_time', 'repair_result',
            'repair_manager', 'shipping_date', 'related_contract_id',
            'customer_name', 'customer_location', 'cost_price', 'sale_price', 'company_id'
        ]
        for field in updatable_fields:
            if field in data:
                setattr(repair, field, data[field])

        RepairInboundService._commit()
        return repair.to_dict()

    @staticmethod
    def update_status(id, status, repair_log=None):
        """Set the repair status and optionally append a log line.

        When *repair_log* is given, a timestamped entry is appended to
        ``repair_result`` (on a new line if there is existing text).

        Returns:
            The updated row as a dict, or ``None`` if no row has this id.
        """
        repair = TransRepair.query.get(id)
        if not repair:
            return None

        repair.repair_status = status

        if repair_log:
            entry = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {repair_log}"
            if repair.repair_result:
                repair.repair_result = repair.repair_result + '\n' + entry
            else:
                repair.repair_result = entry

        RepairInboundService._commit()
        return repair.to_dict()

    @staticmethod
    def delete(id):
        """Delete a repair order.

        Returns:
            ``True`` if the row was deleted, ``False`` if no row has this id.
        """
        repair = TransRepair.query.get(id)
        if not repair:
            return False
        db.session.delete(repair)
        RepairInboundService._commit()
        return True

    @staticmethod
    def get_by_id(id):
        """Fetch one repair order by id.

        Returns:
            The row as a dict (with names overlaid from the linked base
            material, if any), or ``None`` if no row has this id.
        """
        repair = TransRepair.query.get(id)
        if not repair:
            return None

        item_dict = repair.to_dict()
        if repair.base_id:
            RepairInboundService._overlay_base_info(
                item_dict, MaterialBase.query.get(repair.base_id)
            )
        return item_dict