盘库操作初设计

This commit is contained in:
dxc
2026-02-06 10:16:37 +08:00
parent c1ddb8093f
commit e027ebd4a9
15 changed files with 1227 additions and 30 deletions

View File

@ -1,9 +1,15 @@
# app/services/inbound/buy_service.py
from app.extensions import db
# 引用新的模型类 StockBuy
from app.models.inbound.buy import StockBuy
from app.models.base import MaterialBase
from app.models.outbound import TransOutbound
from datetime import datetime, timedelta, timezone # [修改] 引入 timezone
# 尝试导入出库模型,如果不存在则忽略(防止报错影响入库功能)
try:
from app.models.outbound import TransOutbound
except ImportError:
TransOutbound = None
from datetime import datetime, timedelta, timezone
from sqlalchemy import or_, func, text, and_
import traceback
import json
@ -76,11 +82,22 @@ class BuyInboundService:
in_qty = float(data.get('in_quantity') or 0)
u_price = float(data.get('unit_price') or 0)
seq_sql = text("SELECT nextval('global_print_seq')")
result = db.session.execute(seq_sql)
next_global_id = result.scalar()
# [核心逻辑] 获取全局打印流水号
try:
seq_sql = text("SELECT nextval('global_print_seq')")
result = db.session.execute(seq_sql)
next_global_id = result.scalar()
except Exception:
# 如果序列不存在,回退处理(或在数据库创建序列)
print("Warning: Sequence global_print_seq not found.")
next_global_id = None
# SKU 生成逻辑:如果没有 ID用临时随机数或空通常应该依赖 next_global_id
if next_global_id:
generated_sku = str(next_global_id).zfill(10)
else:
generated_sku = datetime.now().strftime('%Y%m%d%H%M%S') # 降级方案
generated_sku = str(next_global_id).zfill(10)
final_barcode = data.get('barcode') or generated_sku
arrival_list = data.get('arrival_photo', [])
@ -151,6 +168,7 @@ class BuyInboundService:
if 'in_quantity' in data:
new_qty = float(data['in_quantity'])
# 计算差值,同步更新库存量和可用量
diff = new_qty - float(stock.in_quantity)
if diff != 0:
stock.in_quantity = new_qty
@ -189,6 +207,8 @@ class BuyInboundService:
@staticmethod
def get_outbound_history(stock_id):
"""获取出库历史"""
if not TransOutbound:
return []
try:
records = TransOutbound.query.filter_by(
source_table='stock_buy', stock_id=stock_id
@ -228,8 +248,10 @@ class BuyInboundService:
statuses = ['在库', '借库']
if '已出库' in statuses:
# 如果明确查已出库可以包含库存为0的
query = query.filter(StockBuy.status.in_(statuses))
else:
# 默认查在库,必须保证库存 > 0
query = query.filter(
and_(
StockBuy.status.in_(statuses),

View File

@ -0,0 +1,114 @@
import socket
import datetime
class NetworkPrintService:
def __init__(self, ip='192.168.9.205', port=9100):
"""
初始化网络打印机服务
:param ip: 打印机IP默认 192.168.9.205
:param port: 端口,默认 9100
"""
self.ip = ip
self.port = port
def _send_to_printer(self, content):
"""底层发送方法"""
try:
# 建立 Socket 连接
with socket.socket(socket.socket.AF_INET, socket.socket.SOCK_STREAM) as s:
s.settimeout(5) # 设置5秒超时
s.connect((self.ip, self.port))
# 发送内容,使用 GB18030 编码以支持中文
s.sendall(content.encode('gb18030'))
# 发送切纸指令 (ESC/POS: GS V m)
# 十六进制: 1D 56 42 00
s.sendall(b'\x1d\x56\x42\x00')
return True, "打印成功"
except Exception as e:
print(f"[NetworkPrint Error] {str(e)}")
return False, f"打印失败: {str(e)}"
def print_outbound_selection(self, items):
"""
打印出库选单 (拣货单)
:param items: 选中的物品列表
"""
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
lines = []
lines.append("\n")
lines.append("********************************")
lines.append(" 出库拣货确认单 ")
lines.append("********************************")
lines.append(f"打印时间: {timestamp}")
lines.append(f"待出库总数: {len(items)}")
lines.append("--------------------------------")
lines.append(f"{'名称':<14}{'规格/批号':<10}")
lines.append("--------------------------------")
for item in items:
# 获取名称,优先取 material_name, 其次 product_name
name = item.get('material_name') or item.get('product_name') or "未知物品"
if len(name) > 14: name = name[:13] + "." # 名称过长截断
standard = item.get('standard', '')
batch = item.get('batch_no', '')
uuid = item.get('uuid', '')[-6:] # 只显示UUID后6位
lines.append(f"{name:<14} {standard}")
lines.append(f"批号: {batch} | 尾号: {uuid}")
lines.append("- - - - - - - - - - - - - - - -")
lines.append("\n")
lines.append("库管员签字: ______________")
lines.append("领料人签字: ______________")
lines.append("\n\n\n") # 走纸
content = "\n".join(lines)
return self._send_to_printer(content)
def print_stocktake_report(self, data):
"""
打印盘点统计报告
:param data: 包含 total, scanned, missing, missing_items
"""
total = data.get('total', 0)
scanned = data.get('scanned', 0)
missing = data.get('missing', 0)
missing_items = data.get('missing_items', [])
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
lines = []
lines.append("\n")
lines.append("================================")
lines.append(" 库存盘点统计报告 ")
lines.append("================================")
lines.append(f"盘点时间: {timestamp}")
lines.append(f"应盘总数: {total}")
lines.append(f"实盘(已扫): {scanned}")
lines.append(f"差异(未扫): {missing}")
lines.append("--------------------------------")
if missing == 0:
lines.append("【结果】: 账实相符,库存完美!")
else:
lines.append("【差异明细 (未扫码物品)】:")
for item in missing_items:
name = item.get('material_name') or item.get('product_name') or "未知"
batch = item.get('batch_no', '-')
# 兼容不同模型的字段
code = item.get('uuid', item.get('bar_code', 'N/A'))[-6:]
lines.append(f"[ ] {name}")
lines.append(f" 批:{batch} 码:{code}")
lines.append("\n")
lines.append("监盘人: ______________")
lines.append("\n\n\n")
content = "\n".join(lines)
return self._send_to_printer(content)