fix: sort warehouse tree by name, fix tree batch delete cascade, and implement safe history location autofill

This commit is contained in:
DXC
2026-04-08 17:32:00 +08:00
parent c58a0a6d14
commit 4a4baa2f8f
11 changed files with 228 additions and 9 deletions

View File

@ -390,3 +390,21 @@ def get_location_suggestions():
return jsonify({"code": 400, "msg": "base_id required"}), 400
data = BuyInboundService.get_history_locations(base_id)
return jsonify({"code": 200, "msg": "success", "data": data})
# ------------------------------------------------------------------
# 11. 获取最近一次入库的库位(跨表查询)
# ------------------------------------------------------------------
@inbound_buy_bp.route('/last-location', methods=['GET'])
@permission_required('inbound_buy')
def get_last_location():
"""
获取指定物料最近一次入库的库位
查询顺序:采购入库 -> 成品入库 -> 半成品入库,返回最新入库的库位
"""
base_id = request.args.get('base_id', type=int)
if not base_id:
return jsonify({"code": 400, "msg": "base_id required"}), 400
location = BuyInboundService.get_last_location_by_base_id(base_id)
return jsonify({"code": 200, "msg": "success", "data": {"location": location}})

View File

@ -245,3 +245,21 @@ def calculate_bom_cost():
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500
# ------------------------------------------------------------------
# 获取最近一次入库的库位(跨表查询)
# ------------------------------------------------------------------
@inbound_product_bp.route('/last-location', methods=['GET'])
@permission_required('inbound_product')
def get_last_location():
"""
获取指定物料最近一次入库的库位
查询顺序:成品入库 -> 采购入库 -> 半成品入库,返回最新入库的库位
"""
base_id = request.args.get('base_id', type=int)
if not base_id:
return jsonify({"code": 400, "msg": "base_id required"}), 400
location = ProductInboundService.get_last_location_by_base_id(base_id)
return jsonify({"code": 200, "msg": "success", "data": {"location": location}})

View File

@ -240,3 +240,21 @@ def calculate_bom_cost():
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500
# ------------------------------------------------------------------
# 获取最近一次入库的库位(跨表查询)
# ------------------------------------------------------------------
@inbound_semi_bp.route('/last-location', methods=['GET'])
@permission_required('inbound_semi')
def get_last_location():
"""
获取指定物料最近一次入库的库位
查询顺序:半成品入库 -> 采购入库 -> 成品入库,返回最新入库的库位
"""
base_id = request.args.get('base_id', type=int)
if not base_id:
return jsonify({"code": 400, "msg": "base_id required"}), 400
location = SemiInboundService.get_last_location_by_base_id(base_id)
return jsonify({"code": 200, "msg": "success", "data": {"location": location}})

View File

