python-flask和Vue两种模式初模板

This commit is contained in:
dxc
2026-01-26 17:00:12 +08:00
parent ee9f4aed3e
commit 2f8a5c55b1
36 changed files with 943 additions and 126 deletions

View File

@ -1,7 +1,7 @@
# 文件路径: app/__init__.py
from flask import Flask
from config import Config
from app.extensions import db, ma
from app.extensions import db, migrate, cors
def create_app():
app = Flask(__name__)
@ -9,18 +9,15 @@ def create_app():
# 初始化插件
db.init_app(app)
ma.init_app(app)
migrate.init_app(app, db)
cors.init_app(app) # 允许前端访问
# 【新增关键步骤】: 显式导入 models让 SQLAlchemy 认识所有的表
# 必须放在 db.init_app 之后create_all 或 蓝图注册 之前
from app import models
# 注册蓝图 (Blueprints)
from app.api.v1.stocks import stocks_bp
app.register_blueprint(stocks_bp, url_prefix='/api/v1/stocks')
# 注册路由蓝图
from app.api.v1.stocks import stock_bp
app.register_blueprint(stock_bp, url_prefix='/api/v1')
# 【可选】如果你没有用 Flask-Migrate可以用下面这句话自动建表开发阶段
# with app.app_context():
# db.create_all()
# 可以在这里打印一下路由,方便调试
print("已注册路由:")
print(app.url_map)
return app

View File

@ -1,34 +1,57 @@
from flask import Blueprint, request, jsonify
from app.services.stock_service import create_inbound_stock
from app.schemas.stock_schema import StockBuySchema
from app.services.stock_service import StockService
from app.schemas.stock_schema import stock_buy_schema
stock_bp = Blueprint('stocks', __name__)
stocks_bp = Blueprint('stocks', __name__)
@stock_bp.route('/buy-inbound', methods=['POST'])
def buy_inbound():
"""
采购入库接口
POST /api/v1/buy-inbound
Body: { "material_id": 1, "qty_inbound": 100, "price_unit": 10.5 ... }
"""
# 1. 接收 JSON 数据
@stocks_bp.route('/inbound', methods=['GET'])
def get_inbound_list():
page = request.args.get('page', 1, type=int)
limit = request.args.get('pageSize', 10, type=int)
result = StockService.get_list(page, limit)
return jsonify({
'code': 200,
'msg': 'success',
'data': result
})
@stocks_bp.route('/inbound', methods=['POST'])
def create_inbound():
json_data = request.get_json()
if not json_data:
return jsonify({"message": "No input data provided"}), 400
# 2. 数据校验
schema = StockBuySchema()
try:
# 这一步只做校验,不直接生成对象,因为我们要在 Service 里手动处理逻辑
data = schema.load(json_data, partial=True)
# 1. 参数校验
data = stock_buy_schema.load(json_data)
# 2. 调用业务逻辑
new_stock = StockService.create_inbound(data)
# 3. 返回成功
return jsonify({
'code': 200,
'msg': '入库成功',
'data': new_stock.to_dict()
}), 201
except Exception as e:
return jsonify({"message": "Validation error", "errors": e.messages}), 422
# 捕获 ValueError 或 SQLAlchemyError
return jsonify({'code': 400, 'msg': str(e)}), 400
# 3. 调用业务逻辑
@stocks_bp.route('/inbound/<int:id>', methods=['PUT'])
def update_inbound(id):
json_data = request.get_json()
try:
new_stock = create_inbound_stock(data)
# 4. 返回成功结果
result = schema.dump(new_stock)
return jsonify({"message": "Inbound successful", "data": result}), 201
StockService.update_inbound(id, json_data)
return jsonify({'code': 200, 'msg': '更新成功'})
except Exception as e:
return jsonify({"message": "Internal Server Error", "error": str(e)}), 500
return jsonify({'code': 400, 'msg': str(e)}), 400
@stocks_bp.route('/inbound/<int:id>', methods=['DELETE'])
def delete_inbound(id):
try:
StockService.delete_inbound(id)
return jsonify({'code': 200, 'msg': '删除成功'})
except Exception as e:
return jsonify({'code': 400, 'msg': str(e)}), 400

