借库逻辑实现

This commit is contained in:
dxc
2026-02-06 17:11:47 +08:00
parent 387c8973d6
commit 04ee938cd1
15 changed files with 1766 additions and 268 deletions

View File

@ -80,18 +80,21 @@ def create_app():
# -----------------------------------------------------
# 2.4 注册业务操作模块 (Transactions - 借还/维修/报废)
# ★★★ 关键修改:将前缀改为 /api/v1/transactions 以匹配前端请求 ★★★
# -----------------------------------------------------
try:
from app.api.v1.transactions import trans_bp
app.register_blueprint(trans_bp, url_prefix='/api/v1/trans')
app.register_blueprint(trans_bp, url_prefix='/api/trans', name='trans_legacy')
# 标准: /api/v1/transactions/borrow
app.register_blueprint(trans_bp, url_prefix='/api/v1/transactions')
# 兼容: /api/transactions/borrow
app.register_blueprint(trans_bp, url_prefix='/api/transactions', name='trans_legacy')
print("✅ Transactions 模块注册成功")
except ImportError as e:
# 允许模块不存在时不崩溃
print(f"⚠️ 提示: Transaction 模块尚未创建或导入失败: {e}")
# 允许模块不存在时不崩溃,但在开发借还功能时这里报错说明 trans_bp 定义有问题
print(f"⚠️ 提示: Transaction 模块导入失败 (请检查 app/api/v1/transactions.py): {e}")
# -----------------------------------------------------
# 2.5 ★ [新增] 注册出库模块 (Outbound)
# 2.5 注册出库模块 (Outbound)
# -----------------------------------------------------
try:
from app.api.v1.outbound import outbound_bp
@ -114,11 +117,12 @@ def create_app():
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
# ★ [新增] 出库模型 (确保迁移工具能检测到 trans_outbound 表)
# 出库模型
from app.models.outbound import TransOutbound
# 系统与业务模型
from app.models.system import SysUser, SysLog
# 确保借还模型被加载
from app.models.transaction import TransBorrow, TransRepair, TransScrap
# 首次运行时可取消注释自动建表 (但在生产环境建议使用 flask db upgrade)

View File

@ -1,12 +1,58 @@
from flask import Blueprint, jsonify
from flask import Blueprint, jsonify, request
from flask_jwt_extended import jwt_required, get_jwt_identity
from app.services.trans_service import TransService
import traceback
# 定义蓝图
# 注意:这个变量名 trans_bp 必须与 app/__init__.py 中注册时引用的名字一致
trans_bp = Blueprint('transactions', __name__)
trans_bp = Blueprint('transactions', __name__, url_prefix='/transactions')
@trans_bp.route('/test', methods=['GET'])
def test_transaction():
"""
测试接口:用于验证 Transaction 模块是否加载成功
"""
return jsonify({"message": "Transaction module is working", "status": "success"})
# --- 借库接口 ---
@trans_bp.route('/borrow', methods=['POST'])
@jwt_required()
def create_borrow():
data = request.get_json()
try:
no = TransService.create_borrow(data)
return jsonify({'code': 200, 'msg': '借用成功', 'data': {'borrow_no': no}})
except Exception as e:
return jsonify({'code': 400, 'msg': str(e)}), 400
# --- 还库辅助:扫码查找借出记录 ---
@trans_bp.route('/return/scan', methods=['GET'])
@jwt_required()
def scan_borrowed_item():
barcode = request.args.get('barcode')
if not barcode:
return jsonify({'code': 400, 'msg': '无条码'}), 400
res = TransService.scan_for_return(barcode)
if res:
return jsonify({'code': 200, 'data': res})
else:
return jsonify({'code': 404, 'msg': '未找到该物品的未还记录'}), 404
# --- 还库提交 ---
@trans_bp.route('/return', methods=['POST'])
@jwt_required()
def submit_return():
data = request.get_json()
user = get_jwt_identity() # 库管
try:
TransService.process_return(data, operator_name=user)
return jsonify({'code': 200, 'msg': '还库成功'})
except Exception as e:
return jsonify({'code': 400, 'msg': str(e)}), 400
# --- 记录列表 ---
@trans_bp.route('/records', methods=['GET'])
@jwt_required()
def get_records():
status = request.args.get('status', 'all')
page = int(request.args.get('page', 1))
keyword = request.args.get('keyword', '')
res = TransService.get_records(page=page, limit=10, status=status, keyword=keyword)
return jsonify({'code': 200, 'data': res})

