添加半成品页面进行数据

This commit is contained in:
dxc
2026-01-28 17:44:39 +08:00
parent cd55a6aee1
commit b0df5c7458
16 changed files with 1649 additions and 71 deletions

View File

@ -1,38 +1,53 @@
# 文件路径: inventory-backend/app/__init__.py
from flask import Flask
from config import Config
from app.extensions import db, migrate, cors
def create_app():
app = Flask(__name__)
app.config.from_object(Config)
# 初始化插件
# 1. 初始化插件
db.init_app(app)
migrate.init_app(app, db)
# 确保跨域配置正确,允许前端访问
# 确保跨域配置
cors.init_app(app, resources={r"/api/*": {"origins": "*"}})
# --- 注册蓝图 ---
# =========================================================
# 2. 注册蓝图 (Blueprints)
# =========================================================
# 1. 保持原有的 stocks 模块
try:
from app.api.v1.stocks import stocks_bp
app.register_blueprint(stocks_bp, url_prefix='/api/v1/stocks')
except ImportError as e:
print(f"⚠️ 警告: 原有 stocks 蓝图导入失败: {e}")
# 2. 注册新的入库聚合蓝图
# 核心:必须先导入,再注册。路径对应 app/api/v1/inbound/__init__.py
# 注册入库聚合模块 (Inbound)
try:
# 指向聚合文件: app/api/v1/inbound/__init__.py
from app.api.v1.inbound import inbound_bp
# 最终路径结构:/api/v1/inbound/buy/list
app.register_blueprint(inbound_bp, url_prefix='/api/v1/inbound')
print("✅ 入库模块蓝图注册成功")
except ImportError as e:
print(f"❌ 严重错误: 入库模块 inbound 蓝图导入失败: {e}")
# 打印路由映射,仅在本地调试时建议开启
# with app.app_context():
# print(app.url_map)
# 注册父蓝图,路由前缀为 /api/v1/inbound
# 最终路由效果:
# /api/v1/inbound + /buy/list -> /api/v1/inbound/buy/list
# /api/v1/inbound + /semi/list -> /api/v1/inbound/semi/list
app.register_blueprint(inbound_bp, url_prefix='/api/v1/inbound')
print("✅ Inbound (Buy & Semi) 模块注册成功")
except ImportError as e:
print(f"❌ 错误: Inbound 模块导入失败: {e}")
# =========================================================
# 3. 预加载数据模型 (解决 relationship 找不到模型的问题)
# =========================================================
with app.app_context():
try:
# ✅ 修正点:引用新路径 (不再引用 app.models.stock)
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.material import MaterialBase
# 如果是开发环境且没有迁移文件,可以取消注释下面这行来创建表
# db.create_all()
except ImportError as e:
print(f"⚠️ 模型预加载失败: {e}")
return app

View File

@ -1,18 +1,22 @@
from flask import Blueprint
# 1. 导入同目录下的 buy 模块 (假设文件名为 buy.py)
# 1. 导入子模块蓝图
# 注意:确保 .buy, .semi, .base 文件在同级目录下真实存在
from .buy import inbound_buy_bp
from .semi import inbound_semi_bp
# 2. 【关键修改】导入同目录下的 base 模块
# 使用相对导入 .base这样 Python 就会去 app/api/v1/inbound/base.py 找
# 如果你还有 base.py 文件,就取消注释下面这行
from .base import inbound_base_bp
# 创建父级蓝图 'inbound'
# 2. 创建父级聚合蓝图
inbound_bp = Blueprint('inbound', __name__)
# 3. 挂载子蓝图
# 最终路由将是: /api/v1/inbound/buy/...
# 访问地址: /api/v1/inbound/buy/list
inbound_bp.register_blueprint(inbound_buy_bp, url_prefix='/buy')
# 最终路由将是: /api/v1/inbound/base/...
# 访问地址: /api/v1/inbound/semi/list
inbound_bp.register_blueprint(inbound_semi_bp, url_prefix='/semi')
# 如果有 base
inbound_bp.register_blueprint(inbound_base_bp, url_prefix='/base')

View File

