feat: initialize inventory profit and loss adjustment module

This commit is contained in:
DXC
2026-03-19 12:06:32 +08:00
parent 7a4717ce21
commit d8a57ab66e
6 changed files with 634 additions and 1 deletions

View File

@ -162,7 +162,17 @@ def create_app():
print(f"⚠️ 审计日志菜单初始化跳过: {e}")
# -----------------------------------------------------
# 2.10 注册库位管理模块 (Warehouse)
# 2.10 注册盘盈盘亏管理模块 (Stock Adjustment)
# -----------------------------------------------------
try:
from app.api.v1.stock.adjustment import adjustment_bp
app.register_blueprint(adjustment_bp, url_prefix='/api/v1/stock/adjustment')
print("✅ Stock Adjustment 模块注册成功")
except ImportError as e:
print(f"❌ 错误: Stock Adjustment 模块导入失败: {e}")
# -----------------------------------------------------
# 2.11 注册库位管理模块 (Warehouse)
# -----------------------------------------------------
try:
from app.api.v1.warehouse import warehouse_bp

View File

@ -0,0 +1,223 @@
# inventory-backend/app/api/v1/stock/adjustment.py
from flask import Blueprint, request, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
from app.utils.decorators import permission_required
from app.extensions import db
from app.models.stock.adjustment import StockAdjustment
from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
from datetime import datetime
import random
import string
adjustment_bp = Blueprint('adjustment', __name__, url_prefix='/stock/adjustment')
def generate_order_no():
"""生成单号 ADJ-YYYYMMDD-XXXX"""
today = datetime.now().strftime('%Y%m%d')
suffix = ''.join(random.choices(string.digits, k=4))
return f'ADJ-{today}-{suffix}'
def get_stock_model(source_table):
"""根据source_table获取对应的库存模型"""
if source_table == 'stock_buy':
return StockBuy
elif source_table == 'stock_semi':
return StockSemi
elif source_table == 'stock_product':
return StockProduct
return None
# --------------------------------------------------------
# 1. 获取调整单列表
# GET /api/v1/stock/adjustment/list
# --------------------------------------------------------
@adjustment_bp.route('/list', methods=['GET'])
@jwt_required()
@permission_required('stock_adjustment:list')
def get_list():
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 10))
keyword = request.args.get('keyword', '')
adjust_type = request.args.get('adjust_type', '')
status = request.args.get('status', '')
query = StockAdjustment.query
if keyword:
query = query.filter(
db.or_(
StockAdjustment.order_no.ilike(f'%{keyword}%'),
StockAdjustment.sku.ilike(f'%{keyword}%'),
StockAdjustment.material_name.ilike(f'%{keyword}%')
)
)
if adjust_type:
query = query.filter(StockAdjustment.adjust_type == adjust_type)
if status:
query = query.filter(StockAdjustment.status == status)
# 按创建时间降序
query = query.order_by(StockAdjustment.create_time.desc())
pagination = query.paginate(page=page, per_page=limit, error_out=False)
return jsonify({
'code': 200,
'data': {
'items': [item.to_dict() for item in pagination.items],
'total': pagination.total,
'page': page,
'limit': limit
}
})
# --------------------------------------------------------
# 2. 创建调整单
# POST /api/v1/stock/adjustment/create
# --------------------------------------------------------
@adjustment_bp.route('/create', methods=['POST'])
@jwt_required()
@permission_required('stock_adjustment:operation')
def create():
data = request.get_json()
if not data:
return jsonify({'code': 400, 'msg': '请求参数不能为空'}), 400
# 必填字段验证
required_fields = ['source_table', 'stock_id', 'adjust_type', 'adjust_quantity', 'reason']
for field in required_fields:
if field not in data or not data.get(field):
return jsonify({'code': 400, 'msg': f'{field} 为必填项'}), 400
source_table = data['source_table']
stock_id = int(data['stock_id'])
adjust_type = data['adjust_type'] # 'profit' or 'loss'
adjust_quantity = float(data['adjust_quantity'])
reason = data['reason']
operator = get_jwt_identity() or 'system'
# 获取库存记录
StockModel = get_stock_model(source_table)
if not StockModel:
return jsonify({'code': 400, 'msg': '无效的库存类型'}), 400
stock = StockModel.query.get(stock_id)
if not stock:
return jsonify({'code': 404, 'msg': '库存记录不存在'}), 404
# 获取物料信息
base_id = getattr(stock, 'base_id', None)
material = MaterialBase.query.get(base_id) if base_id else None
# 计算库存变动
if adjust_type == 'profit':
# 盘盈:增加库存
new_stock_qty = float(stock.stock_quantity or 0) + adjust_quantity
new_avail_qty = float(stock.available_quantity or 0) + adjust_quantity
elif adjust_type == 'loss':
# 盘亏:减少库存
new_stock_qty = float(stock.stock_quantity or 0) - adjust_quantity
new_avail_qty = float(stock.available_quantity or 0) - adjust_quantity
if new_stock_qty < 0 or new_avail_qty < 0:
return jsonify({'code': 400, 'msg': '库存不足,无法盘亏'}), 400
else:
return jsonify({'code': 400, 'msg': '无效的调整类型'}), 400
try:
# 创建调整单
adjustment = StockAdjustment(
order_no=generate_order_no(),
base_id=base_id,
stock_id=stock_id,
source_table=source_table,
sku=getattr(stock, 'sku', None) or getattr(stock, 'SKU', None),
material_name=material.name if material else getattr(stock, 'sku', '未知'),
spec_model=getattr(material, 'spec_model', None) if material else None,
warehouse_location=getattr(stock, 'warehouse_location', None),
adjust_type=adjust_type,
adjust_quantity=adjust_quantity,
reason=reason,
status='completed',
operator=operator
)
db.session.add(adjustment)
# 更新库存
stock.stock_quantity = new_stock_qty
stock.available_quantity = new_avail_qty
db.session.commit()
return jsonify({
'code': 200,
'msg': '调整成功',
'data': adjustment.to_dict()
})
except Exception as e:
db.session.rollback()
return jsonify({'code': 500, 'msg': f'调整失败: {str(e)}'}), 500
# --------------------------------------------------------
# 3. 获取库存列表(用于选择物料)
# GET /api/v1/stock/adjustment/stocks
# --------------------------------------------------------
@adjustment_bp.route('/stocks', methods=['GET'])
@jwt_required()
@permission_required('stock_adjustment:list')
def get_stocks():
"""获取可用于调整的库存列表"""
source_table = request.args.get('source_table', 'stock_buy')
keyword = request.args.get('keyword', '')
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 20))
StockModel = get_stock_model(source_table)
if not StockModel:
return jsonify({'code': 400, 'msg': '无效的库存类型'}), 400
query = StockModel.query.filter(StockModel.stock_quantity > 0)
if keyword:
query = query.filter(
db.or_(
StockModel.sku.ilike(f'%{keyword}%'),
StockModel.barcode.ilike(f'%{keyword}%')
)
)
pagination = query.paginate(page=page, per_page=limit, error_out=False)
items = []
for stock in pagination.items:
base_id = getattr(stock, 'base_id', None)
material = MaterialBase.query.get(base_id) if base_id else None
items.append({
'stock_id': stock.id,
'source_table': source_table,
'sku': getattr(stock, 'sku', None) or getattr(stock, 'SKU', None),
'material_name': material.name if material else getattr(stock, 'sku', '未知'),
'spec_model': getattr(material, 'spec_model', None) if material else None,
'stock_quantity': float(stock.stock_quantity or 0),
'available_quantity': float(stock.available_quantity or 0),
'warehouse_location': getattr(stock, 'warehouse_location', None),
})
return jsonify({
'code': 200,
'data': {
'items': items,
'total': pagination.total,
'page': page,
'limit': limit
}
})

