fix: correct targeted search logic for material/stock list to prevent unrelated results

This commit is contained in:
DXC
2026-03-19 09:49:21 +08:00
parent de887136a3
commit ebb7969807
23 changed files with 3723 additions and 0 deletions

View File

@ -0,0 +1,287 @@
# inventory-backend/app/api/v1/scrap.py
from flask import Blueprint, request, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
from app.utils.decorators import permission_required, audit_log
from app.services.auth_service import AuthService
from app.extensions import db
from app.models.transaction import TransScrap
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
import traceback
import math
scrap_bp = Blueprint('scrap', __name__, url_prefix='/scrap')
# ==============================================================================
# 辅助函数:获取当前用户的完整权限列表(基于角色查询)
# ==============================================================================
def get_current_user_permissions():
from flask_jwt_extended import get_jwt
from app.services.auth_service import AuthService
claims = get_jwt()
user_role = claims.get('role')
if not user_role:
return []
if user_role.upper() == 'SUPER_ADMIN':
return ['scrap_list:*']
perm_dict = AuthService.get_user_permissions(user_role)
perms = perm_dict.get('menus', []) + perm_dict.get('elements', [])
return perms
# --------------------------------------------------------
# 1. 扫码查询库存接口 (关联三个库存表)
# GET /api/v1/scrap/scan?barcode=...
# --------------------------------------------------------
@scrap_bp.route('/scan', methods=['GET'])
@jwt_required()
@permission_required('scrap_selection')
def scan_barcode():
barcode = request.args.get('barcode')
if not barcode:
return jsonify({'code': 400, 'msg': '请提供条码'}), 400
try:
result = ScrapService.get_stock_by_barcode(barcode)
if result:
return jsonify({'code': 200, 'msg': '扫描成功', 'data': result})
else:
return jsonify({'code': 404, 'msg': '未找到对应的库存记录'}), 404
except Exception as e:
traceback.print_exc()
return jsonify({'code': 500, 'msg': f'扫描查询出错: {str(e)}'}), 500
# --------------------------------------------------------
# 2. 提交报废单接口
# POST /api/v1/scrap
# --------------------------------------------------------
@scrap_bp.route('', methods=['POST'])
@jwt_required()
@audit_log(
module='报废管理',
action='报废出库',
get_target_name_fn=lambda: request.get_json().get('items')[0].get('sku') if request.get_json() and request.get_json().get('items') else None
)
def create_scrap():
claims = get_jwt()
user_role = claims.get('role')
if not user_role:
return jsonify({'code': 403, 'msg': '未授权'}), 403
# 超级管理员直接放行
if user_role.upper() != 'SUPER_ADMIN':
perm_dict = AuthService.get_user_permissions(user_role)
perms = perm_dict.get('menus', []) + perm_dict.get('elements', [])
if 'scrap_create:operation' not in perms:
return jsonify({'code': 403, 'msg': '权限不足'}), 403
data = request.get_json()
if not data:
return jsonify({'code': 400, 'msg': '无有效数据'}), 400
current_user_name = get_jwt_identity() or 'Unknown'
# items 必填
if 'items' not in data or not data['items']:
return jsonify({'code': 400, 'msg': '报废商品列表不能为空'}), 400
try:
result = ScrapService.process_scrap(data, operator_name=current_user_name)
return jsonify({'code': 200, 'msg': '报废成功', 'data': result})
except Exception as e:
traceback.print_exc()
db.session.rollback()
return jsonify({'code': 400, 'msg': str(e)}), 400
# --------------------------------------------------------
# 3. 报废记录查询接口
# GET /api/v1/scrap/records
# --------------------------------------------------------
@scrap_bp.route('/records', methods=['GET'])
@jwt_required()
@permission_required('scrap_list')
def get_scrap_records():
page = request.args.get('page', 1, type=int)
page_size = request.args.get('pageSize', 50, type=int)
sku = request.args.get('sku', '')
start_date = request.args.get('start_date', '')
end_date = request.args.get('end_date', '')
try:
result = ScrapService.query_records(
page=page,
page_size=page_size,
sku=sku,
start_date=start_date,
end_date=end_date
)
return jsonify({'code': 200, 'msg': 'success', 'data': result})
except Exception as e:
traceback.print_exc()
return jsonify({'code': 500, 'msg': str(e)}), 500
# ============================================================
# Service 层:报废核心逻辑
# ============================================================
class ScrapService:
@staticmethod
def get_stock_by_barcode(barcode):
"""根据条码查找库存"""
if not barcode:
return None
clean_code = barcode.strip()
def get_price(item, table_type):
if table_type == 'stock_product':
return float(item.sale_price) if item.sale_price else 0
elif table_type == 'stock_buy':
return float(item.pre_tax_unit_price) if item.pre_tax_unit_price else 0
return 0
# 1. 查询成品
prod = StockProduct.query.filter(
db.or_(StockProduct.barcode == clean_code, StockProduct.sku == clean_code)
).first()
if prod:
res = ScrapService._format_stock(prod, 'stock_product')
res['price'] = get_price(prod, 'stock_product')
return res
# 2. 查询半成品
semi = StockSemi.query.filter(
db.or_(StockSemi.barcode == clean_code, StockSemi.sku == clean_code)
).first()
if semi:
res = ScrapService._format_stock(semi, 'stock_semi')
res['price'] = 0
return res
# 3. 查询原材料
buy = StockBuy.query.filter(
db.or_(StockBuy.barcode == clean_code, StockBuy.sku == clean_code)
).first()
if buy:
res = ScrapService._format_stock(buy, 'stock_buy')
res['price'] = get_price(buy, 'stock_buy')
return res
return None
@staticmethod
def _format_stock(item, table_type):
"""格式化库存查询结果"""
stock_qty = float(item.stock_quantity) if item.stock_quantity else 0
avail_qty = float(item.available_quantity) if item.available_quantity else 0
return {
'id': item.id,
'sku': item.sku,
'barcode': item.barcode,
'name': item.material_base.name if item.material_base else '',
'spec': item.material_base.spec_model if item.material_base else '',
'category': item.material_base.category if item.material_base else '',
'material_type': item.material_base.material_type if item.material_base else '',
'warehouse_loc': item.warehouse_loc or '',
'stock_quantity': stock_qty,
'available_quantity': avail_qty,
'source_table': table_type,
}
@staticmethod
def process_scrap(data, operator_name='System'):
"""处理报废:扣减库存并记录报废单"""
items = data.get('items', [])
reason = data.get('reason', '')
if not reason:
raise ValueError('请填写报废原因')
created_records = []
for item in items:
stock_id = item.get('id')
source_table = item.get('source_table')
scrap_qty = float(item.get('quantity', 0))
if not stock_id or not source_table or scrap_qty <= 0:
continue
# 获取库存记录
stock_record = None
if source_table == 'stock_product':
stock_record = StockProduct.query.get(stock_id)
elif source_table == 'stock_semi':
stock_record = StockSemi.query.get(stock_id)
elif source_table == 'stock_buy':
stock_record = StockBuy.query.get(stock_id)
if not stock_record:
raise ValueError(f'库存记录不存在: ID={stock_id}')
# 检查可用数量
avail_qty = float(stock_record.available_quantity) if stock_record.available_quantity else 0
if avail_qty < scrap_qty:
raise ValueError(f"SKU {stock_record.sku} 可用库存不足,当前可用: {avail_qty}")
# 计算损失金额
unit_price = 0.0
if source_table == 'stock_product':
unit_price = float(stock_record.sale_price) if stock_record.sale_price else 0
elif source_table == 'stock_buy':
unit_price = float(stock_record.pre_tax_unit_price) if stock_record.pre_tax_unit_price else 0
total_loss = round(unit_price * scrap_qty, 2)
# 扣减库存
stock_record.stock_quantity = float(stock_record.stock_quantity) - scrap_qty
stock_record.available_quantity = float(stock_record.available_quantity) - scrap_qty
# 创建报废记录
scrap_record = TransScrap(
sku=stock_record.sku,
source_table=source_table,
stock_id=stock_id,
quantity=scrap_qty,
reason=reason,
operator_name=operator_name,
approval_status='approved',
cost_at_scrap=unit_price,
total_loss=total_loss
)
db.session.add(scrap_record)
created_records.append(scrap_record)
db.session.commit()
return {'count': len(created_records)}
@staticmethod
def query_records(page=1, page_size=50, sku='', start_date='', end_date=''):
"""分页查询报废记录"""
query = TransScrap.query
if sku:
query = query.filter(TransScrap.sku.like(f'%{sku}%'))
if start_date:
query = query.filter(TransScrap.operation_time >= start_date)
if end_date:
query = query.filter(TransScrap.operation_time <= end_date + ' 23:59:59')
# 按时间倒序
query = query.order_by(TransScrap.operation_time.desc())
total = query.count()
records = query.offset((page - 1) * page_size).limit(page_size).all()
return {
'list': [r.to_dict() for r in records],
'total': total,
'page': page,
'pageSize': page_size
}

