对于出库选单和Bom库的逻辑完善以及功能美化

This commit is contained in:
dxc
2026-02-12 16:54:26 +08:00
parent d61668bc4b
commit b682d4b02f
5 changed files with 402 additions and 477 deletions

View File

@ -4,7 +4,6 @@ from app.models.base import MaterialBase
from app.models.bom import BomTable
from app.extensions import db
from flask_jwt_extended import jwt_required
from sqlalchemy import distinct
bom_bp = Blueprint('bom', __name__)
@ -14,10 +13,13 @@ bom_bp = Blueprint('bom', __name__)
@bom_bp.route('/list', methods=['GET'])
@jwt_required()
def get_bom_list():
"""获取所有 BOM 配方列表(按 bom_no 分组),支持 keyword 搜索"""
"""获取所有 BOM 配方列表,支持 keyword 搜索和 active_only 过滤"""
try:
keyword = request.args.get('keyword', '').strip()
data = BomService.get_bom_list(keyword=keyword)
# 将字符串 'true' 转为布尔值
active_only = request.args.get('active_only', 'false').lower() == 'true'
data = BomService.get_bom_list(keyword=keyword, active_only=active_only)
return jsonify({
'code': 200,
'msg': 'success',
@ -31,9 +33,13 @@ def get_bom_list():
@bom_bp.route('/detail/<bom_no>', methods=['GET'])
@jwt_required()
def get_bom_detail(bom_no):
"""根据 BOM 编号获取配方详情"""
"""
根据 BOM 编号获取配方详情
Query参数: ?version=V1.0 (如果不传则取最新)
"""
try:
data = BomService.get_bom_detail(bom_no)
version = request.args.get('version')
data = BomService.get_bom_detail(bom_no, version=version)
if not data:
return jsonify({'code': 404, 'msg': 'BOM 不存在'}), 404
return jsonify({
@ -49,14 +55,14 @@ def get_bom_detail(bom_no):
@bom_bp.route('/save', methods=['POST'])
@jwt_required()
def save_bom():
"""保存或更新 BOM 配方(支持自定义 bom_no"""
"""保存或更新 BOM 配方(支持自定义 bom_no 和 多版本"""
try:
req_data = request.get_json()
# 必需字段校验
if 'parent_id' not in req_data or 'children' not in req_data:
return jsonify({'code': 400, 'msg': '缺少 parent_id 或 children 字段'}), 400
# 校验 bom_no 不能为空(如果前端要求必须填)
# 校验 bom_no 不能为空
if 'bom_no' in req_data and not req_data['bom_no']:
return jsonify({'code': 400, 'msg': 'BOM编号不能为空'}), 400
@ -91,20 +97,28 @@ def get_bom_with_stock_by_no(bom_no):
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
# ==================== 删除BOM接口根据bom_no删除整个配方 ====================
# ==================== 删除BOM接口 ====================
@bom_bp.route('/<bom_no>', methods=['DELETE'])
@jwt_required()
def delete_bom(bom_no):
"""根据 BOM 编号删除整个配方(包括所有子件记录)"""
"""
根据 BOM 编号删除
Query参数: ?version=V1.0 (如果不传,删除该编号下所有版本)
"""
try:
# 先检查是否存在
exist = BomTable.query.filter_by(bom_no=bom_no).first()
version = request.args.get('version')
query = BomTable.query.filter_by(bom_no=bom_no)
if version:
query = query.filter_by(version=version)
exist = query.first()
if not exist:
return jsonify({'code': 404, 'msg': 'BOM 不存在'}), 404
# 删除该 bom_no 下所有记录
BomTable.query.filter_by(bom_no=bom_no).delete()
# 删除
query.delete()
db.session.commit()
return jsonify({
'code': 200,
@ -115,7 +129,7 @@ def delete_bom(bom_no):
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
# ==================== 兼容旧接口(保留不改动现有前端) ====================
# ==================== 兼容旧接口 ====================
@bom_bp.route('/<int:parent_id>', methods=['GET'])
@jwt_required()

View File

@ -1,21 +1,28 @@
from app.extensions import db
class BomTable(db.Model):
__tablename__ = 'bom_table'
id = db.Column(db.Integer, primary_key=True)
parent_id = db.Column(db.Integer, db.ForeignKey('material_base.id'), nullable=False)
child_id = db.Column(db.Integer, db.ForeignKey('material_base.id'), nullable=False)
bom_no = db.Column(db.String(100), nullable=False, comment='BOM编号')
version = db.Column(db.String(50), nullable=False, default='v1', comment='版本')
version = db.Column(db.String(50), nullable=False, default='V1.0', comment='版本')
dosage = db.Column(db.Numeric(19, 4), comment='个数')
loss_rate = db.Column(db.Numeric(5, 2), comment='损耗率%(已废弃)', default=0, nullable=True)
loss_rate = db.Column(db.Numeric(5, 2), comment='损耗率%', default=0, nullable=True)
remark = db.Column(db.Text, comment='备注')
# ★ 新增:启用状态
is_enabled = db.Column(db.Boolean, default=True, comment='是否启用')
# 约束: 保证同一版本下的父子对唯一,允许不同版本存在
__table_args__ = (
db.UniqueConstraint('bom_no', 'parent_id', 'child_id', name='uniq_bom_no_parent_child'),
db.UniqueConstraint('bom_no', 'version', 'parent_id', 'child_id', name='uniq_bom_pair_in_version'),
)
# relationships
parent = db.relationship('MaterialBase', foreign_keys=[parent_id], backref='bom_parents')
child = db.relationship('MaterialBase', foreign_keys=[child_id], backref='bom_children')
child = db.relationship('MaterialBase', foreign_keys=[child_id], backref='bom_children')

View File

@ -2,7 +2,7 @@ from app.extensions import db
from app.models.bom import BomTable
from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy
from sqlalchemy import func, distinct, or_
from sqlalchemy import func, distinct, or_, case
import uuid
from datetime import datetime
@ -18,90 +18,83 @@ class BomService:
return f'BOM-{timestamp}-{unique}'
@staticmethod
def get_bom_list(keyword=None):
def get_bom_list(keyword=None, active_only=False):
"""
获取所有 BOM 配方(按 bom_no 分组)
获取所有 BOM 配方(按 bom_no + version 分组)
支持模糊搜索BOM编号、父件名称/规格、子件名称/规格
"""
# 1. 如果有搜索关键词,先筛选出符合条件的 bom_no
filtered_bom_nos = None
# 1. 关键词过滤:先找出符合条件的 (bom_no, version) 组
query_base = db.session.query(
BomTable.bom_no,
BomTable.version
).join(
MaterialBase, BomTable.parent_id == MaterialBase.id
)
# ★ 过滤禁用状态
if active_only:
query_base = query_base.filter(BomTable.is_enabled == True)
if keyword:
kw = f'%{keyword}%'
# 条件A: 匹配 BOM编号 或 父件信息
# 需要 join 父件表
q1 = db.session.query(BomTable.bom_no).join(
MaterialBase, BomTable.parent_id == MaterialBase.id
# 关联子件表以支持子件搜索
child_alias = db.aliased(MaterialBase)
query_base = query_base.outerjoin(
child_alias, BomTable.child_id == child_alias.id
).filter(
or_(
BomTable.bom_no.ilike(kw),
MaterialBase.name.ilike(kw),
MaterialBase.spec_model.ilike(kw)
MaterialBase.spec_model.ilike(kw),
child_alias.name.ilike(kw),
child_alias.spec_model.ilike(kw)
)
)
# 条件B: 匹配 子件信息
# 需要 join 子件表
q2 = db.session.query(BomTable.bom_no).join(
MaterialBase, BomTable.child_id == MaterialBase.id
# 获取符合条件的唯一组合
target_pairs = query_base.distinct().all()
if not target_pairs:
return []
# 2. 聚合查询详情
results = []
for bom_no, version in target_pairs:
summary = db.session.query(
BomTable.parent_id,
MaterialBase.name.label('parent_name'),
MaterialBase.spec_model.label('parent_spec'),
BomTable.is_enabled,
func.count(BomTable.child_id).label('child_count')
).join(
MaterialBase, BomTable.parent_id == MaterialBase.id
).filter(
or_(
MaterialBase.name.ilike(kw),
MaterialBase.spec_model.ilike(kw)
)
)
BomTable.bom_no == bom_no,
BomTable.version == version
).group_by(
BomTable.parent_id, MaterialBase.name, MaterialBase.spec_model, BomTable.is_enabled
).first()
# 取并集 (Union)
filtered_bom_nos = q1.union(q2).distinct().all()
filtered_bom_nos = [row[0] for row in filtered_bom_nos]
if summary:
results.append({
'bom_no': bom_no,
'version': version,
'parent_id': summary.parent_id,
'parent_name': summary.parent_name,
'parent_spec': summary.parent_spec or '',
'is_enabled': summary.is_enabled,
'child_count': summary.child_count
})
# 如果搜不到任何结果,直接返回空
if not filtered_bom_nos:
return []
# 2. 原有的分组聚合查询
subq_query = db.session.query(
BomTable.bom_no,
BomTable.parent_id,
BomTable.version,
func.count(BomTable.child_id).label('child_count')
)
# 应用筛选
if filtered_bom_nos is not None:
subq_query = subq_query.filter(BomTable.bom_no.in_(filtered_bom_nos))
subq = subq_query.group_by(BomTable.bom_no, BomTable.parent_id, BomTable.version).subquery()
query = db.session.query(
subq.c.bom_no,
subq.c.parent_id,
subq.c.version,
subq.c.child_count,
MaterialBase.name.label('parent_name'),
MaterialBase.spec_model.label('parent_spec')
).join(MaterialBase, subq.c.parent_id == MaterialBase.id)
# 按 bom_no 倒序排列
query = query.order_by(subq.c.bom_no.desc())
results = query.all()
return [{
'bom_no': row.bom_no,
'parent_id': row.parent_id,
'parent_name': row.parent_name,
'parent_spec': row.parent_spec or '',
'version': row.version,
'child_count': row.child_count
} for row in results]
results.sort(key=lambda x: (x['bom_no'], x['version']), reverse=True)
return results
@staticmethod
def get_bom_detail(bom_no):
def get_bom_detail(bom_no, version=None):
"""
根据 bom_no 获取配方详情
返回包含父件信息和子件列表的对象
根据 bom_no (和 version) 获取配方详情
"""
rows = db.session.query(
query = db.session.query(
BomTable,
MaterialBase.name.label('child_name'),
MaterialBase.spec_model.label('child_spec')
@ -109,21 +102,24 @@ class BomService:
MaterialBase, BomTable.child_id == MaterialBase.id
).filter(
BomTable.bom_no == bom_no
).all()
)
if version:
query = query.filter(BomTable.version == version)
else:
latest_ver = db.session.query(BomTable.version).filter_by(bom_no=bom_no) \
.order_by(BomTable.version.desc()).limit(1).scalar()
if not latest_ver:
return None
query = query.filter(BomTable.version == latest_ver)
rows = query.all()
if not rows:
return None
first = rows[0]
parent_id = first.BomTable.parent_id
# 获取父件的名称和规格
parent_material = MaterialBase.query.filter(MaterialBase.id == parent_id).first()
if parent_material:
parent_name = parent_material.name
parent_spec = parent_material.spec_model or ''
else:
parent_name = ''
parent_spec = ''
parent_material = MaterialBase.query.get(parent_id)
children = []
for bom, child_name, child_spec in rows:
@ -137,45 +133,33 @@ class BomService:
return {
'bom_no': bom_no,
'parent_id': parent_id,
'parent_name': parent_name,
'parent_spec': parent_spec,
'version': first.BomTable.version,
'parent_id': parent_id,
'parent_name': parent_material.name if parent_material else '',
'parent_spec': parent_material.spec_model if parent_material else '',
'is_enabled': first.BomTable.is_enabled,
'children': children
}
@staticmethod
def save_bom(data):
"""
保存或更新一个 BOM 配方
data 结构:
{
"bom_no": "用户输入或自动生成",
"version": "版本号默认v1",
"parent_id": 父件ID,
"children": [...]
}
"""
"""保存 BOM (支持多版本)"""
bom_no = data.get('bom_no')
version = data.get('version', 'v1')
version = data.get('version', 'V1.0')
parent_id = data['parent_id']
children = data['children']
is_enabled = data.get('is_enabled', True)
if not bom_no:
raise ValueError('BOM编号不能为空')
# 校验父件不能与子件相同
for child in children:
if child['child_id'] == parent_id:
raise ValueError('父件与子件不能是同一物料')
# 如果未提供 bom_no则生成一个新的 (兼容旧逻辑,但现在前端会传值)
if not bom_no or not bom_no.strip():
# 如果前端没传,抛出异常要求用户填写,或者自动生成
# 这里选择自动生成作为兜底,但推荐前端校验必填
bom_no = BomService.generate_bom_no()
# 仅删除当前版本的旧记录
BomTable.query.filter_by(bom_no=bom_no, version=version).delete()
# 删除该 bom_no 下所有现有记录 (全量更新模式)
BomTable.query.filter_by(bom_no=bom_no).delete()
# 插入新记录
for child in children:
bom = BomTable(
bom_no=bom_no,
@ -183,7 +167,8 @@ class BomService:
parent_id=parent_id,
child_id=child['child_id'],
dosage=child.get('dosage', 0),
remark=child.get('remark', '')
remark=child.get('remark', ''),
is_enabled=is_enabled
)
db.session.add(bom)
@ -193,81 +178,69 @@ class BomService:
@staticmethod
def get_bom_with_stock_by_bom_no(bom_no):
"""
根据 bom_no 获取配方详情,并计算每个子件的库存和最大可生产数量
根据 bom_no 获取配方详情,并计算
1. 总可用库存
2. 最大可生产套数
3. ★ 聚合库位信息 (warehouse_locations)
"""
detail = BomService.get_bom_detail(bom_no)
if not detail:
return None
for child in detail['children']:
# 查询该子件在 StockBuy 中的可用库存总量
# 1. 查询该子件的总库存
stock_qty = db.session.query(
func.coalesce(func.sum(StockBuy.available_quantity), 0)
).filter(
StockBuy.base_id == child['child_id']
).scalar() or 0
# 2. ★ 查询该子件涉及的所有库位,并去重拼接 (PostgreSQL 使用 string_agg)
# 注意:这里假设主要是 stock_buy 表,如果是成品或半成品也需要做类似 Union 查询
# 为简化,这里演示只查 stock_buy 的库位
locations = db.session.query(
# 去除空值和重复值
func.string_agg(distinct(StockBuy.warehouse_location), ', ')
).filter(
StockBuy.base_id == child['child_id'],
StockBuy.available_quantity > 0, # 只看有货的库位
StockBuy.warehouse_location != None,
StockBuy.warehouse_location != ''
).scalar()
child['current_stock'] = float(stock_qty)
child['warehouse_location'] = locations or '' # 返回给前端
dosage = child['dosage']
child['max_producible'] = int(stock_qty // dosage) if dosage > 0 else 0
return detail
# ====================== 兼容旧接口(基于 parent_id ======================
# ====================== 兼容旧接口 ======================
@staticmethod
def get_bom_no_by_parent(parent_id):
"""
根据父件 ID 获取其最新的 BOM 编号(用于兼容旧接口)
"""
row = BomTable.query.filter_by(parent_id=parent_id) \
.order_by(BomTable.version.desc()).first()
row = BomTable.query.filter_by(parent_id=parent_id).order_by(BomTable.version.desc()).first()
return row.bom_no if row else None
@staticmethod
def create_or_update_bom(parent_id, child_list, bom_no=None, version='v1'):
"""
兼容旧接口的保存方法(保留原有调用方式)
如果提供了 bom_no则更新该 bom_no否则为父件创建新 BOM。
"""
# 校验父件不能与子件相同
for item in child_list:
if item['child_id'] == parent_id:
raise ValueError('父件与子件不能是同一物料')
# 如果未提供 bom_no尝试查找现有的否则新建
def create_or_update_bom(parent_id, child_list, bom_no=None, version='V1.0'):
if not bom_no:
existing = BomTable.query.filter_by(parent_id=parent_id).first()
bom_no = existing.bom_no if existing else BomService.generate_bom_no()
# 删除该 bom_no 下所有记录
BomTable.query.filter_by(bom_no=bom_no).delete()
# 插入新的
BomTable.query.filter_by(bom_no=bom_no, version=version).delete()
for item in child_list:
bom = BomTable(
bom_no=bom_no,
version=version,
parent_id=parent_id,
child_id=item['child_id'],
dosage=item.get('dosage', 0),
remark=item.get('remark', '')
bom_no=bom_no, version=version, parent_id=parent_id,
child_id=item['child_id'], dosage=item.get('dosage', 0), remark=item.get('remark', '')
)
db.session.add(bom)
db.session.commit()
return True
@staticmethod
def get_bom_with_stock(parent_id):
"""
兼容旧接口:根据父件 ID 获取 BOM 及库存信息
(实际会找到对应的 bom_no 再调用新方法)
"""
bom_no = BomService.get_bom_no_by_parent(parent_id)
if not bom_no:
return []
if not bom_no: return []
detail = BomService.get_bom_with_stock_by_bom_no(bom_no)
if not detail:
return []
return detail['children']
return detail['children'] if detail else []