feat(repair): implement backend CRUD services and API routes with RBAC permissions for repair module

This commit is contained in:
DXC
2026-04-08 18:34:48 +08:00
parent ec468b266d
commit 41b5118ecd
3 changed files with 355 additions and 0 deletions

View File

@ -10,6 +10,7 @@ from .base import inbound_base_bp
from .product import inbound_product_bp
from .inbound_summary import bp as inbound_summary_bp
from .stock import bp as inbound_stock_bp
from .repair import inbound_repair_bp
# 导入 service 模块,使其路由装饰器可以正常注册到 inbound_bp 上
from . import service
@ -21,5 +22,6 @@ inbound_bp.register_blueprint(inbound_base_bp, url_prefix='/base')
inbound_bp.register_blueprint(inbound_product_bp, url_prefix='/product')
inbound_bp.register_blueprint(inbound_summary_bp, url_prefix='/summary')
inbound_bp.register_blueprint(inbound_stock_bp, url_prefix='/stock')
inbound_bp.register_blueprint(inbound_repair_bp, url_prefix='/repair')
# service 模块的路由已经直接附加到 inbound_bp无需再注册子蓝图

View File

@ -0,0 +1,137 @@
# inventory-backend/app/api/v1/inbound/repair.py
from flask import Blueprint, request, jsonify
from app.services.inbound.repair_service import RepairInboundService
from app.utils.decorators import permission_required, audit_log
import traceback
inbound_repair_bp = Blueprint('inbound_repair', __name__)
# ------------------------------------------------------------------
# 1. 获取维修单列表
# ------------------------------------------------------------------
@inbound_repair_bp.route('/list', methods=['GET'])
@permission_required('inbound_repair:list')
def get_list():
try:
params = {
'page': request.args.get('page', 1, type=int),
'page_size': request.args.get('page_size', 20, type=int),
'repair_no': request.args.get('repair_no'),
'sku': request.args.get('sku'),
'material_name': request.args.get('material_name'),
'serial_number': request.args.get('serial_number'),
'repair_status': request.args.get('repair_status'),
}
result = RepairInboundService.get_list(params)
return jsonify({'code': 200, 'msg': 'success', 'data': result})
except Exception as e:
traceback.print_exc()
return jsonify({'code': 500, 'msg': str(e)}), 500
# ------------------------------------------------------------------
# 2. 新增维修单
# ------------------------------------------------------------------
@inbound_repair_bp.route('/submit', methods=['POST'])
@permission_required('inbound_repair:add')
@audit_log(
module='维修管理',
action='新增',
get_target_name_fn=lambda: request.get_json().get('repair_no') if request.get_json() else None
)
def create():
try:
data = request.get_json()
result = RepairInboundService.create(data)
return jsonify({'code': 200, 'msg': 'success', 'data': result})
except Exception as e:
traceback.print_exc()
return jsonify({'code': 500, 'msg': str(e)}), 500
# ------------------------------------------------------------------
# 3. 更新维修单
# ------------------------------------------------------------------
@inbound_repair_bp.route('/<int:id>', methods=['PUT'])
@permission_required('inbound_repair:edit')
@audit_log(
module='维修管理',
action='更新',
get_target_name_fn=lambda: f"维修单ID:{request.view_args.get('id')}"
)
def update(id):
try:
data = request.get_json()
result = RepairInboundService.update(id, data)
if not result:
return jsonify({'code': 404, 'msg': '维修单不存在'}), 404
return jsonify({'code': 200, 'msg': 'success', 'data': result})
except Exception as e:
traceback.print_exc()
return jsonify({'code': 500, 'msg': str(e)}), 500
# ------------------------------------------------------------------
# 4. 更新维修状态
# ------------------------------------------------------------------
@inbound_repair_bp.route('/update-status', methods=['POST'])
@permission_required('inbound_repair:edit')
@audit_log(
module='维修管理',
action='更新状态',
get_target_name_fn=lambda: f"维修单ID:{request.get_json().get('id')}"
)
def update_status():
try:
data = request.get_json()
id = data.get('id')
status = data.get('status')
repair_log = data.get('repair_log')
if not id or not status:
return jsonify({'code': 400, 'msg': 'id 和 status 不能为空'}), 400
result = RepairInboundService.update_status(id, status, repair_log)
if not result:
return jsonify({'code': 404, 'msg': '维修单不存在'}), 404
return jsonify({'code': 200, 'msg': 'success', 'data': result})
except Exception as e:
traceback.print_exc()
return jsonify({'code': 500, 'msg': str(e)}), 500
# ------------------------------------------------------------------
# 5. 删除维修单
# ------------------------------------------------------------------
@inbound_repair_bp.route('/<int:id>', methods=['DELETE'])
@permission_required('inbound_repair:delete')
@audit_log(
module='维修管理',
action='删除',
get_target_name_fn=lambda: f"维修单ID:{request.view_args.get('id')}"
)
def delete(id):
try:
success = RepairInboundService.delete(id)
if not success:
return jsonify({'code': 404, 'msg': '维修单不存在'}), 404
return jsonify({'code': 200, 'msg': '删除成功'})
except Exception as e:
traceback.print_exc()
return jsonify({'code': 500, 'msg': str(e)}), 500
# ------------------------------------------------------------------
# 6. 获取维修单详情
# ------------------------------------------------------------------
@inbound_repair_bp.route('/<int:id>', methods=['GET'])
@permission_required('inbound_repair:list')
def get_detail(id):
try:
result = RepairInboundService.get_by_id(id)
if not result:
return jsonify({'code': 404, 'msg': '维修单不存在'}), 404
return jsonify({'code': 200, 'msg': 'success', 'data': result})
except Exception as e:
traceback.print_exc()
return jsonify({'code': 500, 'msg': str(e)}), 500