@ -18,11 +18,15 @@ def build_tree(nodes, parent_id=None):
children = build_tree(nodes, node.id)
node_dict = node.to_dict()
if children:
node_dict['children'] = children
# 子节点按 name 升序排序
children_sorted = sorted(children, key=lambda x: x.get('name', ''))
node_dict['children'] = children_sorted
else:
node_dict['children'] = []
tree.append(node_dict)
return tree
# 当前层级按 name 升序排序
tree_sorted = sorted(tree, key=lambda x: x.get('name', ''))
return tree_sorted
@warehouse_bp.route('/tree', methods=['GET'])
@ -31,9 +35,9 @@ def get_tree():
获取库位树形结构
"""
try:
# 查询所有库位
all_locations = SysWarehouseLocation.query.order_by(SysWarehouseLocation.level, SysWarehouseLocation.id).all()
# 查询所有库位,按 name 升序排序
all_locations = SysWarehouseLocation.query.order_by(SysWarehouseLocation.name.asc()).all()
# 构建树形结构
tree_data = build_tree(all_locations, parent_id=None)

View File

@ -1,6 +1,7 @@
# inventory-backend/app/services/inbound/buy_service.py
from app.extensions import db
from app.models.inbound.buy import StockBuy
from app.models.inbound.product import StockProduct
from app.models.base import MaterialBase
from datetime import datetime, timedelta, timezone
from sqlalchemy import or_, func, text, and_
@ -525,3 +526,42 @@ class BuyInboundService:
def get_history_locations(base_id):
return [r[0] for r in
db.session.query(StockBuy.warehouse_location).filter(StockBuy.base_id == base_id).distinct().all()]
@staticmethod
def get_last_location_by_base_id(base_id):
"""
获取指定物料最近一次入库的库位(跨表查询)
查询顺序:采购入库 -> 成品入库 -> 半成品入库,返回最新入库的库位
"""
from app.models.inbound.semi import StockSemi
# 1. 查询采购入库最新记录
last_buy = StockBuy.query.filter(
StockBuy.base_id == base_id
).order_by(StockBuy.in_date.desc()).first()
# 2. 查询成品入库最新记录
last_product = StockProduct.query.filter(
StockProduct.base_id == base_id
).order_by(StockProduct.in_date.desc()).first()
# 3. 查询半成品入库最新记录
last_semi = StockSemi.query.filter(
StockSemi.base_id == base_id
).order_by(StockSemi.in_date.desc()).first()
# 比较三个表中的最新入库时间,返回最新的库位
candidates = []
if last_buy and last_buy.warehouse_location:
candidates.append((last_buy.in_date, last_buy.warehouse_location))
if last_product and last_product.warehouse_location:
candidates.append((last_product.in_date, last_product.warehouse_location))
if last_semi and last_semi.warehouse_location:
candidates.append((last_semi.in_date, last_semi.warehouse_location))
if not candidates:
return ""
# 按时间倒序排序,返回最新的库位
candidates.sort(key=lambda x: x[0] if x[0] else datetime.min, reverse=True)
return candidates[0][1] if candidates[0][1] else ""

View File

@ -1,6 +1,8 @@
# app/services/inbound/product_service.py
from app.extensions import db
from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.outbound import TransOutbound
from datetime import datetime, timedelta, timezone
from sqlalchemy import or_, func, text, and_
@ -576,3 +578,42 @@ class ProductInboundService:
except Exception as e:
traceback.print_exc()
raise e
@staticmethod
def get_last_location_by_base_id(base_id):
"""
获取指定物料最近一次入库的库位(跨表查询)
查询顺序:成品入库 -> 采购入库 -> 半成品入库,返回最新入库的库位
"""
from app.models.inbound.product import StockProduct
# 1. 查询成品入库最新记录
last_product = StockProduct.query.filter(
StockProduct.base_id == base_id
).order_by(StockProduct.in_date.desc()).first()
# 2. 查询采购入库最新记录
last_buy = StockBuy.query.filter(
StockBuy.base_id == base_id
).order_by(StockBuy.in_date.desc()).first()
# 3. 查询半成品入库最新记录
last_semi = StockSemi.query.filter(
StockSemi.base_id == base_id
).order_by(StockSemi.in_date.desc()).first()
# 比较三个表中的最新入库时间,返回最新的库位
candidates = []
if last_product and last_product.warehouse_location:
candidates.append((last_product.in_date, last_product.warehouse_location))
if last_buy and last_buy.warehouse_location:
candidates.append((last_buy.in_date, last_buy.warehouse_location))
if last_semi and last_semi.warehouse_location:
candidates.append((last_semi.in_date, last_semi.warehouse_location))
if not candidates:
return ""
# 按时间倒序排序,返回最新的库位
candidates.sort(key=lambda x: x[0] if x[0] else datetime.min, reverse=True)
return candidates[0][1] if candidates[0][1] else ""

View File

@ -1,6 +1,8 @@
# app/services/inbound/semi_service.py
from app.extensions import db
from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy
from app.models.inbound.product import StockProduct
from app.models.outbound import TransOutbound
from datetime import datetime, timedelta, timezone
from sqlalchemy import or_, func, text, and_
@ -653,3 +655,42 @@ class SemiInboundService:
except Exception as e:
traceback.print_exc()
raise e
@staticmethod
def get_last_location_by_base_id(base_id):
"""
获取指定物料最近一次入库的库位(跨表查询)
查询顺序:半成品入库 -> 采购入库 -> 成品入库,返回最新入库的库位
"""
from app.models.inbound.semi import StockSemi
# 1. 查询半成品入库最新记录
last_semi = StockSemi.query.filter(
StockSemi.base_id == base_id
).order_by(StockSemi.in_date.desc()).first()
# 2. 查询采购入库最新记录
last_buy = StockBuy.query.filter(
StockBuy.base_id == base_id
).order_by(StockBuy.in_date.desc()).first()
# 3. 查询成品入库最新记录
last_product = StockProduct.query.filter(
StockProduct.base_id == base_id
).order_by(StockProduct.in_date.desc()).first()
# 比较三个表中的最新入库时间,返回最新的库位
candidates = []
if last_semi and last_semi.warehouse_location:
candidates.append((last_semi.in_date, last_semi.warehouse_location))
if last_buy and last_buy.warehouse_location:
candidates.append((last_buy.in_date, last_buy.warehouse_location))
if last_product and last_product.warehouse_location:
candidates.append((last_product.in_date, last_product.warehouse_location))
if not candidates:
return ""
# 按时间倒序排序,返回最新的库位
candidates.sort(key=lambda x: x[0] if x[0] else datetime.min, reverse=True)
return candidates[0][1] if candidates[0][1] else ""