Files
KCGL/inventory-backend/app/api/v1/warehouse.py

357 lines
11 KiB
Python

# inventory-backend/app/api/v1/warehouse.py
from flask import Blueprint, request, jsonify
from flask_jwt_extended import jwt_required
from app.extensions import db
from app.models.system import SysWarehouseLocation
from app.utils.decorators import audit_log
# Blueprint that groups the warehouse-location endpoints defined in this
# module; every route registered on it is served under /api/v1/warehouse.
warehouse_bp = Blueprint('warehouse', __name__, url_prefix='/api/v1/warehouse')
def build_tree(nodes, parent_id=None):
    """Build a nested tree of dicts from a flat collection of nodes.

    Args:
        nodes: Iterable of objects exposing ``id``, ``parent_id`` and a
            ``to_dict()`` method that returns a dict.
        parent_id: ``parent_id`` value whose direct children form the top
            level of the result; ``None`` selects the root nodes.

    Returns:
        A list with one ``to_dict()`` result per matching node. Each dict
        gets a ``children`` key holding the list built the same way for that
        node (an empty list for leaves). Every level is sorted by ``name``
        ascending; a missing or ``None`` name sorts as the empty string.
    """
    # Group the nodes by parent once, so building each level is a dict
    # lookup rather than a full rescan of ``nodes`` for every node.
    children_by_parent = {}
    for node in nodes:
        children_by_parent.setdefault(node.parent_id, []).append(node)

    def name_key(item):
        name = item.get('name')
        # A None name cannot be compared with str names; treat it as ''.
        return '' if name is None else name

    def subtree(owner_id):
        level = []
        for node in children_by_parent.get(owner_id, ()):
            node_dict = node.to_dict()
            # The recursive call returns an already-sorted list.
            node_dict['children'] = subtree(node.id)
            level.append(node_dict)
        level.sort(key=name_key)
        return level

    return subtree(parent_id)
@warehouse_bp.route('/tree', methods=['GET'])
def get_tree():
    """Return all warehouse locations as a nested tree.

    Loads every ``SysWarehouseLocation`` row ordered by name and nests the
    rows with ``build_tree`` starting from the root (``parent_id=None``).

    Returns:
        A JSON body ``{'code': 200, 'msg': 'success', 'data': <tree>}`` on
        success. If anything raises, a JSON body with ``code`` 500, the
        exception text as ``msg`` and ``data`` set to ``None``, sent with
        HTTP status 500.
    """
    try:
        locations = (
            SysWarehouseLocation.query
            .order_by(SysWarehouseLocation.name.asc())
            .all()
        )
        payload = {
            'code': 200,
            'msg': 'success',
            'data': build_tree(locations, parent_id=None),
        }
        return jsonify(payload)
    except Exception as exc:
        failure = {'code': 500, 'msg': str(exc), 'data': None}
        return jsonify(failure), 500