@ -0,0 +1,91 @@
from flask import Blueprint, request, jsonify
from app.services.inbound.semi_service import SemiInboundService
import traceback
# 定义蓝图url_prefix 通常在注册蓝图时指定,例如 /api/v1/inbound/semi
inbound_semi_bp = Blueprint('inbound_semi', __name__)
# ------------------------------------------------------------------
# 0. 基础物料搜索 (复用逻辑)
# ------------------------------------------------------------------
@inbound_semi_bp.route('/search-base', methods=['GET'])
def search_base():
"""
供前端下拉框远程搜索使用 (搜索半成品类型的基础物料)
Query Param: keyword (名称或规格)
"""
try:
keyword = request.args.get('keyword', '')
# 这里复用 Service 中的搜索逻辑
data = SemiInboundService.search_base_material(keyword)
return jsonify({
"code": 200,
"msg": "success",
"data": data
})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500
# ------------------------------------------------------------------
# 1. 获取半成品列表
# ------------------------------------------------------------------
@inbound_semi_bp.route('/list', methods=['GET'])
def get_list():
try:
page = request.args.get('page', 1, type=int)
limit = request.args.get('pageSize', 15, type=int)
# 支持按关键字搜索BOM号、工单号、SN、批号等
keyword = request.args.get('keyword', '')
result = SemiInboundService.get_list(page, limit, keyword)
return jsonify({"code": 200, "msg": "success", "data": result})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500
# ------------------------------------------------------------------
# 2. 新增半成品入库
# ------------------------------------------------------------------
@inbound_semi_bp.route('/submit', methods=['POST'])
def submit():
try:
data = request.get_json()
if not data:
return jsonify({"code": 400, "msg": "No data"}), 400
SemiInboundService.handle_inbound(data)
return jsonify({"code": 200, "msg": "入库成功"})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500
# ------------------------------------------------------------------
# 3. 更新半成品入库信息
# ------------------------------------------------------------------
@inbound_semi_bp.route('/<int:id>', methods=['PUT'])
def update_semi(id):
try:
data = request.get_json()
SemiInboundService.update_inbound(id, data)
return jsonify({"code": 200, "msg": "更新成功"})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500
# ------------------------------------------------------------------
# 4. 删除半成品入库记录
# ------------------------------------------------------------------
@inbound_semi_bp.route('/<int:id>', methods=['DELETE'])
def delete_semi(id):
try:
SemiInboundService.delete_inbound(id)
return jsonify({"code": 200, "msg": "删除成功"})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500

View File

@ -1,2 +1,13 @@
# app/models/__init__.py
# 1. 基础物料
from app.models.material import MaterialBase
from app.models.stock import StockBuy
# 2. 采购入库 (指向新路径)
from app.models.inbound.buy import StockBuy
# 3. 半成品入库 (指向新路径)
from app.models.inbound.semi import StockSemi
# 如果有其他模型 (比如 sys_user 等),保留它们
# from app.models.sys_user import SysUser

View File

@ -1,4 +1,4 @@
# app/models/stock.py
# app/models/buy.py
from app.extensions import db
from datetime import datetime

View File