View File

@ -0,0 +1,61 @@
# app/models/stock/adjustment.py
from app.extensions import db, beijing_time
from datetime import datetime
class StockAdjustment(db.Model):
"""
盘盈盘亏调整表
用于记录财务/主管手动发起的库存修正
"""
__tablename__ = 'stock_adjustment'
id = db.Column(db.Integer, primary_key=True)
# 单号,如 ADJ-YYYYMMDD-XXXX
order_no = db.Column(db.String(50), unique=True, nullable=False, index=True)
# 关联物料基础表
base_id = db.Column(db.Integer, db.ForeignKey('material_base.id'))
# 关联具体库存行ID
stock_id = db.Column(db.Integer)
# 库存类型 (stock_buy/stock_semi/stock_product)
source_table = db.Column(db.String(50))
# 物料冗余信息
sku = db.Column(db.String(100))
material_name = db.Column(db.String(255))
spec_model = db.Column(db.String(255))
# 库位
warehouse_location = db.Column(db.String(100))
# 调整类型:'profit' 盘盈 / 'loss' 盘亏
adjust_type = db.Column(db.String(20), nullable=False)
# 调整数量(绝对值)
adjust_quantity = db.Column(db.Numeric(19, 4), nullable=False)
# 原因说明(必填)
reason = db.Column(db.String(500), nullable=False)
# 状态:'pending' 待处理 / 'completed' 已完成 / 'cancelled' 已取消
status = db.Column(db.String(20), default='pending')
# 操作人/经办人
operator = db.Column(db.String(100))
# 创建时间
create_time = db.Column(db.DateTime, default=beijing_time)
# 更新时间
update_time = db.Column(db.DateTime, default=beijing_time, onupdate=beijing_time)
def to_dict(self):
return {
'id': self.id,
'order_no': self.order_no,
'base_id': self.base_id,
'stock_id': self.stock_id,
'source_table': self.source_table,
'sku': self.sku,
'material_name': self.material_name,
'spec_model': self.spec_model,
'warehouse_location': self.warehouse_location,
'adjust_type': self.adjust_type,
'adjust_quantity': float(self.adjust_quantity or 0),
'reason': self.reason,
'status': self.status,
'operator': self.operator,
'create_time': self.create_time.strftime('%Y-%m-%d %H:%M:%S') if self.create_time else None,
'update_time': self.update_time.strftime('%Y-%m-%d %H:%M:%S') if self.update_time else None,
}