物料-采购件入库页面功能实现

This commit is contained in:
dxc
2026-01-27 15:50:23 +08:00
parent 2f8a5c55b1
commit 3afea217b7
45 changed files with 1522 additions and 2756 deletions

View File

@ -1,4 +1,3 @@
# 文件路径: app/__init__.py
from flask import Flask
from config import Config
from app.extensions import db, migrate, cors
@ -10,14 +9,30 @@ def create_app():
# 初始化插件
db.init_app(app)
migrate.init_app(app, db)
cors.init_app(app) # 允许前端访问
# 确保跨域配置正确,允许前端访问
cors.init_app(app, resources={r"/api/*": {"origins": "*"}})
# 注册蓝图 (Blueprints)
from app.api.v1.stocks import stocks_bp
app.register_blueprint(stocks_bp, url_prefix='/api/v1/stocks')
# --- 注册蓝图 ---
# 可以在这里打印一下路由,方便调试
print("已注册路由:")
print(app.url_map)
# 1. 保持原有的 stocks 模块
try:
from app.api.v1.stocks import stocks_bp
app.register_blueprint(stocks_bp, url_prefix='/api/v1/stocks')
except ImportError as e:
print(f"⚠️ 警告: 原有 stocks 蓝图导入失败: {e}")
# 2. 注册新的入库聚合蓝图
# 核心:必须先导入,再注册。路径对应 app/api/v1/inbound/__init__.py
try:
from app.api.v1.inbound import inbound_bp
# 最终路径结构:/api/v1/inbound/buy/list
app.register_blueprint(inbound_bp, url_prefix='/api/v1/inbound')
print("✅ 入库模块蓝图注册成功")
except ImportError as e:
print(f"❌ 严重错误: 入库模块 inbound 蓝图导入失败: {e}")
# 打印路由映射,仅在本地调试时建议开启
# with app.app_context():
# print(app.url_map)
return app

View File

@ -0,0 +1,14 @@
from flask import Blueprint
from .buy import inbound_buy_bp
# 后续如果有 semi.py 或 product.py在这里导入
# from .semi import inbound_semi_bp
# 创建聚合蓝图
inbound_bp = Blueprint('inbound', __name__)
# 挂载子模块。url_prefix 会进行路径拼接
# 路径变为:/buy/...
inbound_bp.register_blueprint(inbound_buy_bp, url_prefix='/buy')
# 后续扩展:
# inbound_bp.register_blueprint(inbound_semi_bp, url_prefix='/semi')

View File

@ -0,0 +1,70 @@
from flask import Blueprint, request, jsonify
from app.services.inbound.buy_service import BuyInboundService
import traceback
# 定义蓝图
inbound_buy_bp = Blueprint('inbound_buy', __name__)
# ------------------------------------------------------------------
# 1. 获取列表 (GET)
# ------------------------------------------------------------------
@inbound_buy_bp.route('/list', methods=['GET'])
def get_list():
try:
page = request.args.get('page', 1, type=int)
limit = request.args.get('pageSize', 15, type=int)
result = BuyInboundService.get_list(page, limit)
return jsonify({
"code": 200,
"msg": "success",
"data": result
})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500
# ------------------------------------------------------------------
# 2. 新增入库 (POST)
# ------------------------------------------------------------------
@inbound_buy_bp.route('/submit', methods=['POST'])
def submit():
try:
data = request.get_json()
if not data:
return jsonify({"code": 400, "msg": "No data provided"}), 400
BuyInboundService.handle_inbound(data)
return jsonify({"code": 200, "msg": "入库成功"})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500
# ------------------------------------------------------------------
# 3. 更新入库 (PUT)
# ------------------------------------------------------------------
@inbound_buy_bp.route('/<int:id>', methods=['PUT'])
def update_buy(id):
try:
data = request.get_json()
BuyInboundService.update_inbound(id, data)
return jsonify({"code": 200, "msg": "更新成功"})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500
# ------------------------------------------------------------------
# 4. 删除入库 (DELETE)
# ------------------------------------------------------------------
@inbound_buy_bp.route('/<int:id>', methods=['DELETE'])
def delete_buy(id):
try:
BuyInboundService.delete_inbound(id)
return jsonify({"code": 200, "msg": "删除成功"})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500