@warehouse_bp.route('', methods=['POST'])
@jwt_required()
@audit_log(
    module='库位管理',
    action='新增',
    get_target_name_fn=lambda: request.get_json().get('name') if request.get_json() else None
)
def create_location():
    """Create a warehouse location from the JSON request body.

    Body fields read:
        name: Location name; surrounding whitespace is stripped. Required.
        parent_id: Id of the parent location, or ``None``/absent for a
            top-level location.
        is_enabled: Enabled flag, defaults to ``True``.

    ``level`` and ``full_path`` are derived from the parent: a top-level
    location gets level 0 and its own name as path; otherwise the level is
    the parent's level plus one and the path is ``<parent path>/<name>``.

    Returns:
        JSON ``{'code': 200, 'msg': ..., 'data': <location dict>}`` on
        success. Validation failures return a JSON body with ``code`` 400
        (no explicit HTTP status is set on those responses). Any exception
        rolls the session back and returns ``code`` 500 with HTTP status 500.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid
        # JSON body; anything that is not a JSON object is treated as empty
        # so it falls through to the regular "name required" response.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        raw_name = data.get('name')
        # A null or non-string name is handled like a missing one.
        name = raw_name.strip() if isinstance(raw_name, str) else ''
        parent_id = data.get('parent_id')  # None means top level
        is_enabled = data.get('is_enabled', True)

        if not name:
            return jsonify({'code': 400, 'msg': '库位名称不能为空', 'data': None})

        # Derive level and full_path from the parent, if any.
        if parent_id is None:
            level = 0
            full_path = name
        else:
            parent = SysWarehouseLocation.query.get(parent_id)
            if not parent:
                return jsonify({'code': 400, 'msg': '父级库位不存在', 'data': None})
            level = parent.level + 1
            parent_full_path = parent.full_path or ''
            full_path = f"{parent_full_path}/{name}" if parent_full_path else name

        location = SysWarehouseLocation(
            name=name,
            parent_id=parent_id,
            full_path=full_path,
            level=level,
            is_enabled=is_enabled
        )
        db.session.add(location)
        db.session.commit()
        return jsonify({
            'code': 200,
            'msg': '创建成功',
            'data': location.to_dict()
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({
            'code': 500,
            'msg': str(e),
            'data': None
        }), 500
def _refresh_descendant_paths(location):
    """Recompute ``full_path`` for every descendant of ``location``.

    Walks the subtree level by level via ``parent_id`` and rebuilds each
    child's path from its (already updated) parent's path and its own name.
    """
    pending = [location]
    while pending:
        current = pending.pop()
        children = SysWarehouseLocation.query.filter_by(parent_id=current.id).all()
        for child in children:
            if current.full_path:
                child.full_path = f"{current.full_path}/{child.name}"
            else:
                child.full_path = child.name
            pending.append(child)


@warehouse_bp.route('/<int:location_id>', methods=['PUT'])
@jwt_required()
@audit_log(
    module='库位管理',
    action='修改',
    get_target_id_fn=lambda: request.view_args.get('location_id'),
    get_target_name_fn=lambda: request.get_json().get('name') if request.get_json() else None
)
def update_location(location_id):
    """Update the name and/or enabled flag of a warehouse location.

    Args:
        location_id: Id of the location to update (from the URL).

    Body fields read:
        name: New name; ignored when absent, not a string, or blank after
            stripping. A changed name also rewrites ``full_path`` of the
            location and of all its descendants, so stored paths stay
            consistent with the tree.
        is_enabled: New enabled flag; applied whenever the key is present.

    Returns:
        JSON ``{'code': 200, 'msg': ..., 'data': <location dict>}`` on
        success; a JSON body with ``code`` 404 when the location does not
        exist (no explicit HTTP status is set on that response). Any
        exception rolls the session back and returns ``code`` 500 with HTTP
        status 500.
    """
    try:
        # Tolerate a missing/invalid/non-object body instead of crashing.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        location = SysWarehouseLocation.query.get(location_id)
        if not location:
            return jsonify({'code': 404, 'msg': '库位不存在', 'data': None})

        # Rename: only a non-blank string that differs from the current name.
        raw_name = data.get('name')
        new_name = raw_name.strip() if isinstance(raw_name, str) else ''
        if new_name and new_name != location.name:
            parent = location.parent
            if parent and parent.full_path:
                location.full_path = f"{parent.full_path}/{new_name}"
            else:
                location.full_path = new_name
            location.name = new_name
            # Descendant paths embed this location's path; refresh them too.
            _refresh_descendant_paths(location)

        # Enabled flag.
        if 'is_enabled' in data:
            location.is_enabled = data['is_enabled']

        db.session.commit()
        return jsonify({
            'code': 200,
            'msg': '更新成功',
            'data': location.to_dict()
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({
            'code': 500,
            'msg': str(e),
            'data': None
        }), 500
@warehouse_bp.route('/<int:location_id>', methods=['DELETE'])
@jwt_required()
@audit_log(
    module='库位管理',
    action='删除',
    get_target_id_fn=lambda: request.view_args.get('location_id')
)
def delete_location(location_id):
    """Delete a warehouse location together with all of its descendants.

    Args:
        location_id: Id of the location to delete (from the URL).

    Returns:
        JSON with ``code`` 200 and the deleted location's name under
        ``deleted_location`` on success; a JSON body with ``code`` 404 when
        the location does not exist (no explicit HTTP status is set on that
        response). Any exception rolls the session back and returns ``code``
        500 with HTTP status 500.
    """
    def remove_subtree(node):
        # Descendants are removed first, the node itself last.
        for descendant in SysWarehouseLocation.query.filter_by(parent_id=node.id).all():
            remove_subtree(descendant)
        db.session.delete(node)

    try:
        target = SysWarehouseLocation.query.get(location_id)
        if not target:
            return jsonify({'code': 404, 'msg': '库位不存在', 'data': None})

        # Capture the name up front so the response does not have to touch
        # the object after the delete has been committed.
        removed_name = target.name

        remove_subtree(target)
        db.session.commit()

        result = {
            'code': 200,
            'msg': '删除成功',
            'deleted_location': removed_name,
        }
        return jsonify(result)
    except Exception as exc:
        db.session.rollback()
        failure = {'code': 500, 'msg': str(exc), 'data': None}
        return jsonify(failure), 500
@warehouse_bp.route('/batch', methods=['DELETE'])
@jwt_required()
@audit_log(
    module='库位管理',
    action='批量删除'
)
def batch_delete_locations():
    """Delete several warehouse locations, each with all of its descendants.

    The request body must be a non-empty JSON list of location ids. Ids that
    do not resolve to a location are skipped, as are ids that were already
    removed earlier in the same request (a repeated id, or a descendant of
    an id processed before it), so nothing is deleted or counted twice.

    Returns:
        JSON with ``code`` 200 and ``data`` holding ``deleted_count`` (the
        number of requested locations actually deleted, not counting their
        descendants) and ``deleted_names``. A JSON body with ``code`` 400 is
        returned when the id list is missing or not a list (no explicit HTTP
        status is set on that response). Any exception rolls the session
        back and returns ``code`` 500 with HTTP status 500.
    """
    try:
        # silent=True: a missing/invalid body becomes None and is reported
        # through the regular 400 response below.
        ids = request.get_json(silent=True)
        if not ids or not isinstance(ids, list):
            return jsonify({'code': 400, 'msg': '请提供要删除的库位ID列表', 'data': None})

        # Ids of every row already scheduled for deletion in this request.
        removed_ids = set()

        def delete_recursive(loc):
            # Children first, then the node itself.
            for child in SysWarehouseLocation.query.filter_by(parent_id=loc.id).all():
                delete_recursive(child)
            removed_ids.add(loc.id)
            db.session.delete(loc)

        deleted_count = 0
        deleted_names = []
        for loc_id in ids:
            location = SysWarehouseLocation.query.get(loc_id)
            # The session may still hand back an object whose deletion is
            # pending, so check the bookkeeping set as well.
            if not location or location.id in removed_ids:
                continue
            # Read the name before the row is deleted.
            deleted_names.append(location.name)
            delete_recursive(location)
            deleted_count += 1

        db.session.commit()
        return jsonify({
            'code': 200,
            'msg': f'删除成功,共删除 {deleted_count} 个库位',
            'data': {'deleted_count': deleted_count, 'deleted_names': deleted_names}
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({
            'code': 500,
            'msg': str(e),
            'data': None
        }), 500
@warehouse_bp.route('/batch-generate', methods=['POST'])
@jwt_required()
@audit_log(
    module='库位管理',
    action='批量生成'
)
def batch_generate_locations():
    """Generate a multi-level set of warehouse locations from numbering rules.

    Body fields read:
        parent_id: Id of the location to generate under; a falsy value
            generates top-level locations.
        rules: Non-empty list with one rule per level, outermost first.
            Each rule is an object with ``prefix`` (default ``''``) and
            integer ``start``, ``end`` and ``pad`` (each default 1). A rule
            yields the names ``prefix + str(n).zfill(pad)`` for every ``n``
            from ``start`` to ``end`` inclusive, created under every
            location produced by the previous rule.

    The number of locations created over all levels together is capped at
    ``MAX_TOTAL``.

    Returns:
        JSON with ``code`` 200 and ``data`` holding ``generated_count`` and
        ``generated_ids`` on success. JSON bodies with ``code`` 400 (missing
        or malformed rules, limit exceeded) or 404 (unknown parent) are
        returned without an explicit HTTP status. Any exception rolls the
        session back and returns ``code`` 500 with HTTP status 500.
    """
    MAX_TOTAL = 3000  # upper bound on locations created by one request

    def is_int(value):
        # bool is a subclass of int but is not a meaningful range bound.
        return isinstance(value, int) and not isinstance(value, bool)

    try:
        # Tolerate a missing/invalid/non-object body instead of crashing.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        parent_id = data.get('parent_id')
        rules = data.get('rules', [])
        if not rules:
            return jsonify({'code': 400, 'msg': '请提供生成规则', 'data': None})

        # Validate the rules and count what would be created. Every level
        # creates (locations on the previous level) * (its own range size)
        # rows, and all levels are created, so the total is the sum of the
        # running products rather than just the last product.
        invalid_rules = {'code': 400, 'msg': '生成规则格式不正确', 'data': None}
        if not isinstance(rules, list):
            return jsonify(invalid_rules)
        parsed_rules = []
        total_count = 0
        level_count = 1
        for rule in rules:
            if not isinstance(rule, dict):
                return jsonify(invalid_rules)
            start = rule.get('start', 1)
            end = rule.get('end', 1)
            pad = rule.get('pad', 1)
            if not (is_int(start) and is_int(end) and is_int(pad)):
                return jsonify(invalid_rules)
            level_count *= max(0, end - start + 1)
            total_count += level_count
            parsed_rules.append((rule.get('prefix', ''), start, end, pad))

        if total_count > MAX_TOTAL:
            return jsonify({'code': 400, 'msg': f'单次生成数量不能超过 {MAX_TOTAL} 个,当前计划生成 {total_count}', 'data': None})

        # Seed the parent list for the first level.
        if parent_id:
            parent = SysWarehouseLocation.query.get(parent_id)
            if not parent:
                return jsonify({'code': 404, 'msg': '父级库位不存在', 'data': None})
            current_parents = [parent]
        else:
            current_parents = [None]

        # Apply the rules level by level.
        generated_ids = []
        for prefix, start, end, pad in parsed_rules:
            new_locations = []
            for owner in current_parents:
                # Level and path are taken from this specific parent; they
                # must not be shared between different parents.
                if owner is None:
                    owner_id = None
                    current_level = 0
                    current_parent_path = ''
                else:
                    owner_id = owner.id
                    current_level = owner.level + 1
                    current_parent_path = owner.full_path or ''

                for num in range(start, end + 1):
                    name = f"{prefix}{str(num).zfill(pad)}"
                    full_path = f"{current_parent_path}/{name}" if current_parent_path else name
                    location = SysWarehouseLocation(
                        name=name,
                        parent_id=owner_id,
                        full_path=full_path,
                        level=current_level,
                        is_enabled=True
                    )
                    db.session.add(location)
                    new_locations.append(location)

            # Flush once per level so the new rows have ids before they
            # become the parents of the next level.
            db.session.flush()
            current_parents = new_locations
            generated_ids.extend(loc.id for loc in new_locations)

        db.session.commit()
        return jsonify({
            'code': 200,
            'msg': f'生成成功,共生成 {len(generated_ids)} 个库位',
            'data': {'generated_count': len(generated_ids), 'generated_ids': generated_ids}
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({
            'code': 500,
            'msg': str(e),
            'data': None
        }), 500