View File

@ -0,0 +1,10 @@
{
"label_printer": {
"ip": "172.16.0.119",
"port": 9100
},
"network_printer": {
"ip": "192.168.9.250",
"port": 9100
}
}

View File

@ -0,0 +1,50 @@
import os
def find_bad_imports(root_dir):
# 我们要查找的错误引用字符串
targets = [
"app.models.material",
"from app.models.material"
]
print(f"🚀 开始扫描目录: {os.path.abspath(root_dir)}")
print(f"🔍 正在寻找残留代码: {targets} ...\n")
found_count = 0
# 排除的目录(避免扫描虚拟环境或缓存)
exclude_dirs = {'.git', '__pycache__', 'venv', '.idea', '.vscode'}
for root, dirs, files in os.walk(root_dir):
# 修改 dirs 列表以跳过排除的目录
dirs[:] = [d for d in dirs if d not in exclude_dirs]
for file in files:
if file.endswith(".py"):
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
for i, line in enumerate(lines, 1):
for target in targets:
if target in line:
print(f"❌ 发现错误文件: {file_path}")
print(f" ➡️ 第 {i} 行: {line.strip()}")
found_count += 1
except Exception as e:
# 忽略无法读取的文件(如二进制文件)
pass
print("-" * 50)
if found_count == 0:
print("✅ 恭喜!未发现任何 'app.models.material' 的残留引用。")
print("💡 如果依然报错,请尝试完全重启 Docker 或清理 __pycache__。")
else:
print(f"⚠️ 共发现 {found_count} 处错误引用,请照上方路径逐一修改。")
print("👉 修改方法: 将 'app.models.material' 改为 'app.models.base'")
if __name__ == "__main__":
# 扫描当前目录
find_bad_imports(".")