View File

@ -1,45 +1,87 @@
from flask import Blueprint, request, jsonify
from app.services.stock_service import StockService
from app.schemas.stock_schema import stock_buy_schema
# 确保这两个引用路径是存在的,如果报错说明文件没建好
try:
from app.services.stock_service import StockService
from app.schemas.stock_schema import stock_buy_schema
except ImportError as e:
# 如果服务还没写好,这里会打印错误,防止整个后端起不来
print(f"❌ 导入服务出错: {e}")
StockService = None
stock_buy_schema = None
stocks_bp = Blueprint('stocks', __name__)
# ------------------------------------------------------------------
# 1. 获取入库列表
# URL: /api/v1/stocks/inbound (GET)
# ------------------------------------------------------------------
@stocks_bp.route('/inbound', methods=['GET'])
def get_inbound_list():
page = request.args.get('page', 1, type=int)
limit = request.args.get('pageSize', 10, type=int)
if not StockService:
return jsonify({'code': 500, 'msg': '后端服务未初始化'}), 500
result = StockService.get_list(page, limit)
try:
page = request.args.get('page', 1, type=int)
limit = request.args.get('pageSize', 10, type=int)
return jsonify({
'code': 200,
'msg': 'success',
'data': result
})
# 调用 Service 层获取数据
result = StockService.get_list(page, limit)
return jsonify({
'code': 200,
'msg': 'success',
'data': result
})
except Exception as e:
print(f"获取列表报错: {e}")
return jsonify({'code': 500, 'msg': '服务器内部错误'}), 500
# ------------------------------------------------------------------
# 2. 新增入库单
# URL: /api/v1/stocks/inbound (POST)
# ------------------------------------------------------------------
@stocks_bp.route('/inbound', methods=['POST'])
def create_inbound():
if not StockService:
return jsonify({'code': 500, 'msg': '后端服务未初始化'}), 500
json_data = request.get_json()
if not json_data:
return jsonify({'code': 400, 'msg': '没有接收到数据'}), 400
try:
# 1. 参数校验
# 1. 参数校验 (Marshmallow Schema)
data = stock_buy_schema.load(json_data)
# 2. 调用业务逻辑
new_stock = StockService.create_inbound(data)
# 3. 返回成功
# 注意:确保 new_stock 对象有 to_dict() 方法,否则这里会报错
return jsonify({
'code': 200,
'msg': '入库成功',
'data': new_stock.to_dict()
'data': new_stock.to_dict() if hasattr(new_stock, 'to_dict') else str(new_stock)
}), 201
except Exception as e:
# 捕获 ValueError 或 SQLAlchemyError
# 捕获校验错误或数据库错误
print(f"入库报错: {e}")
return jsonify({'code': 400, 'msg': str(e)}), 400
# ------------------------------------------------------------------
# 3. 更新入库单
# URL: /api/v1/stocks/inbound/<id> (PUT)
# ------------------------------------------------------------------
@stocks_bp.route('/inbound/<int:id>', methods=['PUT'])
def update_inbound(id):
if not StockService:
return jsonify({'code': 500, 'msg': '后端服务未初始化'}), 500
json_data = request.get_json()
try:
StockService.update_inbound(id, json_data)
@ -48,8 +90,15 @@ def update_inbound(id):
return jsonify({'code': 400, 'msg': str(e)}), 400
# ------------------------------------------------------------------
# 4. 删除入库单
# URL: /api/v1/stocks/inbound/<id> (DELETE)
# ------------------------------------------------------------------
@stocks_bp.route('/inbound/<int:id>', methods=['DELETE'])
def delete_inbound(id):
if not StockService:
return jsonify({'code': 500, 'msg': '后端服务未初始化'}), 500
try:
StockService.delete_inbound(id)
return jsonify({'code': 200, 'msg': '删除成功'})