View File

@ -2,95 +2,52 @@ from app.extensions import db
from datetime import datetime
# 1. 借用表
class TransBorrow(db.Model):
__tablename__ = 'trans_borrow'
id = db.Column(db.Integer, primary_key=True)
sku = db.Column(db.String(100), index=True) # 加索引优化查询
borrow_no = db.Column(db.String(100))
sku = db.Column(db.String(100))
source_table = db.Column(db.String(50))
stock_id = db.Column(db.Integer)
barcode = db.Column(db.String(100))
quantity = db.Column(db.Numeric(19, 4))
# 借出信息
borrower_name = db.Column(db.String(100))
borrow_time = db.Column(db.DateTime, default=datetime.now)
borrow_signature = db.Column(db.Text) # 借用人签字
expected_return_time = db.Column(db.DateTime)
borrower_name = db.Column(db.String(100))
actual_return_time = db.Column(db.DateTime)
approver_name = db.Column(db.String(100))
# 归还信息
is_returned = db.Column(db.Boolean, default=False)
return_time = db.Column(db.DateTime)
return_operator = db.Column(db.String(100)) # 库管
return_signature = db.Column(db.Text) # 库管签字
return_location = db.Column(db.String(100)) # 归还库位
# 状态borrowed(借出), returned(已还), overdue(逾期)
status = db.Column(db.String(20), default='borrowed')
remark = db.Column(db.Text)
def to_dict(self):
return {
'id': self.id,
'borrow_no': self.borrow_no,
'sku': self.sku,
'barcode': self.barcode,
'quantity': float(self.quantity) if self.quantity else 0,
'borrower_name': self.borrower_name,
'borrow_time': self.borrow_time.strftime('%Y-%m-%d %H:%M:%S') if self.borrow_time else None,
'status': self.status
}
# 2. 维修表
class TransRepair(db.Model):
__tablename__ = 'trans_repair'
id = db.Column(db.Integer, primary_key=True)
sku = db.Column(db.String(100), index=True)
source_table = db.Column(db.String(50))
stock_id = db.Column(db.Integer)
arrival_date = db.Column(db.Date)
expected_repair_time = db.Column(db.String(100))
shipping_date = db.Column(db.Date)
is_self_made = db.Column(db.Boolean, default=False)
related_product_id = db.Column(db.Integer)
related_contract_id = db.Column(db.String(100))
repair_manager = db.Column(db.String(100))
fault_description = db.Column(db.Text)
repair_result = db.Column(db.Text)
cost_price = db.Column(db.Numeric(19, 4))
sale_price = db.Column(db.Numeric(19, 4))
def to_dict(self):
return {
'id': self.id,
'sku': self.sku,
'status': 'repaired' if self.repair_result else 'pending',
'manager': self.repair_manager
}
# 3. 报废表
class TransScrap(db.Model):
__tablename__ = 'trans_scrap'
id = db.Column(db.Integer, primary_key=True)
sku = db.Column(db.String(100), index=True)
source_table = db.Column(db.String(50))
stock_id = db.Column(db.Integer)
quantity = db.Column(db.Numeric(19, 4))
reason = db.Column(db.Text)
operator_name = db.Column(db.String(100))
operation_time = db.Column(db.DateTime, default=datetime.now)
approver_name = db.Column(db.String(100))
approval_status = db.Column(db.String(20), default='pending') # pending, approved, rejected
cost_at_scrap = db.Column(db.Numeric(19, 4))
total_loss = db.Column(db.Numeric(19, 4))
def to_dict(self):
return {
'id': self.id,
'sku': self.sku,
'quantity': float(self.quantity) if self.quantity else 0,
'total_loss': float(self.total_loss) if self.total_loss else 0,
'reason': self.reason
'borrow_time': self.borrow_time.strftime('%Y-%m-%d %H:%M') if self.borrow_time else '',
'borrow_signature': self.borrow_signature,
'expected_return_time': self.expected_return_time.strftime(
'%Y-%m-%d %H:%M') if self.expected_return_time else '',
'is_returned': self.is_returned,
'return_time': self.return_time.strftime('%Y-%m-%d %H:%M') if self.return_time else '',
'return_operator': self.return_operator,
'return_signature': self.return_signature,
'return_location': self.return_location,
'status': self.status,
'remark': self.remark
}

