Files
KCGL/inventory-backend/app/api/v1/bom.py

202 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, request, jsonify, current_app
from app.services.bom_service import BomService
from app.models.base import MaterialBase
from app.models.bom import BomTable
from app.extensions import db
from flask_jwt_extended import jwt_required
bom_bp = Blueprint('bom', __name__)
# ==================== 新版 BOM 接口(基于 bom_no ====================
@bom_bp.route('/list', methods=['GET'])
@jwt_required()
def get_bom_list():
"""获取所有 BOM 配方列表,支持 keyword 搜索和 active_only 过滤"""
try:
keyword = request.args.get('keyword', '').strip()
# 将字符串 'true' 转为布尔值
active_only = request.args.get('active_only', 'false').lower() == 'true'
data = BomService.get_bom_list(keyword=keyword, active_only=active_only)
return jsonify({
'code': 200,
'msg': 'success',
'data': data
})
except Exception as e:
current_app.logger.error(f'获取BOM列表失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
@bom_bp.route('/detail/<bom_no>', methods=['GET'])
@jwt_required()
def get_bom_detail(bom_no):
"""
根据 BOM 编号获取配方详情
Query参数: ?version=V1.0 (如果不传则取最新)
"""
try:
version = request.args.get('version')
data = BomService.get_bom_detail(bom_no, version=version)
if not data:
return jsonify({'code': 404, 'msg': 'BOM 不存在'}), 404
return jsonify({
'code': 200,
'msg': 'success',
'data': data
})
except Exception as e:
current_app.logger.error(f'获取BOM详情失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
@bom_bp.route('/save', methods=['POST'])
@jwt_required()
def save_bom():
"""保存或更新 BOM 配方(支持自定义 bom_no 和 多版本)"""
try:
req_data = request.get_json()
# 必需字段校验
if 'parent_id' not in req_data or 'children' not in req_data:
return jsonify({'code': 400, 'msg': '缺少 parent_id 或 children 字段'}), 400
# 校验 bom_no 不能为空
if 'bom_no' in req_data and not req_data['bom_no']:
return jsonify({'code': 400, 'msg': 'BOM编号不能为空'}), 400
bom_no = BomService.save_bom(req_data)
return jsonify({
'code': 200,
'msg': '保存成功',
'data': {'bom_no': bom_no}
})
except ValueError as e:
return jsonify({'code': 400, 'msg': str(e)}), 400
except Exception as e:
current_app.logger.error(f'保存BOM失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
@bom_bp.route('/stock/<bom_no>', methods=['GET'])
@jwt_required()
def get_bom_with_stock_by_no(bom_no):
"""根据 BOM 编号获取配方详情及库存信息"""
try:
data = BomService.get_bom_with_stock_by_bom_no(bom_no)
if not data:
return jsonify({'code': 404, 'msg': 'BOM 不存在'}), 404
return jsonify({
'code': 200,
'msg': 'success',
'data': data
})
except Exception as e:
current_app.logger.error(f'获取BOM库存信息失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
# ==================== 删除BOM接口 ====================
@bom_bp.route('/<bom_no>', methods=['DELETE'])
@jwt_required()
def delete_bom(bom_no):
"""
根据 BOM 编号删除
Query参数: ?version=V1.0 (如果不传,删除该编号下所有版本)
"""
try:
version = request.args.get('version')
query = BomTable.query.filter_by(bom_no=bom_no)
if version:
query = query.filter_by(version=version)
exist = query.first()
if not exist:
return jsonify({'code': 404, 'msg': 'BOM 不存在'}), 404
# 删除
query.delete()
db.session.commit()
return jsonify({
'code': 200,
'msg': '删除成功'
})
except Exception as e:
current_app.logger.error(f'删除BOM失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
# ==================== 兼容旧接口 ====================
@bom_bp.route('/<int:parent_id>', methods=['GET'])
@jwt_required()
def get_bom(parent_id):
try:
data = BomService.get_bom_with_stock(parent_id)
return jsonify({
'code': 200,
'msg': 'success',
'data': data
})
except Exception as e:
current_app.logger.error(f'获取BOM失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
@bom_bp.route('', methods=['POST'])
@jwt_required()
def save_bom_legacy():
try:
req_data = request.get_json()
parent_id = req_data.get('parent_id')
child_list = req_data.get('children', [])
if not parent_id or not isinstance(child_list, list):
return jsonify({'code': 400, 'msg': '参数错误'}), 400
BomService.create_or_update_bom(parent_id, child_list)
return jsonify({
'code': 200,
'msg': '保存成功'
})
except ValueError as e:
return jsonify({'code': 400, 'msg': str(e)}), 400
except Exception as e:
current_app.logger.error(f'保存BOM失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
@bom_bp.route('/base/list', methods=['GET'])
@jwt_required()
def get_material_base_list():
"""获取所有基础物料列表,用于前端下拉框"""
try:
materials = MaterialBase.query.filter_by(is_enabled=True).order_by(MaterialBase.id.desc()).all()
data = [item.to_dict() for item in materials]
return jsonify({
'code': 200,
'msg': 'success',
'data': data
})
except Exception as e:
current_app.logger.error(f'获取基础物料列表失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
@bom_bp.route('/parents', methods=['GET'])
@jwt_required()
def get_bom_parents():
"""获取所有已定义BOM的父件物料列表兼容旧版"""
try:
subq = db.session.query(BomTable.parent_id).distinct().subquery()
parents = MaterialBase.query.join(subq, MaterialBase.id == subq.c.parent_id).all()
data = [item.to_dict() for item in parents]
return jsonify({
'code': 200,
'msg': 'success',
'data': data
})
except Exception as e:
current_app.logger.error(f'获取BOM父件列表失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500