View File

@ -1,34 +1,32 @@
#material.py
from app.extensions import db
from datetime import datetime
class MaterialBase(db.Model):
__tablename__ = 'material_base'
id = db.Column(db.Integer, primary_key=True)
# 核心字段
sku_code = db.Column(db.String(100), unique=True, nullable=False) # 唯一编码
name = db.Column(db.String(255), nullable=False) # 名称
spec_model = db.Column(db.String(255)) # 规格型号
unit = db.Column(db.String(50)) # 单位
category = db.Column(db.String(100)) # 分类
name = db.Column(db.String(255), nullable=False) # 名称
category = db.Column(db.String(100)) # 类别
material_type = db.Column(db.String(100)) # 类型
spec_model = db.Column(db.String(255)) # 规格型号
unit = db.Column(db.String(50)) # 计量单位
visibility_level = db.Column(db.Integer, default=0) # 信息可见等级
manual_link = db.Column(db.Text) # 通用说明书
product_image = db.Column(db.Text) # 通用产品图
is_enabled = db.Column(db.Boolean, default=True) # 是否启用
# 审计字段 (自动记录时间)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 关联关系 (让 StockBuy 可以反向找到 Material)
# 这里的 dynamic 允许在 material.stock_buys 时进行进一步过滤
# 【核心关联】
# 这里定义了反向关系lazy='dynamic' 允许我们后续做 count() 查询
# cascade='all, delete-orphan' 并不是在这里用的,因为我们是手动控制逻辑
stock_buys = db.relationship('StockBuy', back_populates='material', lazy='dynamic')
def to_dict(self):
"""将对象转换为字典,方便接口返回"""
return {
'id': self.id,
'sku_code': self.sku_code,
'name': self.name,
'spec_model': self.spec_model,
'unit': self.unit,
'category': self.category,
'created_at': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else None
'material_type': self.material_type,
'spec_model': self.spec_model,
'unit': self.unit
}

View File

