Files
KCGL/inventory-backend/app/services/inbound/repair_service.py

284 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# inventory-backend/app/services/inbound/repair_service.py
from app.extensions import db
from app.models.transaction import TransRepair
from app.models.base import MaterialBase
from datetime import datetime, timezone
from sqlalchemy import text
class RepairInboundService:
@staticmethod
def _generate_repair_no():
"""
生成唯一的维修单号
格式: REP-YYYYMMDD-0001 (按天递增)
策略: 查询当天最大的repair_no提取流水号+1
"""
today = datetime.now().strftime('%Y%m%d')
prefix = f"REP-{today}-"
# 查询当天最大的维修单号
latest = TransRepair.query.filter(
TransRepair.repair_no.like(f"{prefix}%")
).order_by(TransRepair.repair_no.desc()).first()
if latest and latest.repair_no:
try:
# 提取最后的流水号
last_seq = int(latest.repair_no.split('-')[-1])
new_seq = last_seq + 1
except (ValueError, IndexError):
new_seq = 1
else:
new_seq = 1
return f"{prefix}{new_seq:04d}"
@staticmethod
def _generate_sku():
"""
获取全局自增序列号生成10位SKU
格式: str(seq).zfill(10)
"""
try:
seq_sql = text("SELECT nextval('global_print_seq')")
result = db.session.execute(seq_sql)
next_global_id = result.scalar()
return str(next_global_id).zfill(10) if next_global_id else None
except:
return None
@staticmethod
def get_list(params):
"""
获取维修单列表
支持按 repair_no, sku, material_name, serial_number, repair_status 模糊查询
实现分页
"""
page = params.get('page', 1)
page_size = params.get('page_size', 20)
query = TransRepair.query
# 模糊查询条件
if params.get('repair_no'):
query = query.filter(TransRepair.repair_no.ilike(f"%{params['repair_no']}%"))
if params.get('sku'):
query = query.filter(TransRepair.sku.ilike(f"%{params['sku']}%"))
if params.get('serial_number'):
query = query.filter(TransRepair.serial_number.ilike(f"%{params['serial_number']}%"))
if params.get('repair_status'):
status_value = params['repair_status']
if status_value == '未出库':
# 未出库:排除已出库状态
query = query.filter(TransRepair.repair_status != '已出库')
elif status_value not in ['全部', '']:
# 其他明确状态:精确匹配
query = query.filter(TransRepair.repair_status == status_value)
# '全部' 或为空:不过滤状态
# 关联 MaterialBase 查询物料名称 或 直接搜索 TransRepair.material_name
if params.get('material_name'):
material_name_filter = params['material_name']
# 优先搜索直接存储的 material_name其次搜索关联的 base.name
query = query.outerjoin(MaterialBase, TransRepair.base_id == MaterialBase.id).filter(
db.or_(
TransRepair.material_name.ilike(f"%{material_name_filter}%"),
MaterialBase.name.ilike(f"%{material_name_filter}%")
)
)
# 按接收时间升序(先进先出)+ id 升序
query = query.order_by(db.asc(TransRepair.arrival_date), db.asc(TransRepair.id))
# ============================================================
# 【全局特权】基于 JWT 与 global:cross_company_op 的跨组织隔离
# ============================================================
from flask_jwt_extended import get_jwt
claims = get_jwt()
user_role = claims.get('role', '').upper() if claims.get('role') else ''
user_company = claims.get('company_name', '')
# 获取用户权限列表(用于检查 global:cross_company_op 特权)
from app.api.v1.inbound.base import get_current_user_permissions
user_perms = get_current_user_permissions() or []
normalized_perms = set(p.lower().replace('_', '').replace(':', '') for p in user_perms)
# 检查是否拥有全局特权或超管角色
has_cross_company = 'globalcrosscompanyop' in normalized_perms
# 维修表需要通过 base_id 关联 MaterialBase 进行公司过滤
if user_role != 'SUPER_ADMIN' and not has_cross_company:
# 无特权:强制绑定本公司
query = query.outerjoin(MaterialBase, TransRepair.base_id == MaterialBase.id)
if user_company:
query = query.filter(MaterialBase.company_name == user_company)
# 分页
pagination = query.paginate(page=page, per_page=page_size, error_out=False)
items = []
for item in pagination.items:
item_dict = item.to_dict()
# 如果有 base_id尝试获取物料名称
if item.base_id:
base = MaterialBase.query.get(item.base_id)
if base:
item_dict['material_name'] = base.name
item_dict['company_name'] = base.company_name
items.append(item_dict)
return {
'list': items,
'total': pagination.total,
'page': page,
'page_size': page_size
}
@staticmethod
def create(data):
"""
新增维修单
核心要求:
1. 生成以 REP- 打头的自增维修单号 (按天递增)
2. 从全局序列获取10位SKU (global_print_seq)
3. 支持不关联 base_id (独立录入模式)
4. 新增客户名称和客户所在地字段
"""
# 生成维修单号
repair_no = RepairInboundService._generate_repair_no()
# 获取全局SKU
sku = data.get('sku')
if not sku:
sku = RepairInboundService._generate_sku()
# 获取物料信息 (可选)
material_name = data.get('material_name')
company_name = None
if data.get('base_id'):
base = MaterialBase.query.get(data['base_id'])
if base:
material_name = base.name or material_name
company_name = base.company_name
if not sku:
sku = base.code
repair = TransRepair(
repair_no=repair_no,
base_id=data.get('base_id'),
sku=sku,
material_name=material_name,
serial_number=data.get('serial_number'),
arrival_date=data.get('arrival_date'),
repair_status=data.get('repair_status', '待检测'),
fault_description=data.get('fault_description'),
expected_repair_time=data.get('expected_repair_time'),
repair_result=data.get('repair_result'),
repair_manager=data.get('repair_manager'),
shipping_date=data.get('shipping_date'),
related_contract_id=data.get('related_contract_id'),
# 新增客户字段
customer_name=data.get('customer_name'),
customer_location=data.get('customer_location'),
cost_price=data.get('cost_price'),
sale_price=data.get('sale_price'),
company_id=data.get('company_id'),
source_table=data.get('source_table'),
stock_id=data.get('stock_id'),
is_self_made=data.get('is_self_made', False),
related_product_id=data.get('related_product_id'),
)
db.session.add(repair)
db.session.commit()
result = repair.to_dict()
result['material_name'] = material_name
result['company_name'] = company_name
return result
@staticmethod
def update(id, data):
"""
更新维修单
"""
repair = TransRepair.query.get(id)
if not repair:
return None
# 可更新字段
updatable_fields = [
'base_id', 'sku', 'material_name', 'serial_number', 'arrival_date', 'repair_status',
'fault_description', 'expected_repair_time', 'repair_result',
'repair_manager', 'shipping_date', 'related_contract_id',
'customer_name', 'customer_location', 'cost_price', 'sale_price', 'company_id'
]
for field in updatable_fields:
if field in data:
setattr(repair, field, data[field])
db.session.commit()
return repair.to_dict()
@staticmethod
def update_status(id, status, repair_log=None):
"""
专门用于更新维修状态和追加维修日志
"""
# 禁止手动变更为已出库状态,必须通过扫码出库模块进行
if status == '已出库':
raise ValueError("禁止手动变更为已出库状态,请通过扫码出库模块进行操作")
# 禁止手动变更为报废转出状态,必须通过扫码报废模块进行
if status == '报废转出':
raise ValueError("禁止手动变更为报废状态,请前往报废管理进行扫码操作")
repair = TransRepair.query.get(id)
if not repair:
return None
repair.repair_status = status
# 追加维修日志
if repair_log:
if repair.repair_result:
repair.repair_result = repair.repair_result + '\n' + f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {repair_log}"
else:
repair.repair_result = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {repair_log}"
db.session.commit()
return repair.to_dict()
@staticmethod
def delete(id):
"""
删除维修单
"""
repair = TransRepair.query.get(id)
if not repair:
return False
db.session.delete(repair)
db.session.commit()
return True
@staticmethod
def get_by_id(id):
"""
根据ID获取维修单详情
"""
repair = TransRepair.query.get(id)
if not repair:
return None
item_dict = repair.to_dict()
if repair.base_id:
base = MaterialBase.query.get(repair.base_id)
if base:
item_dict['material_name'] = base.name
item_dict['company_name'] = base.company_name
return item_dict