View File

@ -1,6 +1,8 @@
# 文件路径: app/extensions.py
from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow
from flask_migrate import Migrate
from flask_cors import CORS # 解决前后端跨域问题
# 初始化数据库和序列化工具
db = SQLAlchemy()
ma = Marshmallow()
migrate = Migrate()
cors = CORS()

View File

@ -1,11 +1,34 @@
from app.extensions import db
from datetime import datetime
class MaterialBase(db.Model):
__tablename__ = 'material_base'
id = db.Column(db.Integer, primary_key=True)
sku_code = db.Column(db.String(100), unique=True, nullable=False)
name = db.Column(db.String(255), nullable=False)
spec_model = db.Column(db.String(255))
unit = db.Column(db.String(50))
# 其他字段按需添加,入库时主要是为了外键关联
# 核心字段
sku_code = db.Column(db.String(100), unique=True, nullable=False) # 唯一编码
name = db.Column(db.String(255), nullable=False) # 名称
spec_model = db.Column(db.String(255)) # 规格型号
unit = db.Column(db.String(50)) # 单位
category = db.Column(db.String(100)) # 分类
# 审计字段 (自动记录时间)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 关联关系 (让 StockBuy 可以反向找到 Material)
# 这里的 dynamic 允许在 material.stock_buys 时进行进一步过滤
stock_buys = db.relationship('StockBuy', back_populates='material', lazy='dynamic')
def to_dict(self):
"""将对象转换为字典,方便接口返回"""
return {
'id': self.id,
'sku_code': self.sku_code,
'name': self.name,
'spec_model': self.spec_model,
'unit': self.unit,
'category': self.category,
'created_at': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else None
}

View File

@ -6,20 +6,51 @@ class StockBuy(db.Model):
__tablename__ = 'stock_buy'
id = db.Column(db.Integer, primary_key=True)
# 外键:必须关联一个 MaterialBase 的 ID
material_id = db.Column(db.Integer, db.ForeignKey('material_base.id'), nullable=False)
inbound_date = db.Column(db.DateTime, default=datetime.now)
barcode = db.Column(db.String(100))
batch_no = db.Column(db.String(100))
# 数量相关 (使用 Numeric 对应数据库的 NUMERIC)
qty_inbound = db.Column(db.Numeric(19, 4), default=0)
qty_current = db.Column(db.Numeric(19, 4), default=0)
qty_available = db.Column(db.Numeric(19, 4), default=0)
# 业务数据
inbound_date = db.Column(db.DateTime, default=datetime.utcnow) # 入库时间
batch_no = db.Column(db.String(100)) # 批次号
warehouse_loc = db.Column(db.String(100)) # 库位
supplier_name = db.Column(db.String(255)) # 供应商
price_unit = db.Column(db.Numeric(19, 4), default=0)
price_total = db.Column(db.Numeric(19, 4), default=0)
supplier_name = db.Column(db.String(255))
warehouse_loc = db.Column(db.String(100))
# 数量与状态
qty_inbound = db.Column(db.Numeric(19, 4), default=0) # 初始入库量
qty_current = db.Column(db.Numeric(19, 4), default=0) # 当前剩余量
qty_available = db.Column(db.Numeric(19, 4), default=0) # 当前可用量
status = db.Column(db.String(50), default='NORMAL')
# 建立关联,方便查询物料详情
material = db.relationship('MaterialBase', backref='buy_stocks')
# 财务数据
price_unit = db.Column(db.Numeric(19, 4), default=0) # 单价
price_total = db.Column(db.Numeric(19, 4), default=0) # 总价
# 建立与 MaterialBase 的双向关系
material = db.relationship('MaterialBase', back_populates='stock_buys')
def to_dict(self):
"""
序列化方法:
这里做了一个扁平化处理,把关联的 material 里的 name/sku 直接拿出来,
方便前端表格直接显示,不用前端再去拼凑。
"""
return {
'id': self.id,
'material_id': self.material_id,
# 从关联对象获取基础信息
'sku_code': self.material.sku_code if self.material else None,
'material_name': self.material.name if self.material else None,
'spec_model': self.material.spec_model if self.material else None,
'unit': self.material.unit if self.material else None,
'category': self.material.category if self.material else None,
# 本表信息
'inbound_date': self.inbound_date.strftime('%Y-%m-%d %H:%M:%S') if self.inbound_date else None,
'batch_no': self.batch_no,
'warehouse_loc': self.warehouse_loc,
'supplier_name': self.supplier_name,
'qty_inbound': float(self.qty_inbound) if self.qty_inbound else 0,
'price_unit': float(self.price_unit) if self.price_unit else 0,
'price_total': float(self.price_total) if self.price_total else 0,
}