@ -0,0 +1,143 @@
# app/models/inbound/semi.py
from app.extensions import db
from datetime import datetime
class StockSemi(db.Model):
"""
半成品入库库存表
对应数据库表: stock_semi
"""
__tablename__ = 'stock_semi'
# =========================================================
# 1. 基础字段 (Strictly matching SQL Schema)
# =========================================================
# 主键
id = db.Column(db.Integer, primary_key=True)
# 外键关联 material_base 表
base_id = db.Column(db.Integer, db.ForeignKey('material_base.id'), nullable=False)
# 身份标识
sku = db.Column(db.String(100))
# SQL字段名为 production_date, 对应前端的 "入库日期/生产日期"
production_date = db.Column(db.Date)
barcode = db.Column(db.String(100)) # 条码
serial_number = db.Column(db.String(100)) # 序列号
# 注意:提供的 SQL 中 stock_semi 没有 batch_number 字段,这里不定义,以免报错。
# 如果后续数据库加上了该字段,请取消下方注释:
# batch_number = db.Column(db.String(100))
# --- 数量 ---
in_quantity = db.Column(db.Numeric(19, 4), default=0)
stock_quantity = db.Column(db.Numeric(19, 4), default=0)
available_quantity = db.Column(db.Numeric(19, 4), default=0)
# --- 状态与位置 ---
status = db.Column(db.String(50)) # 在库/出库/损耗
warehouse_location = db.Column(db.String(100)) # 仓库位
# =========================================================
# 2. 半成品特有字段 (SQL 字段映射)
# =========================================================
# BOM 相关
# 数据库列名: bom_id, Python属性: bom_code (为了适配前端习惯)
bom_code = db.Column('bom_id', db.String(100))
bom_version = db.Column(db.String(50))
# 工单 相关
# 数据库列名: work_order_id, Python属性: work_order_code
work_order_code = db.Column('work_order_id', db.String(100))
# 成本 相关
raw_material_cost = db.Column(db.Numeric(19, 4), default=0) # 原材料成本
manual_cost = db.Column(db.Numeric(19, 4), default=0) # 手动/人工成本
# 生产信息
# 数据库列名: producer_name, Python属性: production_manager
production_manager = db.Column('producer_name', db.String(100))
# 生产起止时间 (SQL定义为 VARCHAR(255))
production_time_range = db.Column(db.String(255))
# 质量与链接
quality_status = db.Column(db.String(50)) # 质量状态
quality_report_link = db.Column(db.Text) # 质量报告链接
detail_link = db.Column(db.Text) # 详细信息链接
# =========================================================
# 3. 关系定义
# =========================================================
# 建立与 MaterialBase 的关系
# 注意:确保 MaterialBase 模型中定义了 back_populates='stock_semis'
material = db.relationship('MaterialBase', back_populates='stock_semis')
def to_dict(self):
"""
序列化将模型转换为字典供API返回JSON使用
在这里处理字段名称转换,确保前端能正确显示数据
"""
# 计算单件总成本 (原料 + 人工)
raw_val = float(self.raw_material_cost or 0)
man_val = float(self.manual_cost or 0)
unit_total = raw_val + man_val
return {
'id': self.id,
'base_id': self.base_id,
# --- 级联基础信息 (防止 None 报错) ---
'material_name': self.material.name if self.material else '',
'spec_model': self.material.spec_model if self.material else '',
'category': self.material.category if self.material else '',
'unit': self.material.unit if self.material else '',
'material_type': self.material.material_type if self.material else '',
# --- 实体信息 ---
'sku': self.sku,
# 将 production_date 映射回前端通用的 inbound_date
'inbound_date': self.production_date.strftime('%Y-%m-%d') if self.production_date else '',
'barcode': self.barcode,
'serial_number': self.serial_number,
# 'batch_number': self.batch_number, # SQL无此字段暂不返回
'warehouse_loc': self.warehouse_location,
'status': self.status,
# --- 数量 (转为float防止json序列化报错) ---
'in_quantity': float(self.in_quantity or 0),
'qty_inbound': float(self.in_quantity or 0), # 兼容字段
'stock_quantity': float(self.stock_quantity or 0),
'qty_stock': float(self.stock_quantity or 0), # 兼容字段
'available_quantity': float(self.available_quantity or 0),
'qty_available': float(self.available_quantity or 0), # 兼容字段
# --- 半成品特有数据 ---
'bom_code': self.bom_code,
'bom_version': self.bom_version,
'work_order_code': self.work_order_code,
'raw_material_cost': raw_val,
'manual_cost': man_val,
'unit_total_cost': unit_total, # 前端展示总成本用
'production_manager': self.production_manager,
# 时间范围 (SQL存的是字符串直接返回即可或者根据需要拆分)
# 如果 service 层存的是 "Start ~ End",这里直接返回
'production_time_range': self.production_time_range,
# 为了兼容前端分开的时间字段(如果有):
'production_start_time': self.production_time_range.split(' ~ ')[
0] if self.production_time_range and ' ~ ' in self.production_time_range else '',
'production_end_time': self.production_time_range.split(' ~ ')[
1] if self.production_time_range and ' ~ ' in self.production_time_range else '',
'quality_status': self.quality_status,
'quality_report_link': self.quality_report_link,
'detail_link': self.detail_link
}

View File