View File

@ -0,0 +1,9 @@
from app import create_app, db
# 1. 创建应用实例
app = create_app()
# 2. 在应用上下文中创建表
with app.app_context():
db.create_all()
print("✅ 数据库表结构已成功创建!")

View File

@ -0,0 +1,139 @@
import socket
import os
from io import BytesIO
from PIL import Image, ImageDraw
# 引入条形码生成库
try:
import barcode
from barcode.writer import ImageWriter
except ImportError:
print("❌ 错误: 请先安装 python-barcode (pip install python-barcode)")
exit()
class BarcodeStackTester:
# 打印机配置
PRINTER_IP = "192.168.9.205"
PRINTER_PORT = 9100
# 纸张配置: 40mm x 30mm @ 300 DPI
DOTS_PER_MM = 12
CANVAS_W = int(40 * DOTS_PER_MM) # 480px
CANVAS_H = int(30 * DOTS_PER_MM) # 360px
@staticmethod
def _generate_raw_barcode(content, bar_height_mm=4):
"""
生成原始条码图片 (不强制缩放宽度,让其自然生长)
"""
try:
# Code128 自动优化 (数字会被压缩所以20位数字其实不长)
writer = ImageWriter()
# module_width: 条码黑条的最小宽度(mm)。
# 0.2mm 在 300DPI 下约等于 2.4像素。这是一个比较清晰且节省空间的数值。
# 如果设得太大(如0.3)20位可能会超出40mm纸张。
options = {
"module_width": 0.2,
"module_height": bar_height_mm, # 条码高度
"quiet_zone": 2.0, # 两侧留白(mm)
"write_text": False, # 【关键】不写文字
"dpi": 300 # 对应打印机DPI
}
code_img = barcode.get('code128', content, writer=writer)
# 渲染到内存
buffer = BytesIO()
code_img.write(buffer, options=options)
buffer.seek(0)
return Image.open(buffer)
except Exception as e:
print(f"生成失败: {e}")
return None
def run_test(self):
# 1. 创建白色底图 (40x30mm)
canvas = Image.new('RGB', (self.CANVAS_W, self.CANVAS_H), color='white')
# 测试用例: 长度从小到大
test_lengths = [13, 15, 17, 20]
# 布局参数
current_y = 20 # 顶部起始留白 (px)
gap = 10 # 间距 (px)
print("-" * 50)
print("🚀 开始生成堆叠条码测试图...")
for length in test_lengths:
# 生成全7的内容
content = "7" * length
print(f" 正在处理: {length}位 -> {content}")
# 生成条码 (高度设为5mm左右方便在一张纸上放下4个)
bar_img = self._generate_raw_barcode(content, bar_height_mm=5)
if bar_img:
# 计算居中位置
# bar_img.width 是条码生成的自然宽度
x_pos = (self.CANVAS_W - bar_img.width) // 2
# 如果宽度超出了纸张 (x_pos < 0)说明40mm纸打不下这么多位
if x_pos < 0:
print(f" ⚠️ 警告: {length}位条码过宽 ({bar_img.width}px > {self.CANVAS_W}px),可能会被裁切!")
x_pos = 0
# 粘贴到画布
canvas.paste(bar_img, (x_pos, current_y))
# 更新Y坐标准备画下一个
current_y += bar_img.height + gap
else:
print(" 生成失败")
# 2. 保存预览图
preview_file = "test_stack_preview.jpg"
canvas.save(preview_file)
print(f"✅ 图片已生成: {preview_file} (请打开查看宽度是否超出)")
# 3. 发送打印 (二值化处理)
self._send_to_printer(canvas)
def _send_to_printer(self, img_rgb):
try:
# 转二值化 (白=1, 黑=0)
img_bw = img_rgb.convert('L').convert('1', dither=Image.Dither.NONE)
bitmap_data = img_bw.tobytes()
width_bytes = (img_bw.width + 7) // 8
height_dots = img_bw.height
# TSPL 指令
header = (
f"SIZE 40 mm, 30 mm\r\n"
"GAP 2 mm, 0 mm\r\n"
"CLS\r\n"
"DIRECTION 1\r\n"
).encode('gbk')
bitmap_cmd = f"BITMAP 0,0,{width_bytes},{height_dots},0,".encode('gbk')
footer = b"\r\nPRINT 1,1\r\n"
print("🖨️ 正在发送打印指令...")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(5)
s.connect((self.PRINTER_IP, self.PRINTER_PORT))
s.sendall(header + bitmap_cmd + bitmap_data + footer)
s.close()
print("✅ 指令已发送")
except Exception as e:
print(f"❌ 打印失败: {e}")
if __name__ == "__main__":
tester = BarcodeStackTester()
tester.run_test()