View File

@ -1,14 +1,42 @@
from app.extensions import ma
from app.models.stock import StockBuy
from marshmallow import fields
from marshmallow import Schema, fields, validate, validates_schema, ValidationError
class StockBuySchema(ma.SQLAlchemyAutoSchema):
class Meta:
model = StockBuy
load_instance = True # 反序列化时自动创建模型实例
include_fk = True # 包含外键 material_id
# 必须字段校验
material_id = fields.Integer(required=True)
qty_inbound = fields.Decimal(required=True, as_string=True)
price_unit = fields.Decimal(missing=0, as_string=True)
class StockBuySchema(Schema):
# 只用于输出的字段
id = fields.Int(dump_only=True)
# --- 输入字段 ---
# 1. 核心识别字段
material_id = fields.Int(missing=None) # 如果是老物料可能传ID
sku_code = fields.Str(required=True, error_messages={"required": "SKU编码是必填项"}) # 必填
# 2. 新物料自动建档字段 (如果是新SKU这些需要校验)
material_name = fields.Str(missing=None)
spec_model = fields.Str(missing=None)
unit = fields.Str(missing=None)
category = fields.Str(missing=None)
# 3. 入库业务字段
qty_inbound = fields.Float(required=True, validate=validate.Range(min=0.0001, error="入库数量必须大于0"))
price_unit = fields.Float(missing=0)
inbound_date = fields.DateTime(format='%Y-%m-%d %H:%M:%S')
batch_no = fields.Str(missing='')
warehouse_loc = fields.Str(missing='')
supplier_name = fields.Str(missing='')
@validates_schema
def validate_material_logic(self, data, **kwargs):
"""
自定义校验逻辑:
如果用户没传 material_id说明可能想新建物料。
虽然最终是否新建由 Service 层判断数据库决定,
但这里可以做一个弱校验:尽量让用户填上名字。
"""
pass
# 这里暂时不强制抛出错误,交给 Service 层处理 "SKU不存在且无名字" 的情况
# 实例化 Schema
stock_buy_schema = StockBuySchema()

View File