@ -10,7 +10,7 @@ class MaterialBase(db.Model):
"""
__tablename__ = 'material_base'
# 1. 基础字段 (必须与 SQL 建表语句完全一致)
# 1. 基础字段 (保持不变)
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255), nullable=False, comment='基础信息名称')
@ -32,26 +32,31 @@ class MaterialBase(db.Model):
manual_link = db.Column(db.Text, comment='通用说明书链接')
product_image = db.Column(db.Text, comment='通用产品图链接')
# 启用状态 (注意SQL中是 boolean)
# 启用状态
is_enabled = db.Column(db.Boolean, default=True, comment='是否启用')
# ============================================================
# ⚠️ 注意:你之前提供的 SQL 建表语句中【没有】下面这两个时间字段。
# 如果数据库里没有这两列,代码运行到这里会报错 (UndefinedColumn)。
# 我先将其注释掉。如果你确认数据库已经 Alter Table 加了这两列,请取消注释。
# 时间字段 (保持你原本的注释状态,以免报错)
# ============================================================
# create_time = db.Column(db.DateTime, default=datetime.utcnow)
# update_time = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 【核心关联】
# 关联采购库存表 (StockBuy)lazy='dynamic' 允许后续做 .count() 统计
# 确保 app/models/stock.py 中有 back_populates='material'
# ============================================================
# 关联关系区域 (修改重点)
# ============================================================
# 1. 关联采购库存 (StockBuy) - 保持不变
# 注意:确保 app/models/inbound/buy.py 中的 StockBuy 定义了 back_populates='material'
stock_buys = db.relationship('StockBuy', back_populates='material', lazy='dynamic')
# 2. 【新增】关联半成品库存 (StockSemi)
# 注意:确保 app/models/inbound/semi.py 中的 StockSemi 定义了 back_populates='material'
# 这样以后可以通过 material.stock_semis 来访问该物料下的所有半成品库存记录
stock_semis = db.relationship('StockSemi', back_populates='material', lazy='dynamic')
def to_dict(self):
"""
序列化方法将模型转换为字典供API返回JSON使用
这里是解决【前端表格空白】最关键的地方
"""
return {
'id': self.id,
@ -59,7 +64,7 @@ class MaterialBase(db.Model):
'category': self.category,
# =========================================
# 关键映射区 (解决前后端字段名不一致问题)
# 关键映射区 (保持不变)
# =========================================
# 数据库叫 material_type -> 前端叫 type
'type': self.material_type,
@ -74,11 +79,9 @@ class MaterialBase(db.Model):
'generalManual': self.manual_link,
'generalImage': self.product_image,
# 状态处理:前端 Switch 通常接受 boolean 或 1/0
# 数据库里的 true -> 返回 1 (启用)
# 数据库里的 false/None -> 返回 0 (禁用)
# 状态处理
'isEnabled': 1 if self.is_enabled else 0,
# 如果上方注释了 create_time这里也要注释否则会报错
# 时间字段保持注释
# 'createTime': self.create_time.strftime('%Y-%m-%d %H:%M:%S') if hasattr(self, 'create_time') and self.create_time else None
}

View File

@ -1,18 +1,62 @@
# 文件路径: app/services/inbound/base_service.py
from app.extensions import db
from app.models.material import MaterialBase
from app.models.stock import StockBuy # 需要引入库存表做删除时的依赖检查
# ==============================================================================
# ✅ 正确的引用方式
# ==============================================================================
from app.models.inbound.buy import StockBuy # 引用采购库存模型
from app.models.inbound.semi import StockSemi # 引用半成品库存模型
from sqlalchemy import or_
import traceback
class MaterialBaseService:
"""
基础物料服务层
负责处理 MaterialBase 的增删改查及搜索逻辑
"""
@staticmethod
def search_material(keyword):
"""
根据关键字搜索已启用的基础物料
(供 /api/v1/inbound/base/search 接口调用)
"""
try:
if not keyword:
return []
# 搜索名称或规格型号,且必须是启用的
query = MaterialBase.query.filter(
MaterialBase.is_enabled == True,
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).limit(20)
results = []
for item in query.all():
results.append({
'id': item.id,
'name': item.name,
'spec': item.spec_model,
'category': item.category,
'unit': item.unit,
'type': item.material_type,
'status': '启用'
})
return results
except Exception as e:
traceback.print_exc()
return []
@staticmethod
def get_list(page, limit, filters=None):
"""
获取基础信息列表
:param page: 页码
:param limit: 每页条数
:param filters: 筛选条件字典 {keyword, category, type, isEnabled}
获取基础信息列表 (带分页和筛选)
"""
try:
query = MaterialBase.query
@ -46,7 +90,6 @@ class MaterialBaseService:
except Exception as e:
print(f"查询基础信息列表失败: {e}")
# 生产环境建议记录日志
return {"total": 0, "items": []}
@staticmethod
@ -65,16 +108,16 @@ class MaterialBaseService:
if exist:
raise ValueError(f"已存在相同名称和规格的数据 (ID: {exist.id})")
# 2. 创建对象 (注意前端驼峰 -> 后端下划线映射)
# 2. 创建对象
new_material = MaterialBase(
name=data['name'],
spec_model=data['spec'], # 映射
spec_model=data['spec'],
category=data.get('category'),
material_type=data.get('type'), # 映射
material_type=data.get('type'),
unit=data.get('unit'),
visibility_level=data.get('visibilityLevel'), # 映射
manual_link=data.get('generalManual'), # 映射
product_image=data.get('generalImage'), # 映射
visibility_level=data.get('visibilityLevel'),
manual_link=data.get('generalManual'),
product_image=data.get('generalImage'),
is_enabled=True if data.get('isEnabled', 1) == 1 else False
)
@ -94,7 +137,7 @@ class MaterialBaseService:
if not material:
raise ValueError("数据不存在")
# 更新字段 (仅更新传入的字段)
# 更新字段
if 'name' in data: material.name = data['name']
if 'spec' in data: material.spec_model = data['spec']
if 'category' in data: material.category = data['category']
@ -116,19 +159,32 @@ class MaterialBaseService:
@staticmethod
def delete_material(m_id):
"""删除基础信息 (带依赖检查)"""
"""
删除基础信息 (带依赖检查)
✅ 已升级:同时检查采购库(Buy)和半成品库(Semi)
"""
try:
material = MaterialBase.query.get(m_id)
if not material:
raise ValueError("数据不存在")
# 1. 依赖检查:如果该基础信息已经在库存表(StockBuy)中使用,禁止物理删除
# 这里假设 StockBuy 表有一个外键或字段指向 MaterialBase (e.g., base_id)
usage_count = StockBuy.query.filter_by(base_id=m_id).count()
if usage_count > 0:
raise ValueError(f"无法删除:该基础信息已被 {usage_count} 条库存记录引用,请先清理库存或仅禁用此条目。")
# 1. 依赖检查:采购入库引用
buy_usage_count = StockBuy.query.filter_by(base_id=m_id).count()
# 2. 执行删除
# 2. 依赖检查:半成品入库引用
semi_usage_count = StockSemi.query.filter_by(base_id=m_id).count()
total_usage = buy_usage_count + semi_usage_count
if total_usage > 0:
raise ValueError(
f"无法删除:该基础物料正被使用中。\n"
f"- 采购库存记录: {buy_usage_count}\n"
f"- 半成品库存记录: {semi_usage_count}\n"
f"请先清理相关库存或仅‘禁用’此条目。"
)
# 3. 执行删除
db.session.delete(material)
db.session.commit()
return True

View File

@ -1,8 +1,8 @@
from app.extensions import db
from app.models.inbound.buy import StockBuy
from app.models.material import MaterialBase
from app.models.stock import StockBuy
from datetime import datetime
from sqlalchemy import or_, func # 引入 func 用于聚合计算
from sqlalchemy import or_, func
import traceback

View File

@ -0,0 +1,339 @@
from app.extensions import db
from app.models.material import MaterialBase
from app.models.inbound.semi import StockSemi
from datetime import datetime
from sqlalchemy import or_, func
import traceback
class SemiInboundService:
@staticmethod
def search_base_material(keyword):
"""
搜索基础物料,逻辑与采购入库基本一致。
如果需要只搜索'半成品'类型的物料,可以在 filter 中增加条件。
"""
try:
if not keyword:
return []
query = MaterialBase.query.filter(
MaterialBase.is_enabled == True,
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
).limit(20)
results = []
for item in query.all():
results.append({
'id': item.id,
'name': item.name,
'spec': item.spec_model,
'category': item.category,
'unit': item.unit,
'type': item.material_type,
'status': '启用'
})
return results
except Exception as e:
traceback.print_exc()
return []
@staticmethod
def handle_inbound(data):
try:
base_id = data.get('base_id')
if not base_id:
raise ValueError("必须选择基础物料 (缺少 base_id)")
material = MaterialBase.query.get(base_id)
if not material:
raise ValueError(f"ID为 {base_id} 的基础物料不存在")
# 1. 处理入库日期
in_date_val = datetime.utcnow().date()
if data.get('in_date'):
try:
date_str = str(data['in_date'])[:10]
in_date_val = datetime.strptime(date_str, '%Y-%m-%d').date()
except ValueError:
pass
# 2. 处理生产时间 (前端传来的是字符串 "YYYY-MM-DD HH:mm:ss")
p_start = None
p_end = None
if data.get('production_start_time'):
try:
p_start = datetime.strptime(str(data['production_start_time']), '%Y-%m-%d %H:%M:%S')
except:
pass
if data.get('production_end_time'):
try:
p_end = datetime.strptime(str(data['production_end_time']), '%Y-%m-%d %H:%M:%S')
except:
pass
# 3. 处理数值和成本
in_qty = float(data.get('in_quantity') or 0)
raw_cost = float(data.get('raw_material_cost') or 0)
manual_cost = float(data.get('manual_cost') or 0)
# 单件总成本 = 原料 + 人工
unit_total_cost = raw_cost + manual_cost
# 总价值 = 单件总成本 * 数量
total_value = unit_total_cost * in_qty
# 4. 创建记录
new_stock = StockSemi(
base_id=material.id,
sku=data.get('sku'),
in_date=in_date_val,
# 标识信息
serial_number=data.get('serial_number'),
batch_number=data.get('batch_number'),
barcode=data.get('barcode'),
# 状态与数量
status='在库',
quality_status=data.get('quality_status', '合格'), # 半成品使用质量状态
in_quantity=in_qty,
stock_quantity=in_qty,
available_quantity=in_qty,
warehouse_location=data.get('warehouse_location'),
# 生产任务信息 (半成品特有)
bom_code=data.get('bom_code'),
bom_version=data.get('bom_version'),
work_order_code=data.get('work_order_code'),
production_manager=data.get('production_manager'),
production_start_time=p_start,
production_end_time=p_end,
# 成本信息 (半成品特有)
raw_material_cost=raw_cost,
manual_cost=manual_cost,
unit_total_cost=unit_total_cost,
total_price=total_value, # 数据库字段可能复用 total_price 或叫 total_value
# 链接
quality_report_link=data.get('quality_report_link'),
detail_link=data.get('detail_link'),
remark=data.get('remark')
)
db.session.add(new_stock)
db.session.commit()
return new_stock
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def update_inbound(stock_id, data):
try:
print(f"----- UPDATE SEMI DEBUG: ID={stock_id} -----")
stock = StockSemi.query.get(stock_id)
if not stock:
raise ValueError("记录不存在")
# 1. 简单字段映射
field_mapping = {
'sku': 'sku',
'barcode': 'barcode',
'warehouse_location': 'warehouse_location',
'serial_number': 'serial_number',
'batch_number': 'batch_number',
'status': 'status',
'quality_status': 'quality_status', # 质量状态
# 生产信息
'bom_code': 'bom_code',
'bom_version': 'bom_version',
'work_order_code': 'work_order_code',
'production_manager': 'production_manager',
'quality_report_link': 'quality_report_link',
'detail_link': 'detail_link'
}
for frontend_key, db_attr in field_mapping.items():
if frontend_key in data:
setattr(stock, db_attr, data[frontend_key])
# 2. 处理时间更新
if 'production_start_time' in data:
try:
if data['production_start_time']:
stock.production_start_time = datetime.strptime(str(data['production_start_time']),
'%Y-%m-%d %H:%M:%S')
else:
stock.production_start_time = None
except:
pass
if 'production_end_time' in data:
try:
if data['production_end_time']:
stock.production_end_time = datetime.strptime(str(data['production_end_time']),
'%Y-%m-%d %H:%M:%S')
else:
stock.production_end_time = None
except:
pass
# 3. 处理数量和成本变更联动
qty_changed = False
cost_changed = False
# 更新数量
if 'in_quantity' in data:
new_qty = float(data['in_quantity'])
old_qty = float(stock.in_quantity)
if new_qty != old_qty:
diff = new_qty - old_qty
stock.in_quantity = new_qty
stock.stock_quantity = float(stock.stock_quantity) + diff
stock.available_quantity = float(stock.available_quantity) + diff
qty_changed = True
# 更新成本 (原材料 or 人工)
if 'raw_material_cost' in data:
stock.raw_material_cost = float(data['raw_material_cost'])
cost_changed = True
if 'manual_cost' in data:
stock.manual_cost = float(data['manual_cost'])
cost_changed = True
# 如果成本或数量变了,重新计算单价和总价
if cost_changed:
stock.unit_total_cost = float(stock.raw_material_cost) + float(stock.manual_cost)
if cost_changed or qty_changed:
stock.total_price = float(stock.in_quantity) * float(stock.unit_total_cost)
db.session.commit()
print("----- UPDATE SEMI SUCCESS -----")
return stock
except Exception as e:
db.session.rollback()
print(f"----- UPDATE SEMI FAILED: {str(e)} -----")
traceback.print_exc()
raise e
@staticmethod
def delete_inbound(stock_id):
try:
stock = StockSemi.query.get(stock_id)
if not stock:
raise ValueError("记录不存在")
db.session.delete(stock)
db.session.commit()
return True
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def get_list(page, limit, keyword=None):
try:
# 1. 查询分页数据
query = db.session.query(StockSemi).outerjoin(MaterialBase, StockSemi.base_id == MaterialBase.id)
if keyword:
query = query.filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%'),
StockSemi.batch_number.ilike(f'%{keyword}%'),
StockSemi.serial_number.ilike(f'%{keyword}%'),
StockSemi.sku.ilike(f'%{keyword}%'),
# 增加半成品特有的搜索字段
StockSemi.work_order_code.ilike(f'%{keyword}%'),
StockSemi.bom_code.ilike(f'%{keyword}%')
)
)
pagination = query.order_by(StockSemi.id.desc()).paginate(page=page, per_page=limit, error_out=False)
# 2. 聚合统计 (计算该物料的总库存)
current_items = pagination.items
base_ids = list(set([item.base_id for item in current_items if item.base_id]))
stock_map = {}
if base_ids:
aggregates = db.session.query(
StockSemi.base_id,
func.sum(StockSemi.stock_quantity).label('total_stock'),
func.sum(StockSemi.available_quantity).label('total_avail')
).filter(StockSemi.base_id.in_(base_ids)).group_by(StockSemi.base_id).all()
for agg in aggregates:
stock_map[agg.base_id] = {
'total_stock': float(agg.total_stock or 0),
'total_avail': float(agg.total_avail or 0)
}
items = []
for item in current_items:
mat_name = item.material.name if item.material else '未知物料'
mat_spec = item.material.spec_model if item.material else ''
mat_cat = item.material.category if item.material else ''
mat_unit = item.material.unit if item.material else ''
mat_type = item.material.material_type if item.material else ''
stats = stock_map.get(item.base_id, {'total_stock': 0, 'total_avail': 0})
d = {
'id': item.id,
'base_id': item.base_id,
'material_name': mat_name,
'spec_model': mat_spec,
'category': mat_cat,
'unit': mat_unit,
'material_type': mat_type,
'sku': item.sku,
'inbound_date': str(item.in_date) if item.in_date else '',
'barcode': item.barcode,
'serial_number': item.serial_number,
'batch_number': item.batch_number,
'status': item.status,
'quality_status': item.quality_status, # 半成品使用质量状态
'qty_inbound': float(item.in_quantity or 0),
'qty_stock': float(item.stock_quantity or 0),
'qty_available': float(item.available_quantity or 0),
'sum_stock': stats['total_stock'],
'sum_available': stats['total_avail'],
'warehouse_loc': item.warehouse_location,
# 半成品特有字段返回
'bom_code': item.bom_code,
'bom_version': item.bom_version,
'work_order_code': item.work_order_code,
'raw_material_cost': float(item.raw_material_cost or 0),
'manual_cost': float(item.manual_cost or 0),
'unit_total_cost': float(item.unit_total_cost or 0),
'production_manager': item.production_manager,
'production_start_time': str(item.production_start_time) if item.production_start_time else '',
'production_end_time': str(item.production_end_time) if item.production_end_time else '',
'quality_report_link': item.quality_report_link,
'detail_link': item.detail_link,
'remark': item.remark
}
items.append(d)
return {"total": pagination.total, "items": items}
except Exception as e:
print(f"List Error: {e}")
traceback.print_exc()
return {"total": 0, "items": []}