View File

@ -0,0 +1,56 @@
import socket
# ================= 配置区域 =================
PRINTER_IP = '192.168.9.89'
PRINTER_PORT = 9100
def print_barcode_no_text():
try:
# 1. 建立连接
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.settimeout(5)
client.connect((PRINTER_IP, PRINTER_PORT))
print(f"✅ 已连接 {PRINTER_IP}")
# 2. 准备 TSPL 指令
commands = (
"SIZE 40 mm, 30 mm\r\n"
"GAP 2 mm, 0 mm\r\n"
"CLS\r\n"
"DIRECTION 1\r\n"
# 保持之前的居中设置 (向右偏移 3mm)
"REFERENCE 25, 0\r\n"
# --- 条形码 (不显示数字) ---
# 关键修改第5个参数从 1 改为了 0
# BARCODE x,y,"128",height,human_readable(0=不显),rot,narrow,wide,"content"
"BARCODE 20,10,\"128\",80,0,0,2,2,\"00000002\"\r\n"
# --- 文本内容 (保持字号 3) ---
# 第一行 Y=110
"TEXT 0,110,\"3\",0,1,1,\"N:test01\"\r\n"
"TEXT 150,110,\"3\",0,1,1,\"S:test01\"\r\n"
# 第二行 Y=140
"TEXT 0,140,\"3\",0,1,1,\"T:test01\"\r\n"
"TEXT 150,140,\"3\",0,1,1,\"K:test01\"\r\n"
# 第三行 Y=170
"TEXT 0,170,\"3\",0,1,1,\"L:test01\"\r\n"
"TEXT 150,170,\"3\",0,1,1,\"B:test01\"\r\n"
"PRINT 1,1\r\n"
)
client.sendall(commands.encode('gbk'))
client.close()
print("✅ 条码数字已隐藏,指令发送成功")
except Exception as e:
print(f"❌ 失败: {e}")
if __name__ == "__main__":
print_barcode_no_text()