@ -1,3 +1,4 @@
#stock.py
from app.extensions import db
from datetime import datetime
@ -7,50 +8,62 @@ class StockBuy(db.Model):
id = db.Column(db.Integer, primary_key=True)
# 外键:必须关联一个 MaterialBase 的 ID
material_id = db.Column(db.Integer, db.ForeignKey('material_base.id'), nullable=False)
# 【核心关联】
# 这里明确指定了 base_id 是外键,关联 material_base 表的 id
base_id = db.Column(db.Integer, db.ForeignKey('material_base.id'), nullable=False)
# 业务数据
inbound_date = db.Column(db.DateTime, default=datetime.utcnow) # 入库时间
batch_no = db.Column(db.String(100)) # 批次号
warehouse_loc = db.Column(db.String(100)) # 库位
supplier_name = db.Column(db.String(255)) # 供应商
sku = db.Column(db.String(100))
in_date = db.Column(db.Date)
serial_number = db.Column(db.String(100))
batch_number = db.Column(db.String(100))
# 数量与状态
qty_inbound = db.Column(db.Numeric(19, 4), default=0) # 初始入库量
qty_current = db.Column(db.Numeric(19, 4), default=0) # 当前剩余量
qty_available = db.Column(db.Numeric(19, 4), default=0) # 当前可用量
status = db.Column(db.String(50), default='NORMAL')
# 数量
in_quantity = db.Column(db.Numeric(19, 4), default=0)
stock_quantity = db.Column(db.Numeric(19, 4), default=0)
available_quantity = db.Column(db.Numeric(19, 4), default=0)
# 财务数据
price_unit = db.Column(db.Numeric(19, 4), default=0) # 单价
price_total = db.Column(db.Numeric(19, 4), default=0) # 总价
# 状态与位置
status = db.Column(db.String(50))
inspection_status = db.Column(db.String(50))
warehouse_location = db.Column(db.String(100))
# 建立与 MaterialBase 的双向关系
# 财务与商务
unit_price = db.Column(db.Numeric(19, 4), default=0)
total_price = db.Column(db.Numeric(19, 4), default=0)
currency = db.Column(db.String(20), default='CNY')
exchange_rate = db.Column(db.Numeric(15, 6), default=1.0)
supplier_name = db.Column(db.String(255))
buyer_name = db.Column(db.String(100))
buyer_email = db.Column(db.String(100))
original_link = db.Column(db.Text)
detail_link = db.Column(db.Text)
arrival_photo = db.Column(db.Text)
# 【核心关联】
# 建立对象级别的连接,方便通过 stock.material 访问基础信息
material = db.relationship('MaterialBase', back_populates='stock_buys')
def to_dict(self):
"""
序列化方法:
这里做了一个扁平化处理,把关联的 material 里的 name/sku 直接拿出来,
方便前端表格直接显示,不用前端再去拼凑。
"""
"""序列化"""
return {
'id': self.id,
'material_id': self.material_id,
# 从关联对象获取基础信息
'sku_code': self.material.sku_code if self.material else None,
'base_id': self.base_id, # 前端需要这个ID来判断关联
'material_name': self.material.name if self.material else None,
'spec_model': self.material.spec_model if self.material else None,
'unit': self.material.unit if self.material else None,
'category': self.material.category if self.material else None,
'unit': self.material.unit if self.material else None,
'material_type': self.material.material_type if self.material else None,
# 本表信息
'inbound_date': self.inbound_date.strftime('%Y-%m-%d %H:%M:%S') if self.inbound_date else None,
'batch_no': self.batch_no,
'warehouse_loc': self.warehouse_loc,
'supplier_name': self.supplier_name,
'qty_inbound': float(self.qty_inbound) if self.qty_inbound else 0,
'price_unit': float(self.price_unit) if self.price_unit else 0,
'price_total': float(self.price_total) if self.price_total else 0,
'sku': self.sku,
'inbound_date': self.in_date.strftime('%Y-%m-%d') if self.in_date else None,
'serial_number': self.serial_number,
'batch_number': self.batch_number,
'qty_inbound': float(self.in_quantity) if self.in_quantity else 0,
'qty_stock': float(self.stock_quantity) if self.stock_quantity else 0,
'qty_available': float(self.available_quantity) if self.available_quantity else 0,
'warehouse_loc': self.warehouse_location,
'status': self.status,
'price_unit': float(self.unit_price) if self.unit_price else 0,
'price_total': float(self.total_price) if self.total_price else 0,
'supplier_name': self.supplier_name
}

View File