View File

@ -0,0 +1,216 @@
# inventory-backend/app/services/inbound/repair_service.py
from app.extensions import db
from app.models.transaction import TransRepair
from app.models.base import MaterialBase
from datetime import datetime, timezone
import random
import string
class RepairInboundService:
@staticmethod
def _generate_repair_no():
"""
生成唯一的维修单号
格式: REP-YYYYMMDD-XXXX (X为随机大写字母或数字)
防重复策略:
1. 先尝试生成4位随机序列
2. 检查数据库中是否存在该单号
3. 如果冲突重试最多10次
4. 如果10次都冲突加入时间戳毫秒数确保唯一
"""
for _ in range(10):
date_str = datetime.now().strftime('%Y%m%d')
random_str = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4))
repair_no = f"REP-{date_str}-{random_str}"
# 检查是否已存在
exists = TransRepair.query.filter_by(repair_no=repair_no).first()
if not exists:
return repair_no
# 兜底策略:使用时间戳毫秒
timestamp = int(datetime.now().timestamp() * 1000) % 100000
date_str = datetime.now().strftime('%Y%m%d')
return f"REP-{date_str}-{timestamp:05d}"
@staticmethod
def get_list(params):
"""
获取维修单列表
支持按 repair_no, sku, material_name, serial_number, repair_status 模糊查询
实现分页
"""
page = params.get('page', 1)
page_size = params.get('page_size', 20)
query = TransRepair.query
# 模糊查询条件
if params.get('repair_no'):
query = query.filter(TransRepair.repair_no.ilike(f"%{params['repair_no']}%"))
if params.get('sku'):
query = query.filter(TransRepair.sku.ilike(f"%{params['sku']}%"))
if params.get('serial_number'):
query = query.filter(TransRepair.serial_number.ilike(f"%{params['serial_number']}%"))
if params.get('repair_status'):
query = query.filter(TransRepair.repair_status == params['repair_status'])
# 关联 MaterialBase 查询物料名称
if params.get('material_name'):
query = query.join(MaterialBase, TransRepair.base_id == MaterialBase.id).filter(
MaterialBase.name.ilike(f"%{params['material_name']}%")
)
# 按创建时间倒序
query = query.order_by(db.desc(TransRepair.id))
# 分页
pagination = query.paginate(page=page, per_page=page_size, error_out=False)
items = []
for item in pagination.items:
item_dict = item.to_dict()
# 如果有 base_id尝试获取物料名称
if item.base_id:
base = MaterialBase.query.get(item.base_id)
if base:
item_dict['material_name'] = base.name
item_dict['company_name'] = base.company_name
items.append(item_dict)
return {
'list': items,
'total': pagination.total,
'page': page,
'page_size': page_size
}
@staticmethod
def create(data):
"""
新增维修单
核心要求:
1. 生成以 REP- 打头的自增维修单号
2. 支持自动获取传入的 base_id 获取基础物料名称
"""
# 生成维修单号
repair_no = RepairInboundService._generate_repair_no()
# 获取物料信息
material_name = None
company_name = None
sku = data.get('sku')
if data.get('base_id'):
base = MaterialBase.query.get(data['base_id'])
if base:
material_name = base.name
company_name = base.company_name
if not sku:
sku = base.code
repair = TransRepair(
repair_no=repair_no,
base_id=data.get('base_id'),
sku=sku or data.get('sku'),
serial_number=data.get('serial_number'),
arrival_date=data.get('arrival_date'),
repair_status=data.get('repair_status', '待检测'),
fault_description=data.get('fault_description'),
expected_repair_time=data.get('expected_repair_time'),
repair_result=data.get('repair_result'),
repair_manager=data.get('repair_manager'),
shipping_date=data.get('shipping_date'),
related_contract_id=data.get('related_contract_id'),
cost_price=data.get('cost_price'),
sale_price=data.get('sale_price'),
company_id=data.get('company_id'),
source_table=data.get('source_table'),
stock_id=data.get('stock_id'),
is_self_made=data.get('is_self_made', False),
related_product_id=data.get('related_product_id'),
)
db.session.add(repair)
db.session.commit()
result = repair.to_dict()
result['material_name'] = material_name
result['company_name'] = company_name
return result
@staticmethod
def update(id, data):
"""
更新维修单
"""
repair = TransRepair.query.get(id)
if not repair:
return None
# 可更新字段
updatable_fields = [
'base_id', 'sku', 'serial_number', 'arrival_date', 'repair_status',
'fault_description', 'expected_repair_time', 'repair_result',
'repair_manager', 'shipping_date', 'related_contract_id',
'cost_price', 'sale_price', 'company_id'
]
for field in updatable_fields:
if field in data:
setattr(repair, field, data[field])
db.session.commit()
return repair.to_dict()
@staticmethod
def update_status(id, status, repair_log=None):
"""
专门用于更新维修状态和追加维修日志
"""
repair = TransRepair.query.get(id)
if not repair:
return None
repair.repair_status = status
# 追加维修日志
if repair_log:
if repair.repair_result:
repair.repair_result = repair.repair_result + '\n' + f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {repair_log}"
else:
repair.repair_result = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {repair_log}"
db.session.commit()
return repair.to_dict()
@staticmethod
def delete(id):
"""
删除维修单
"""
repair = TransRepair.query.get(id)
if not repair:
return False
db.session.delete(repair)
db.session.commit()
return True
@staticmethod
def get_by_id(id):
"""
根据ID获取维修单详情
"""
repair = TransRepair.query.get(id)
if not repair:
return None
item_dict = repair.to_dict()
if repair.base_id:
base = MaterialBase.query.get(repair.base_id)
if base:
item_dict['material_name'] = base.name
item_dict['company_name'] = base.company_name
return item_dict