feat: implement BOM versioning with bom_no and new management APIs

Co-authored-by: aider (openai/DeepSeek-V3.2-Thinking) <aider@aider.chat>
This commit is contained in:
dxc
2026-02-12 09:37:27 +08:00
parent e900326571
commit 6d5d8a6aad
2 changed files with 264 additions and 40 deletions

View File

@ -8,6 +8,81 @@ from sqlalchemy import distinct
bom_bp = Blueprint('bom', __name__) bom_bp = Blueprint('bom', __name__)
# ==================== 新版 BOM 接口(基于 bom_no ====================
@bom_bp.route('/list', methods=['GET'])
@jwt_required()
def get_bom_list():
"""获取所有 BOM 配方列表(按 bom_no 分组)"""
try:
data = BomService.get_bom_list()
return jsonify({
'code': 200,
'msg': 'success',
'data': data
})
except Exception as e:
current_app.logger.error(f'获取BOM列表失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
@bom_bp.route('/detail/<bom_no>', methods=['GET'])
@jwt_required()
def get_bom_detail(bom_no):
"""根据 BOM 编号获取配方详情"""
try:
data = BomService.get_bom_detail(bom_no)
if not data:
return jsonify({'code': 404, 'msg': 'BOM 不存在'}), 404
return jsonify({
'code': 200,
'msg': 'success',
'data': data
})
except Exception as e:
current_app.logger.error(f'获取BOM详情失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
@bom_bp.route('/save', methods=['POST'])
@jwt_required()
def save_bom():
"""保存或更新 BOM 配方(支持新建和另存为新版本)"""
try:
req_data = request.get_json()
# 必需字段校验
if 'parent_id' not in req_data or 'children' not in req_data:
return jsonify({'code': 400, 'msg': '缺少 parent_id 或 children 字段'}), 400
bom_no = BomService.save_bom(req_data)
return jsonify({
'code': 200,
'msg': '保存成功',
'data': {'bom_no': bom_no}
})
except ValueError as e:
return jsonify({'code': 400, 'msg': str(e)}), 400
except Exception as e:
current_app.logger.error(f'保存BOM失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
@bom_bp.route('/stock/<bom_no>', methods=['GET'])
@jwt_required()
def get_bom_with_stock_by_no(bom_no):
"""根据 BOM 编号获取配方详情及库存信息"""
try:
data = BomService.get_bom_with_stock_by_bom_no(bom_no)
if not data:
return jsonify({'code': 404, 'msg': 'BOM 不存在'}), 404
return jsonify({
'code': 200,
'msg': 'success',
'data': data
})
except Exception as e:
current_app.logger.error(f'获取BOM库存信息失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
# ==================== 兼容旧接口(保留不改动现有前端) ====================
@bom_bp.route('/<int:parent_id>', methods=['GET']) @bom_bp.route('/<int:parent_id>', methods=['GET'])
@jwt_required() @jwt_required()
def get_bom(parent_id): def get_bom(parent_id):
@ -24,7 +99,7 @@ def get_bom(parent_id):
@bom_bp.route('', methods=['POST']) @bom_bp.route('', methods=['POST'])
@jwt_required() @jwt_required()
def save_bom(): def save_bom_legacy():
try: try:
req_data = request.get_json() req_data = request.get_json()
parent_id = req_data.get('parent_id') parent_id = req_data.get('parent_id')
@ -61,7 +136,7 @@ def get_material_base_list():
@bom_bp.route('/parents', methods=['GET']) @bom_bp.route('/parents', methods=['GET'])
@jwt_required() @jwt_required()
def get_bom_parents(): def get_bom_parents():
"""获取所有已定义BOM的父件物料列表""" """获取所有已定义BOM的父件物料列表(兼容旧版)"""
try: try:
subq = db.session.query(BomTable.parent_id).distinct().subquery() subq = db.session.query(BomTable.parent_id).distinct().subquery()
parents = MaterialBase.query.join(subq, MaterialBase.id == subq.c.parent_id).all() parents = MaterialBase.query.join(subq, MaterialBase.id == subq.c.parent_id).all()

View File

@ -2,66 +2,215 @@ from app.extensions import db
from app.models.bom import BomTable from app.models.bom import BomTable
from app.models.base import MaterialBase from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy from app.models.inbound.buy import StockBuy
from sqlalchemy import func from sqlalchemy import func, distinct
import uuid
from datetime import datetime
class BomService: class BomService:
# ====================== 新版 BOM 逻辑(基于 bom_no ======================
@staticmethod @staticmethod
def create_or_update_bom(parent_id, child_list): def generate_bom_no():
"""生成唯一的 BOM 编号"""
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
unique = str(uuid.uuid4())[:8]
return f'BOM-{timestamp}-{unique}'
@staticmethod
def get_bom_list():
""" """
保存/更新父件的BOM子件关系 获取所有 BOM 配方(按 bom_no 分组)
child_list: [{"child_id": int, "dosage": float, "remark": str}, ...] 返回:每个 BOM 的编号、父件信息、版本、子件数量
"""
subq = db.session.query(
BomTable.bom_no,
BomTable.parent_id,
BomTable.version,
func.count(BomTable.child_id).label('child_count')
).group_by(BomTable.bom_no, BomTable.parent_id, BomTable.version).subquery()
query = db.session.query(
subq.c.bom_no,
subq.c.parent_id,
subq.c.version,
subq.c.child_count,
MaterialBase.name.label('parent_name')
).join(MaterialBase, subq.c.parent_id == MaterialBase.id)
results = query.all()
return [{
'bom_no': row.bom_no,
'parent_id': row.parent_id,
'parent_name': row.parent_name,
'version': row.version,
'child_count': row.child_count
} for row in results]
@staticmethod
def get_bom_detail(bom_no):
"""
根据 bom_no 获取配方详情
返回包含父件信息和子件列表的对象
"""
rows = db.session.query(
BomTable,
MaterialBase.name.label('child_name')
).join(
MaterialBase, BomTable.child_id == MaterialBase.id
).filter(
BomTable.bom_no == bom_no
).all()
if not rows:
return None
first = rows[0]
parent_id = first.BomTable.parent_id
parent_name = db.session.query(MaterialBase.name)\
.filter(MaterialBase.id == parent_id).scalar() or ''
children = []
for bom, child_name in rows:
children.append({
'child_id': bom.child_id,
'child_name': child_name,
'dosage': float(bom.dosage) if bom.dosage else 0.0,
'remark': bom.remark or ''
})
return {
'bom_no': bom_no,
'parent_id': parent_id,
'parent_name': parent_name,
'version': first.BomTable.version,
'children': children
}
@staticmethod
def save_bom(data):
"""
保存或更新一个 BOM 配方
data 结构:
{
"bom_no": "可选,为空则新建",
"version": "版本号默认v1",
"parent_id": 父件ID,
"children": [
{"child_id": 1, "dosage": 2.5, "remark": ""},
...
]
}
"""
bom_no = data.get('bom_no')
version = data.get('version', 'v1')
parent_id = data['parent_id']
children = data['children']
# 校验父件不能与子件相同
for child in children:
if child['child_id'] == parent_id:
raise ValueError('父件与子件不能是同一物料')
# 如果未提供 bom_no则生成一个新的
if not bom_no:
bom_no = BomService.generate_bom_no()
else:
# 删除该 bom_no 下所有现有记录
BomTable.query.filter_by(bom_no=bom_no).delete()
# 插入新记录
for child in children:
bom = BomTable(
bom_no=bom_no,
version=version,
parent_id=parent_id,
child_id=child['child_id'],
dosage=child.get('dosage', 0),
remark=child.get('remark', '')
)
db.session.add(bom)
db.session.commit()
return bom_no
@staticmethod
def get_bom_with_stock_by_bom_no(bom_no):
"""
根据 bom_no 获取配方详情,并计算每个子件的库存和最大可生产数量
"""
detail = BomService.get_bom_detail(bom_no)
if not detail:
return None
for child in detail['children']:
# 查询该子件在 StockBuy 中的可用库存总量
stock_qty = db.session.query(
func.coalesce(func.sum(StockBuy.available_quantity), 0)
).filter(
StockBuy.base_id == child['child_id']
).scalar() or 0
child['current_stock'] = float(stock_qty)
dosage = child['dosage']
child['max_producible'] = int(stock_qty // dosage) if dosage > 0 else 0
return detail
# ====================== 兼容旧接口(基于 parent_id ======================
@staticmethod
def get_bom_no_by_parent(parent_id):
"""
根据父件 ID 获取其最新的 BOM 编号(用于兼容旧接口)
"""
row = BomTable.query.filter_by(parent_id=parent_id)\
.order_by(BomTable.version.desc()).first()
return row.bom_no if row else None
@staticmethod
def create_or_update_bom(parent_id, child_list, bom_no=None, version='v1'):
"""
兼容旧接口的保存方法(保留原有调用方式)
如果提供了 bom_no则更新该 bom_no否则为父件创建新 BOM。
""" """
# 校验父件不能与子件相同 # 校验父件不能与子件相同
for item in child_list: for item in child_list:
if item['child_id'] == parent_id: if item['child_id'] == parent_id:
raise ValueError('父件与子件不能是同一物料') raise ValueError('父件与子件不能是同一物料')
# 删除该父件原有的BOM记录
BomTable.query.filter_by(parent_id=parent_id).delete() # 如果未提供 bom_no尝试查找现有的否则新建
if not bom_no:
existing = BomTable.query.filter_by(parent_id=parent_id).first()
bom_no = existing.bom_no if existing else BomService.generate_bom_no()
# 删除该 bom_no 下所有记录
BomTable.query.filter_by(bom_no=bom_no).delete()
# 插入新的 # 插入新的
for item in child_list: for item in child_list:
bom = BomTable( bom = BomTable(
bom_no=bom_no,
version=version,
parent_id=parent_id, parent_id=parent_id,
child_id=item['child_id'], child_id=item['child_id'],
dosage=item.get('dosage', 0), dosage=item.get('dosage', 0),
remark=item.get('remark', '') remark=item.get('remark', '')
) )
db.session.add(bom) db.session.add(bom)
db.session.commit() db.session.commit()
return True return True
@staticmethod @staticmethod
def get_bom_with_stock(parent_id): def get_bom_with_stock(parent_id):
""" """
查询父件的BOM结构及库存信息 兼容旧接口:根据父件 ID 获取 BOM 及库存信息
(实际会找到对应的 bom_no 再调用新方法)
""" """
bom_items = db.session.query( bom_no = BomService.get_bom_no_by_parent(parent_id)
BomTable, if not bom_no:
MaterialBase.name.label('child_name') return []
).join( detail = BomService.get_bom_with_stock_by_bom_no(bom_no)
MaterialBase, BomTable.child_id == MaterialBase.id if not detail:
).filter( return []
BomTable.parent_id == parent_id return detail['children']
).all()
result = []
for bom, child_name in bom_items:
# 查询该子件在 StockBuy 中的可用库存总量
stock_qty = db.session.query(
func.coalesce(func.sum(StockBuy.available_quantity), 0)
).filter(
StockBuy.base_id == bom.child_id
).scalar() or 0
# 计算最大可生产数量
dosage = float(bom.dosage) if bom.dosage else 0
max_producible = int(stock_qty // dosage) if dosage > 0 else 0
result.append({
'child_id': bom.child_id,
'child_name': child_name,
'dosage': dosage,
'current_stock': float(stock_qty),
'max_producible': max_producible,
'remark': bom.remark or ''
})
return result