@ -0,0 +1,176 @@
from app.extensions import db
from app.models.material import MaterialBase
from app.models.stock import StockBuy
from datetime import datetime
import traceback
class BuyInboundService:
@staticmethod
def handle_inbound(data):
"""新增入库:自动关联/创建基础信息 + 创建库存记录"""
try:
# 0. 基础校验
if not data.get('spec_model') or not data.get('material_name'):
raise ValueError("缺少必要的物料名称或规格型号")
# 1. 关联逻辑:通过规格型号(spec_model)查找基础库
material = MaterialBase.query.filter_by(spec_model=data['spec_model']).first()
# 如果不存在,则新建 MaterialBase
if not material:
material = MaterialBase(
name=data['material_name'],
spec_model=data['spec_model'],
category=data.get('category'),
material_type='采购件',
unit=data.get('unit'),
visibility_level=data.get('visibility_level', 0),
manual_link=data.get('manual_link'),
product_image=data.get('product_image'),
is_enabled=True
)
db.session.add(material)
db.session.flush() # 立即执行,拿到 material.id
# 2. 处理日期 (兼容性处理)
in_date_val = None
if data.get('in_date'):
try:
in_date_val = datetime.strptime(data['in_date'], '%Y-%m-%d %H:%M:%S').date()
except ValueError:
try:
in_date_val = datetime.strptime(data['in_date'], '%Y-%m-%d').date()
except ValueError:
in_date_val = datetime.utcnow().date()
# 3. 创建 StockBuy
new_stock = StockBuy(
base_id=material.id,
sku=data.get('sku'),
in_date=in_date_val,
serial_number=data.get('serial_number'),
batch_number=data.get('batch_number'),
status='在库',
inspection_status=data.get('inspection_status'),
in_quantity=data.get('in_quantity', 0),
stock_quantity=data.get('in_quantity', 0),
available_quantity=data.get('in_quantity', 0),
warehouse_location=data.get('warehouse_location'),
unit_price=data.get('unit_price', 0),
total_price=data.get('total_price', 0),
currency=data.get('currency', 'CNY'),
exchange_rate=data.get('exchange_rate', 1.0),
supplier_name=data.get('supplier_name'),
buyer_name=data.get('buyer_name'),
buyer_email=data.get('buyer_email'),
original_link=data.get('original_link'),
detail_link=data.get('detail_link'),
arrival_photo=data.get('arrival_photo')
)
db.session.add(new_stock)
db.session.commit()
return new_stock
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def update_inbound(stock_id, data):
"""更新入库:支持级联更新基础信息 + 自动重算总价"""
try:
stock = StockBuy.query.get(stock_id)
if not stock:
raise ValueError("记录不存在")
# 1. 更新普通字段
if 'serial_number' in data: stock.serial_number = data['serial_number']
if 'batch_number' in data: stock.batch_number = data['batch_number']
if 'warehouse_location' in data: stock.warehouse_location = data['warehouse_location']
if 'supplier_name' in data: stock.supplier_name = data['supplier_name']
if 'status' in data: stock.status = data['status']
if 'inspection_status' in data: stock.inspection_status = data['inspection_status']
if 'arrival_photo' in data: stock.arrival_photo = data['arrival_photo']
if 'remark' in data: stock.remark = data['remark']
# 2. 级联更新基础信息 (MaterialBase)
if stock.material:
if 'material_name' in data: stock.material.name = data['material_name']
if 'category' in data: stock.material.category = data['category']
if 'unit' in data: stock.material.unit = data['unit']
# 3. 核心逻辑:数量与价格联动
qty_changed = False
price_changed = False
# (A) 数量变更 -> 更新库存和可用量
if 'in_quantity' in data:
new_qty = float(data['in_quantity'])
old_qty = float(stock.in_quantity)
diff = new_qty - old_qty
if diff != 0:
stock.in_quantity = new_qty
stock.stock_quantity = float(stock.stock_quantity) + diff
stock.available_quantity = float(stock.available_quantity) + diff
qty_changed = True
# (B) 单价变更
if 'unit_price' in data:
new_price = float(data['unit_price'])
if new_price != float(stock.unit_price):
stock.unit_price = new_price
price_changed = True
# (C) 重算总价
if qty_changed or price_changed:
stock.total_price = float(stock.in_quantity) * float(stock.unit_price)
db.session.commit()
return stock
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def delete_inbound(stock_id):
"""删除逻辑孤儿策略如果MaterialBase无其他引用则一并删除"""
try:
stock = StockBuy.query.get(stock_id)
if not stock:
raise ValueError("记录不存在")
# 1. 记下 base_id
material_id = stock.base_id
# 2. 删除库存记录
db.session.delete(stock)
db.session.flush()
# 3. 检查是否还有残留
remaining_count = StockBuy.query.filter_by(base_id=material_id).count()
if remaining_count == 0:
print(f"触发级联删除: MaterialBase ID {material_id} 已无关联,执行清理。")
material = MaterialBase.query.get(material_id)
if material:
db.session.delete(material)
db.session.commit()
return True
except Exception as e:
db.session.rollback()
print(f"删除失败: {e}")
raise e
@staticmethod
def get_list(page, limit):
try:
pagination = StockBuy.query.order_by(StockBuy.id.desc()).paginate(page=page, per_page=limit)
items = [item.to_dict() for item in pagination.items]
return {"total": pagination.total, "items": items}
except Exception as e:
print(f"查询列表失败: {e}")
return {"total": 0, "items": []}