@ -1,39 +1,119 @@
from app.extensions import db
from app.models.stock import StockBuy
from app.models.material import MaterialBase
from sqlalchemy.exc import SQLAlchemyError
def create_inbound_stock(data):
"""
处理采购入库逻辑
"""
try:
# 1. 计算总价
qty = data.get('qty_inbound')
price = data.get('price_unit', 0)
total = float(qty) * float(price)
class StockService:
@staticmethod
def create_inbound(data):
"""
处理入库逻辑:
1. 根据 SKU 查找物料。
2. 如果没找到,创建新物料 (MaterialBase)。
3. 创建入库单 (StockBuy)。
"""
try:
sku = data.get('sku_code')
material_id = data.get('material_id')
# 2. 创建库存记录
# 注意:入库时,当前库存(current)和可用库存(available)通常等于入库数量
new_stock = StockBuy(
material_id=data['material_id'],
barcode=data.get('barcode'),
batch_no=data.get('batch_no'),
qty_inbound=qty,
qty_current=qty, # 初始:当前=入库
qty_available=qty, # 初始:可用=入库
price_unit=price,
price_total=total,
supplier_name=data.get('supplier_name'),
warehouse_loc=data.get('warehouse_loc'),
inbound_date=data.get('inbound_date') # 如果前端没传Model会默认用当前时间
# --- 第一步:确定 material_id ---
# 如果前端没传 ID或者传了但我们想二次确认都通过 SKU 查一遍
existing_material = MaterialBase.query.filter_by(sku_code=sku).first()
if existing_material:
# 场景 A: 物料已存在 -> 直接使用其 ID
material_id = existing_material.id
else:
# 场景 B: 物料不存在 -> 自动创建新物料
if not data.get('material_name'):
raise ValueError(f"SKU [{sku}] 是新物料,必须填写【物料名称】才能入库。")
new_material = MaterialBase(
sku_code=sku,
name=data.get('material_name'),
spec_model=data.get('spec_model'),
unit=data.get('unit'),
category=data.get('category')
)
db.session.add(new_material)
db.session.flush() # 关键:将对象刷入暂存区,以获取自增的 ID
material_id = new_material.id
# --- 第二步:创建入库单 ---
qty = data.get('qty_inbound')
price = data.get('price_unit', 0)
new_stock = StockBuy(
material_id=material_id,
inbound_date=data.get('inbound_date'),
batch_no=data.get('batch_no'),
warehouse_loc=data.get('warehouse_loc'),
supplier_name=data.get('supplier_name'),
# 数量逻辑:初始时,当前量 = 可用量 = 入库量
qty_inbound=qty,
qty_current=qty,
qty_available=qty,
# 财务逻辑
price_unit=price,
price_total=float(qty) * float(price) if qty else 0
)
db.session.add(new_stock)
db.session.commit() # 统一提交事务
return new_stock
except SQLAlchemyError as e:
db.session.rollback() # 数据库报错回滚
raise e
except ValueError as e:
db.session.rollback() # 业务报错回滚
raise e
@staticmethod
def get_list(page, per_page):
"""获取分页列表"""
pagination = StockBuy.query.order_by(StockBuy.inbound_date.desc()).paginate(
page=page, per_page=per_page, error_out=False
)
return {
'items': [item.to_dict() for item in pagination.items],
'total': pagination.total,
'pages': pagination.pages,
'current_page': pagination.page
}
@staticmethod
def update_inbound(stock_id, data):
"""更新入库单信息 (通常不允许改物料本身,只改入库相关)"""
stock = StockBuy.query.get_or_404(stock_id)
if 'warehouse_loc' in data: stock.warehouse_loc = data['warehouse_loc']
if 'supplier_name' in data: stock.supplier_name = data['supplier_name']
if 'batch_no' in data: stock.batch_no = data['batch_no']
if 'price_unit' in data: stock.price_unit = data['price_unit']
# 如果修改了数量,需要级联更新当前库存
if 'qty_inbound' in data:
old_qty = float(stock.qty_inbound)
new_qty = float(data['qty_inbound'])
diff = new_qty - old_qty
stock.qty_inbound = new_qty
stock.qty_current = float(stock.qty_current) + diff
stock.qty_available = float(stock.qty_available) + diff
db.session.add(new_stock)
db.session.commit()
return stock
return new_stock
except SQLAlchemyError as e:
db.session.rollback()
raise e
@staticmethod
def delete_inbound(stock_id):
"""删除入库单"""
stock = StockBuy.query.get_or_404(stock_id)
db.session.delete(stock)
db.session.commit()

View File

@ -5,7 +5,7 @@ class Config:
# 数据库连接配置
# 请务必将 '你的密码' 替换为你 PostgreSQL 的真实密码
# 如果数据库不在本地,请将 localhost 替换为 IP 地址
SQLALCHEMY_DATABASE_URI = 'postgresql://postgres:1234@localhost:5432/inventory_system'
SQLALCHEMY_DATABASE_URI = 'postgresql://test:1234@localhost:5432/inventory_system'
# 关闭 SQLAlchemy 的事件追踪,减少内存消耗
SQLALCHEMY_TRACK_MODIFICATIONS = False

View File

@ -1,6 +1,8 @@
# 文件路径: run.py (在项目根目录下,与 config.py 同级)
from app import create_app
app = create_app()
if __name__ == '__main__':
# debug=True 修改代码后会自动重启
app.run(host='0.0.0.0', port=5000, debug=True)