View File

@ -1,9 +1,8 @@
from app.extensions import db
# 引用新的模型类 StockBuy
from app.models.inbound.buy import StockBuy
from app.models.base import MaterialBase
# 尝试导入出库模型,如果不存在则忽略(防止报错影响入库功能)
# 尝试导入出库模型,如果不存在则忽略
try:
from app.models.outbound import TransOutbound
except ImportError:
@ -17,28 +16,70 @@ import json
class BuyInboundService:
# ============================================================
# 0. 辅助:唯一性校验 (核心修复)
# ============================================================
@staticmethod
def _check_unique(base_id, serial_number, batch_number, exclude_id=None):
"""
校验序列号和批号的唯一性逻辑
:param base_id: 当前物料的基础ID
:param serial_number: 序列号
:param batch_number: 批号
:param exclude_id: 排除的ID (用于编辑模式)
"""
# 1. 序列号 (SN) 全局唯一校验
# 解释: 不同规格的物料通常也不应该有相同的SN防止扫码混淆
if serial_number:
query = StockBuy.query.filter(StockBuy.serial_number == serial_number)
if exclude_id:
query = query.filter(StockBuy.id != exclude_id)
exists = query.first()
if exists:
# 获取占用该SN的物料名称提示更友好
occupied_name = exists.material.name if exists.material else "未知物料"
raise ValueError(f"序列号【{serial_number}】已存在!被物料 [{occupied_name}] 占用,请核查。")
# 2. 批号 (BN) 同物料唯一校验
# 解释: 不同规格的物料可以有相同的批号(如都有 001 批次),但同一个物料不能重复建单
if batch_number and base_id:
query = StockBuy.query.filter(
StockBuy.base_id == base_id,
StockBuy.batch_number == batch_number
)
if exclude_id:
query = query.filter(StockBuy.id != exclude_id)
if query.first():
raise ValueError(f"该物料已存在批号【{batch_number}】,请勿重复录入,可直接在该批次下追加库存。")
# ============================================================
# 1. 基础物料搜索
# ============================================================
@staticmethod
def search_base_material(keyword):
"""搜索基础物料"""
try:
query = MaterialBase.query.filter(MaterialBase.is_enabled == True)
if keyword:
query = query.filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
MaterialBase.spec_model.ilike(f'%{keyword}%'),
MaterialBase.pinyin.ilike(f'%{keyword}%') # 假设有拼音搜索
)
)
query = query.order_by(MaterialBase.id.desc()).limit(20)
results = []
for item in query.all():
results.append({
'id': item.id, 'name': item.name, 'spec': item.spec_model,
'category': item.category, 'unit': item.unit,
'type': item.material_type, 'status': '启用'
'id': item.id,
'name': item.name,
'spec': item.spec_model, # 确保这里字段对应正确
'category': item.category,
'unit': item.unit,
'type': item.material_type,
'status': '启用'
})
return results
except Exception as e:
@ -46,76 +87,76 @@ class BuyInboundService:
return []
# ============================================================
# 2. 新增入库逻辑 (强制北京时间)
# 2. 新增入库逻辑
# ============================================================
@staticmethod
def handle_inbound(data):
"""新增入库"""
try:
base_id = data.get('base_id')
if not base_id: raise ValueError("必须选择基础物料")
material = MaterialBase.query.get(base_id)
if not material: raise ValueError("物料不存在")
if not base_id:
raise ValueError("必须选择基础物料")
# [核心修改] 获取当前北京时间 (UTC+8)
# 无论服务器在 UTC 还是其他时区,这里强制转换为 UTC+8 并去掉时区信息存入数据库
material = MaterialBase.query.get(base_id)
if not material:
raise ValueError("所选物料不存在")
# --- [修复点] 执行唯一性校验 ---
BuyInboundService._check_unique(
base_id=base_id,
serial_number=data.get('serial_number'),
batch_number=data.get('batch_number')
)
# 时间处理 (强制北京时间)
beijing_tz = timezone(timedelta(hours=8))
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
in_date_val = current_time
if data.get('in_date'):
try:
date_str = str(data['in_date'])
# 如果前端传了时分秒,尝试直接解析
if len(date_str) > 10:
in_date_val = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
else:
# 如果只传了日期,使用该日期 + 当前北京时间的时分秒
d_temp = datetime.strptime(date_str, '%Y-%m-%d')
in_date_val = datetime(d_temp.year, d_temp.month, d_temp.day,
current_time.hour, current_time.minute, current_time.second)
except:
# 解析失败则使用当前北京时间
in_date_val = current_time
in_qty = float(data.get('in_quantity') or 0)
u_price = float(data.get('unit_price') or 0)
# [核心逻辑] 获取全局打印流水号
# 获取全局打印ID
try:
seq_sql = text("SELECT nextval('global_print_seq')")
result = db.session.execute(seq_sql)
next_global_id = result.scalar()
except Exception:
# 如果序列不存在,回退处理(或在数据库创建序列)
print("Warning: Sequence global_print_seq not found.")
except:
next_global_id = None
# SKU 生成逻辑:如果没有 ID用临时随机数或空通常应该依赖 next_global_id
# SKU 生成
if next_global_id:
generated_sku = str(next_global_id).zfill(10)
else:
generated_sku = datetime.now().strftime('%Y%m%d%H%M%S') # 降级方案
generated_sku = datetime.now().strftime('%Y%m%d%H%M%S')
final_barcode = data.get('barcode') or generated_sku
arrival_list = data.get('arrival_photo', [])
report_list = data.get('inspection_report', [])
if not isinstance(arrival_list, list): arrival_list = []
if not isinstance(report_list, list): report_list = []
new_stock = StockBuy(
base_id=material.id,
global_print_id=next_global_id,
sku=generated_sku,
barcode=final_barcode,
in_date=in_date_val, # 存入 DateTime 对象
in_date=in_date_val,
serial_number=data.get('serial_number'),
batch_number=data.get('batch_number'),
status=data.get('status', '在库'),
in_quantity=in_qty,
stock_quantity=in_qty,
stock_quantity=in_qty, # 初始库存等于入库数
available_quantity=in_qty,
inspection_status=data.get('inspection_status', '未检'),
warehouse_location=data.get('warehouse_location'),
@ -143,13 +184,27 @@ class BuyInboundService:
# ============================================================
@staticmethod
def update_inbound(stock_id, data):
"""更新入库"""
try:
stock = StockBuy.query.get(stock_id)
if not stock: raise ValueError("记录不存在")
if not stock:
raise ValueError("记录不存在")
# --- [修复点] 编辑时也要校验唯一性 (排除自身ID) ---
# 如果修改了物料(base_id)或者修改了SN/BN都需要校验
new_base_id = data.get('base_id', stock.base_id)
new_sn = data.get('serial_number', stock.serial_number)
new_bn = data.get('batch_number', stock.batch_number)
BuyInboundService._check_unique(
base_id=new_base_id,
serial_number=new_sn,
batch_number=new_bn,
exclude_id=stock_id
)
# 更新字段
field_mapping = {
'sku': 'sku', 'barcode': 'barcode',
'sku': 'sku', 'barcode': 'barcode', 'base_id': 'base_id',
'warehouse_location': 'warehouse_location',
'serial_number': 'serial_number', 'batch_number': 'batch_number',
'status': 'status', 'inspection_status': 'inspection_status',
@ -166,9 +221,9 @@ class BuyInboundService:
if 'inspection_report' in data and isinstance(data['inspection_report'], list):
stock.inspection_report = json.dumps(data['inspection_report'])
# 库存数量变更逻辑
if 'in_quantity' in data:
new_qty = float(data['in_quantity'])
# 计算差值,同步更新库存量和可用量
diff = new_qty - float(stock.in_quantity)
if diff != 0:
stock.in_quantity = new_qty
@ -190,7 +245,6 @@ class BuyInboundService:
# ============================================================
@staticmethod
def delete_inbound(stock_id):
"""删除入库"""
try:
stock = StockBuy.query.get(stock_id)
if not stock: raise ValueError("记录不存在")
@ -202,34 +256,13 @@ class BuyInboundService:
raise e
# ============================================================
# 5. 获取出库流转历史
# ============================================================
@staticmethod
def get_outbound_history(stock_id):
"""获取出库历史"""
if not TransOutbound:
return []
try:
records = TransOutbound.query.filter_by(
source_table='stock_buy', stock_id=stock_id
).order_by(TransOutbound.outbound_time.desc()).all()
return [r.to_dict() for r in records]
except:
return []
# ============================================================
# 6. 获取列表 (修改:按时间倒序排序 + 展示只显示日期)
# 5. 获取列表
# ============================================================
@staticmethod
def get_list(page, limit, keyword=None, statuses=None):
"""
获取列表
:param statuses: 状态列表 (e.g. ['在库', '借库', '已出库'])
"""
try:
query = db.session.query(StockBuy).outerjoin(MaterialBase, StockBuy.base_id == MaterialBase.id)
# 1. 关键词搜索
if keyword:
kw = f'%{keyword}%'
query = query.filter(
@ -243,23 +276,14 @@ class BuyInboundService:
)
)
# 2. 状态筛选
if not statuses:
statuses = ['在库', '借库']
if '已出库' in statuses:
# 如果明确查已出库可以包含库存为0的
query = query.filter(StockBuy.status.in_(statuses))
else:
# 默认查在库,必须保证库存 > 0
query = query.filter(
and_(
StockBuy.status.in_(statuses),
StockBuy.stock_quantity > 0
)
)
query = query.filter(and_(StockBuy.status.in_(statuses), StockBuy.stock_quantity > 0))
# [核心修改] 按照入库时间倒序排序 (从近到远)
pagination = query.order_by(StockBuy.in_date.desc()).paginate(page=page, per_page=limit, error_out=False)
current_items = pagination.items
@ -275,7 +299,6 @@ class BuyInboundService:
qty_stock = float(item.stock_quantity or 0)
qty_avail = float(item.available_quantity or 0)
# [核心修改] 格式化展示日期,去掉时分秒
date_display = ''
if item.in_date:
try:
@ -286,6 +309,7 @@ class BuyInboundService:
d = {
'id': item.id,
'base_id': item.base_id,
# 确保这里从关联的 MaterialBase 获取规格型号
'material_name': item.material.name if item.material else '',
'spec_model': item.material.spec_model if item.material else '',
'category': item.material.category if item.material else '',
@ -293,21 +317,15 @@ class BuyInboundService:
'material_type': item.material.material_type if item.material else '',
'sku': item.sku,
'inbound_date': date_display, # 前端展示用的日期字符串
'inbound_date': date_display,
'barcode': item.barcode,
'serial_number': item.serial_number,
'batch_number': item.batch_number,
'status': item.status,
'inspection_status': item.inspection_status,
'qty_inbound': float(item.in_quantity or 0),
'qty_stock': qty_stock,
'qty_available': qty_avail,
'sum_stock': qty_stock,
'sum_available': qty_avail,
'warehouse_loc': item.warehouse_location,
'unit_price': float(item.unit_price or 0),
'total_price': float(item.total_price or 0),
@ -320,8 +338,7 @@ class BuyInboundService:
'detail_link': item.detail_link,
'arrival_photo': parse_img(item.arrival_photo),
'inspection_report': parse_img(item.inspection_report),
'global_print_id': item.global_print_id,
'global_print_id_str': f"{item.global_print_id:08d}" if item.global_print_id else ""
'global_print_id': item.global_print_id
}
items.append(d)

View File

@ -11,7 +11,30 @@ import json
class ProductInboundService:
# ============================================================
# 1. 基础物料搜索 (已修正:完全对齐 Buy/Semi 的逻辑)
# 0. 辅助:唯一性校验 (新增核心逻辑)
# ============================================================
@staticmethod
def _check_unique(serial_number, exclude_id=None):
"""
校验成品的唯一性
:param serial_number: 序列号
:param exclude_id: 排除的ID (编辑模式用)
"""
from app.models.inbound.product import StockProduct
# 成品强校验序列号 (SN) - SN应该是全局唯一的
if serial_number:
query = StockProduct.query.filter(StockProduct.serial_number == serial_number)
if exclude_id:
query = query.filter(StockProduct.id != exclude_id)
exists = query.first()
if exists:
occupied_name = exists.material.name if (hasattr(exists, 'material') and exists.material) else "未知物料"
raise ValueError(f"序列号【{serial_number}】已存在!被成品 [{occupied_name}] 占用,请核查。")
# ============================================================
# 1. 基础物料搜索
# ============================================================
@staticmethod
def search_base_material(keyword):
@ -31,16 +54,16 @@ class ProductInboundService:
# 3. 排序与限制按ID倒序取最新20条
query = query.order_by(MaterialBase.id.desc()).limit(20)
# 4. 结果封装:确保字段名与前端 Vue 的 handleSelect 方法一致
# 4. 结果封装
results = []
for item in query.all():
results.append({
'id': item.id,
'name': item.name,
'spec': item.spec_model, # 对应前端: item.spec
'category': item.category, # 对应前端: item.category
'unit': item.unit, # 对应前端: item.unit
'type': item.material_type, # 对应前端: item.type
'spec': item.spec_model,
'category': item.category,
'unit': item.unit,
'type': item.material_type,
'status': '启用'
})
return results
@ -49,7 +72,7 @@ class ProductInboundService:
return []
# ============================================================
# 2. 新增入库逻辑 (强制北京时间)
# 2. 新增入库逻辑 (强制北京时间 + 唯一性校验)
# ============================================================
@staticmethod
def handle_inbound(data):
@ -61,6 +84,11 @@ class ProductInboundService:
material = MaterialBase.query.get(base_id)
if not material: raise ValueError("物料不存在")
# --- [核心修改] 执行唯一性校验 ---
ProductInboundService._check_unique(
serial_number=data.get('serial_number')
)
# [核心修改] 强制北京时间
beijing_tz = timezone(timedelta(hours=8))
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
@ -86,11 +114,14 @@ class ProductInboundService:
time_range = f"{p_start} ~ {p_end}" if p_start or p_end else None
# 全局流水号
seq_sql = text("SELECT nextval('global_print_seq')")
result = db.session.execute(seq_sql)
next_global_id = result.scalar()
try:
seq_sql = text("SELECT nextval('global_print_seq')")
result = db.session.execute(seq_sql)
next_global_id = result.scalar()
except:
next_global_id = None
generated_sku = str(next_global_id).zfill(10)
generated_sku = str(next_global_id).zfill(10) if next_global_id else datetime.now().strftime('%Y%m%d%H%M%S')
final_barcode = data.get('barcode') or generated_sku
photo_list = data.get('product_photo', [])
@ -156,6 +187,13 @@ class ProductInboundService:
stock = StockProduct.query.get(stock_id)
if not stock: raise ValueError("记录不存在")
# --- [核心修改] 编辑时也要校验唯一性 ---
if 'serial_number' in data:
ProductInboundService._check_unique(
serial_number=data['serial_number'],
exclude_id=stock_id
)
fields = [
'barcode', 'serial_number', 'warehouse_location',
'status', 'quality_status', 'bom_code', 'bom_version',

View File

@ -11,7 +11,44 @@ import json
class SemiInboundService:
# ============================================================
# 1. 基础物料搜索 (已修复:支持空关键词返回最新数据)
# 0. 辅助:唯一性校验 (新增核心逻辑)
# ============================================================
@staticmethod
def _check_unique(base_id, serial_number, batch_number, exclude_id=None):
"""
校验半成品的唯一性
:param base_id: 基础物料ID
:param serial_number: 序列号
:param batch_number: 批号
:param exclude_id: 排除的ID
"""
from app.models.inbound.semi import StockSemi
# 1. 序列号 (SN) 校验 - 全局唯一
if serial_number:
query = StockSemi.query.filter(StockSemi.serial_number == serial_number)
if exclude_id:
query = query.filter(StockSemi.id != exclude_id)
exists = query.first()
if exists:
occupied_name = exists.material.name if (hasattr(exists, 'material') and exists.material) else "未知物料"
raise ValueError(f"序列号【{serial_number}】已存在!被半成品 [{occupied_name}] 占用,请核查。")
# 2. 批号 (BN) 校验 - 同物料下不能重复开单
if batch_number and base_id:
query = StockSemi.query.filter(
StockSemi.base_id == base_id,
StockSemi.batch_number == batch_number
)
if exclude_id:
query = query.filter(StockSemi.id != exclude_id)
if query.first():
raise ValueError(f"该物料已存在批号【{batch_number}】,请勿重复建单,建议在原批次上追加库存。")
# ============================================================
# 1. 基础物料搜索
# ============================================================
@staticmethod
def search_base_material(keyword):
@ -63,6 +100,13 @@ class SemiInboundService:
if not material:
raise ValueError(f"ID为 {base_id} 的基础物料不存在")
# --- [核心修改] 执行唯一性校验 ---
SemiInboundService._check_unique(
base_id=base_id,
serial_number=data.get('serial_number'),
batch_number=data.get('batch_number')
)
# [核心修改] 强制北京时间
beijing_tz = timezone(timedelta(hours=8))
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
@ -194,6 +238,18 @@ class SemiInboundService:
if not stock:
raise ValueError("记录不存在")
# --- [核心修改] 编辑时也要校验唯一性 ---
new_base_id = data.get('base_id', stock.base_id)
new_sn = data.get('serial_number', stock.serial_number)
new_bn = data.get('batch_number', stock.batch_number)
SemiInboundService._check_unique(
base_id=new_base_id,
serial_number=new_sn,
batch_number=new_bn,
exclude_id=stock_id
)
field_mapping = {
'sku': 'sku',
'barcode': 'barcode',

View File

@ -0,0 +1,185 @@
import uuid
from datetime import datetime
from app.extensions import db
from app.models.transaction import TransBorrow
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
from sqlalchemy import desc, func
class TransService:
@staticmethod
def generate_borrow_no():
"""
生成借用单号: BOR-yyyyMMdd-0001 (按日流水)
逻辑:统计当天已存在的不同借用单号数量,+1 作为新序号
"""
now = datetime.now()
date_str = now.strftime('%Y%m%d')
prefix = f"BOR-{date_str}-"
# 使用 count distinct 来计算当天有多少个不同的借用单 (因为一单多货会占多行)
count = db.session.query(func.count(func.distinct(TransBorrow.borrow_no))) \
.filter(TransBorrow.borrow_no.like(f"{prefix}%")).scalar()
sequence = count + 1
return f"{prefix}{sequence:04d}"
@staticmethod
def create_borrow(data, operator_name='System'):
"""
借库逻辑:减少可用库存,不减总库存
"""
items = data.get('items', [])
borrower_name = data.get('borrower_name')
signature = data.get('signature_path') # 借用人签字
if not items: raise ValueError("物品列表为空")
if not borrower_name: raise ValueError("请输入借用人")
if not signature: raise ValueError("借用人必须签字")
borrow_no = TransService.generate_borrow_no()
model_map = {'stock_buy': StockBuy, 'stock_semi': StockSemi, 'stock_product': StockProduct}
try:
for item in items:
source_table = item.get('source_table')
stock_id = item.get('id')
qty = float(item.get('out_quantity', 0))
ModelClass = model_map.get(source_table)
if not ModelClass: continue
stock = ModelClass.query.with_for_update().get(stock_id)
if not stock: raise ValueError(f"库存不存在 ID:{stock_id}")
if float(stock.available_quantity) < qty:
raise ValueError(f"SKU {stock.sku} 可用库存不足")
# 1. 冻结库存 (只减可用)
stock.available_quantity = float(stock.available_quantity) - qty
# 2. 创建借用单
record = TransBorrow(
borrow_no=borrow_no,
sku=stock.sku,
source_table=source_table,
stock_id=stock.id,
barcode=stock.barcode,
quantity=qty,
borrower_name=borrower_name,
borrow_signature=signature,
remark=data.get('remark'),
expected_return_time=data.get('expected_return_time'),
status='borrowed',
is_returned=False
)
db.session.add(record)
db.session.commit()
return borrow_no
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def scan_for_return(barcode):
"""
扫码还库:查找未归还记录,并返回当前物品的库位
"""
records = TransBorrow.query.filter_by(barcode=barcode, is_returned=False).all()
if not records:
return None
# 取第一条未还记录
record = records[0]
# 获取当前库存表中的实时库位
current_location = ""
model_map = {'stock_buy': StockBuy, 'stock_semi': StockSemi, 'stock_product': StockProduct}
ModelClass = model_map.get(record.source_table)
if ModelClass:
stock = ModelClass.query.get(record.stock_id)
if stock:
current_location = stock.warehouse_location
res_dict = record.to_dict()
res_dict['current_location'] = current_location # 用于前端对比和预填
return res_dict
@staticmethod
def process_return(data, operator_name):
"""
还库逻辑:
1. 恢复可用库存
2. 更新库位 (如果有变动)
3. 记录库管签字
"""
items = data.get('items', [])
signature = data.get('signature_path') # 库管签字
if not items: raise ValueError("还库列表为空")
if not signature: raise ValueError("库管必须签字确认")
model_map = {'stock_buy': StockBuy, 'stock_semi': StockSemi, 'stock_product': StockProduct}
try:
for item in items:
borrow_id = item.get('id')
# 前端如果没有填 return_location应该在提交前处理好或者这里做 fallback
# 这里假设前端传来的 return_location 就是最终要保存的库位
final_location = item.get('return_location')
record = TransBorrow.query.with_for_update().get(borrow_id)
if not record or record.is_returned:
continue
ModelClass = model_map.get(record.source_table)
if ModelClass:
stock = ModelClass.query.with_for_update().get(record.stock_id)
if stock:
# 1. 恢复可用库存
stock.available_quantity = float(stock.available_quantity) + float(record.quantity)
# 2. 更新库位 (如果提供了有效值)
if final_location:
stock.warehouse_location = final_location
# 3. 更新借用单状态
record.is_returned = True
record.status = 'returned'
record.return_time = datetime.now()
record.return_operator = operator_name
record.return_signature = signature
record.return_location = final_location
db.session.commit()
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def get_records(page=1, limit=10, status='all', keyword=None):
q = TransBorrow.query
if status == 'borrowed':
q = q.filter(TransBorrow.is_returned == False)
elif status == 'returned':
q = q.filter(TransBorrow.is_returned == True)
if keyword:
q = q.filter(TransBorrow.borrower_name.ilike(f'%{keyword}%') |
TransBorrow.sku.ilike(f'%{keyword}%') |
TransBorrow.borrow_no.ilike(f'%{keyword}%'))
q = q.order_by(desc(TransBorrow.borrow_time))
pagination = q.paginate(page=page, per_page=limit, error_out=False)
return {
'items': [r.to_dict() for r in pagination.items],
'total': pagination.total,
'page': page,
'limit': limit
}