feat(stocktake): implement strict blind stocktake logic with hidden system qty, editable count and status filters

This commit is contained in:
DXC
2026-03-26 17:42:51 +08:00
parent db0444cc13
commit 2a27f2e0df
3 changed files with 348 additions and 28 deletions

View File

@ -1167,3 +1167,154 @@ def generate_missing_stocktake():
import traceback
traceback.print_exc()
return jsonify({'code': 500, 'msg': f'生成漏盘数据失败: {str(e)}'}), 500
# --------------------------------------------------------
# 获取应盘物资清单(盘点基数)
# GET /api/v1/inbound/stock/stocktake/all-items
# --------------------------------------------------------
@bp.route('/stocktake/all-items', methods=['GET'])
@permission_required('inventory_stocktake')
def get_all_stocktake_items():
"""
获取所有应盘物资清单(库存 > 0 的物料)
作为盘点基数,用于统计已盘/未盘数量
"""
try:
keyword = request.args.get('keyword', '', type=str)
all_items = []
# 1. 采购件
buy_query = StockBuy.query.filter(StockBuy.stock_quantity > 0)
if keyword:
buy_query = buy_query.join(MaterialBase, StockBuy.base_id == MaterialBase.id).filter(
db.or_(
StockBuy.sku.ilike(f'%{keyword}%'),
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
)
for item in buy_query.all():
all_items.append({
'id': item.id,
'sku': item.sku or '',
'barcode': item.barcode or '',
'material_name': item.base.name if item.base else '',
'spec_model': item.base.spec_model if item.base else '',
'stock_qty': float(item.stock_quantity or 0),
'available_qty': float(item.available_quantity or 0),
'source_table': 'stock_buy',
'warehouse_location': item.warehouse_location or ''
})
# 2. 半成品
if StockSemi:
semi_query = StockSemi.query.filter(StockSemi.stock_quantity > 0)
if keyword:
semi_query = semi_query.join(MaterialBase, StockSemi.base_id == MaterialBase.id).filter(
db.or_(
StockSemi.sku.ilike(f'%{keyword}%'),
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
)
for item in semi_query.all():
all_items.append({
'id': item.id,
'sku': item.sku or '',
'barcode': item.barcode or '',
'material_name': item.base.name if item.base else '',
'spec_model': item.base.spec_model if item.base else '',
'stock_qty': float(item.stock_quantity or 0),
'available_qty': float(item.available_quantity or 0),
'source_table': 'stock_semi',
'warehouse_location': item.warehouse_location or ''
})
# 3. 成品
if StockProduct:
product_query = StockProduct.query.filter(StockProduct.stock_quantity > 0)
if keyword:
product_query = product_query.join(MaterialBase, StockProduct.base_id == MaterialBase.id).filter(
db.or_(
StockProduct.sku.ilike(f'%{keyword}%'),
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
)
for item in product_query.all():
all_items.append({
'id': item.id,
'sku': item.sku or '',
'barcode': item.barcode or '',
'material_name': item.base.name if item.base else '',
'spec_model': item.base.spec_model if item.base else '',
'stock_qty': float(item.stock_quantity or 0),
'available_qty': float(item.available_quantity or 0),
'source_table': 'stock_product',
'warehouse_location': item.warehouse_location or ''
})
# 按 SKU 排序
all_items.sort(key=lambda x: (x['sku'] or '').lower())
return jsonify({
'code': 200,
'data': {
'items': all_items,
'total': len(all_items)
}
}), 200
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({'code': 500, 'msg': f'获取应盘清单失败: {str(e)}'}), 500
# --------------------------------------------------------
# 更新盘点实盘数(手动修改)
# POST /api/v1/inbound/stock/stocktake/update-quantity
# --------------------------------------------------------
@bp.route('/stocktake/update-quantity', methods=['POST'])
@permission_required('inventory_stocktake:operation')
def update_stocktake_quantity():
"""
更新盘点实盘数
用于手动修改盘点数量
"""
try:
data = request.json
stock_id = data.get('stock_id')
source_table = data.get('source_table')
quantity = float(data.get('quantity', 0))
if not stock_id or not source_table:
return jsonify({'code': 400, 'msg': '缺少必要参数'}), 400
# 查找对应的盘点记录
draft = StocktakeDraft.query.filter_by(
stock_id=stock_id,
source_table=source_table
).first()
if not draft:
return jsonify({'code': 404, 'msg': '未找到盘点记录'}), 404
# 更新数量
draft.quantity = quantity
draft.scan_time = beijing_time()
# 计算差异
draft.diff_qty = quantity - float(draft.stock_qty or 0)
db.session.commit()
return jsonify({'code': 200, 'msg': '更新成功'}), 200
except Exception as e:
db.session.rollback()
import traceback
traceback.print_exc()
return jsonify({'code': 500, 'msg': f'更新失败: {str(e)}'}), 500