fix(outbound+trans): 修复POST接口错误数据清洗导致的sku/quantity字段被清除Bug,并新增出库审批工作流全链路

This commit is contained in:
DXC
2026-04-28 16:02:34 +08:00
parent 97e7618bf3
commit 62c0e3738e
12 changed files with 1248 additions and 112 deletions

View File

@ -2,7 +2,7 @@ import uuid # .material -> .base refactor checked
from datetime import datetime, timezone, timedelta
from sqlalchemy import or_, func, desc, and_
from app.extensions import db
from app.models.outbound import TransOutbound
from app.models.outbound import TransOutbound, OutboundApproval
# 引入所有库存模型以进行查询
from app.models.inbound.buy import StockBuy
@ -12,6 +12,8 @@ from app.models.inbound.product import StockProduct
from app.models.base import MaterialBase
# 引入维修单表
from app.models.transaction import TransRepair
# 引入系统用户表
from app.models.system import SysUser
class OutboundService:
@ -169,6 +171,22 @@ class OutboundService:
beijing_tz = timezone(timedelta(hours=8))
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
# ★ 审批单相关逻辑
request_id = data.get('request_id')
approval = None
if request_id:
# 根据 request_id 查询审批单
approval = OutboundApproval.query.get(request_id)
if not approval:
raise ValueError(f"关联的审批单不存在 (ID: {request_id})")
if approval.status != 1:
status_map = {0: '待审批', 1: '已通过', 2: '已驳回', 3: '已完成'}
current_status = status_map.get(approval.status, str(approval.status))
raise ValueError(
f"关联的审批单状态不允许出库 (当前状态: {current_status})"
f"仅已通过的审批单方可执行出库"
)
model_map = {
'stock_buy': StockBuy,
'stock_semi': StockSemi,
@ -190,11 +208,11 @@ class OutboundService:
repair = TransRepair.query.with_for_update().get(stock_id)
if not repair:
raise ValueError(f"维修单不存在 (ID: {stock_id})")
# 更新维修单状态为已出库
repair.repair_status = '已出库'
repair.shipping_date = current_time
# 创建出库记录
new_record = TransOutbound(
sku=item.get('sku'),
@ -235,6 +253,11 @@ class OutboundService:
)
db.session.add(new_record)
# ★ 如果关联了审批单,出库成功后更新审批单状态为"已完成"
if approval:
approval.status = 3 # 3-已完成
# updated_at 会在 commit 时由 SQLAlchemy 自动更新
db.session.commit()
return outbound_no
@ -525,3 +548,336 @@ class OutboundService:
'pages': pagination.pages,
'current_page': page
}
class OutboundApprovalService:
"""出库审批服务"""
@staticmethod
def generate_request_no():
"""
生成审批单号: APR-OUT-yyyyMMdd-HHmm-当日流水(4位)
"""
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz)
date_str = now.strftime('%Y%m%d')
time_str = now.strftime('%H%M')
prefix = f"APR-OUT-{date_str}-"
from app.models.outbound import OutboundApproval
latest = db.session.query(OutboundApproval.request_no).filter(
OutboundApproval.request_no.like(f"{prefix}%")
).order_by(OutboundApproval.id.desc()).first()
if latest:
last_seq = int(latest[0].split('-')[-1])
sequence = last_seq + 1
else:
sequence = 1
return f"APR-OUT-{date_str}-{time_str}-{sequence:04d}"
@staticmethod
def create_request(applicant_id, items, allowed_approvers, remark=None, approver_id=None):
"""
创建出库审批单(申请阶段,直接存储前端传来的物料信息快照,不关联具体库存记录)
Args:
applicant_id: 申请人ID
items: 出库物品明细列表,每个物品应包含:
- name: 物料名称 (必填)
- spec_model: 规格型号 (必填)
- quantity: 计划出库数量 (必填)
- warehouse_location: 库位 (可选)
- remark: 物品备注 (可选)
allowed_approvers: 允许审批的人员/角色列表
approver_id: 指定审批人ID可选传则覆盖 allowed_approvers
remark: 申请说明
Returns:
OutboundApproval 实例
Raises:
ValueError: 当 items 为空或缺少必填字段时抛出
"""
from app.models.outbound import OutboundApproval
# 校验 items 非空
if not items:
raise ValueError("出库物品列表不能为空")
# 校验每个物品的宏观字段 (name, spec_model, quantity)
required_fields = ['name', 'spec_model', 'quantity']
for idx, item in enumerate(items):
missing_fields = [f for f in required_fields if f not in item or str(item.get(f) or '').strip() == '']
if missing_fields:
raise ValueError(
f"{idx + 1} 条物品缺少必填字段: {', '.join(missing_fields)}"
f"必须包含: name, spec_model, quantity"
)
try:
qty = float(item.get('quantity', 0))
if qty <= 0:
raise ValueError(f"{idx + 1} 条物品的出库数量必须大于0")
except (TypeError, ValueError) as e:
raise ValueError(f"{idx + 1} 条物品的 quantity 格式无效: {str(e)}")
# ★ 校验 allowed_approvers 非空
if not allowed_approvers:
raise ValueError("必须指定至少一位审批人")
# ★ 指定审批人模式approver_id 覆盖 allowed_approvers
if approver_id:
allowed_approvers = [{"type": "user", "value": int(approver_id)}]
request_no = OutboundApprovalService.generate_request_no()
approval = OutboundApproval(
request_no=request_no,
applicant_id=applicant_id,
remark=remark,
status=0, # 待审批
)
# 直接存储前端传来的物料信息快照,不查询/不关联具体库存记录
approval.set_items(items)
approval.set_allowed_approvers(allowed_approvers)
db.session.add(approval)
db.session.commit()
# ★ 创建成功后,发送邮件通知审批人(精确通知 approver_id 对应的邮箱)
OutboundApprovalService._notify_new_request(approval, applicant_id, approver_id=approver_id)
return approval
@staticmethod
def _get_emails_by_identifiers(applicant_id=None, role_codes=None):
"""
根据用户ID或角色列表查询邮箱地址
Args:
applicant_id: 用户ID (按 SysUser.id 查找)
role_codes: 角色代码列表,如 ['ADMIN', 'WAREHOUSE_ADMIN']
Returns:
去重后的邮箱地址列表
"""
emails = []
if applicant_id:
user = SysUser.query.get(int(applicant_id))
if user and user.email:
emails.append(user.email)
if role_codes:
for code in role_codes:
users = SysUser.query.filter_by(role=code).all()
for u in users:
if u.email:
emails.append(u.email)
return list(set(emails))
@staticmethod
def _notify_new_request(approval, applicant_id, approver_id=None):
"""发送新申请通知邮件给审批人(静默处理,不阻断主流程)"""
try:
from flask import current_app
from app.utils.email_service import send_new_request_notify
emails = []
if approver_id:
# ★ 精准通知模式:直接查询指定审批人
user = SysUser.query.get(int(approver_id))
if user and user.email:
emails.append(user.email)
else:
# 兜底:按角色查询
approvers = approval.get_allowed_approvers()
role_codes = []
for a in approvers:
if a.get('type') == 'role':
role_codes.append(a.get('value', ''))
emails = OutboundApprovalService._get_emails_by_identifiers(role_codes=role_codes)
if not emails:
current_app.logger.info(f"[Email] 审批单 {approval.request_no} 无审批人邮箱,跳过通知")
return
# 获取申请人姓名
applicant_name = ''
if applicant_id:
u = SysUser.query.get(applicant_id)
if u:
# username 格式为 "姓名/账号",取姓名部分
applicant_name = str(u.username).split('/')[0] if '/' in u.username else (u.username or str(applicant_id))
send_new_request_notify(
to_emails=emails,
request_no=approval.request_no,
applicant_name=applicant_name,
remark=approval.remark or ''
)
except Exception as e:
# ★ 捕获所有异常,确保邮件发送失败不阻断主流程
try:
from flask import current_app
current_app.logger.error(f"[Email] 发送新申请通知邮件失败: {e}")
except RuntimeError:
# 如果不在 Flask 应用上下文内,降级为标准日志
logging.getLogger(__name__).error(f"[Email] 发送新申请通知邮件失败: {e}")
@staticmethod
def can_approve(approval, user_id, user_role):
"""
检查用户是否有权限审批
Args:
approval: OutboundApproval 实例
user_id: 用户ID
user_role: 用户角色
Returns:
bool, 是否有权限
"""
approvers = approval.get_allowed_approvers()
# 超级管理员可以直接审批
if user_role and user_role.upper() == 'SUPER_ADMIN':
return True
for approver in approvers:
approver_type = approver.get('type', '')
approver_value = approver.get('value', '')
if approver_type == 'user' and str(approver_value) == str(user_id):
return True
if approver_type == 'role' and approver_value == user_role:
return True
return False
@staticmethod
def approve(request_id, user_id, user_role, action='approve', reject_reason=None):
"""
执行审批操作
Args:
request_id: 审批单ID
user_id: 审批人ID
user_role: 审批人角色
action: 'approve' 通过, 'reject' 驳回
reject_reason: 驳回原因
Returns:
(success: bool, message: str, approval: OutboundApproval or None)
"""
from app.models.outbound import OutboundApproval
beijing_tz = timezone(timedelta(hours=8))
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
approval = OutboundApproval.query.get(request_id)
if not approval:
return False, "审批单不存在", None
if approval.status != 0:
return False, f"审批单状态已更新,无法重复审批 (当前状态: {approval.status})", None
if not OutboundApprovalService.can_approve(approval, user_id, user_role):
return False, "您没有审批此单的权限", None
try:
if action == 'approve':
approval.status = 1 # 已通过
approval.actual_approver_id = user_id
approval.approved_at = current_time
elif action == 'reject':
approval.status = 2 # 已驳回
approval.reject_reason = reject_reason
else:
return False, "无效的审批操作", None
db.session.commit()
# ★ 审批成功后,发送邮件通知仓库管理员
OutboundApprovalService._notify_approval_result(approval, user_id, action)
return True, "审批成功", approval
except Exception as e:
db.session.rollback()
return False, f"审批失败: {str(e)}", None
@staticmethod
def _notify_approval_result(approval, approver_id, action):
"""发送审批结果通知邮件(静默处理,不阻断主流程)"""
try:
from app.utils.email_service import send_approval_result_notify
# 仓库管理员角色代码
warehouse_role_codes = ['WAREHOUSE_MGR', 'OUTBOUND']
# 查询库管邮箱 + 申请人本人邮箱
emails = OutboundApprovalService._get_emails_by_identifiers(
applicant_id=approval.applicant_id,
role_codes=warehouse_role_codes
)
if not emails:
return
send_approval_result_notify(
to_emails=emails,
request_no=approval.request_no,
is_passed=(action == 'approve'),
reject_reason=approval.reject_reason or ''
)
except Exception as e:
logging.getLogger(__name__).warning(f"[Email] 发送审批结果通知邮件失败: {e}")
@staticmethod
def get_request_list(page=1, per_page=10, applicant_id=None, status=None):
"""
获取审批单列表
Args:
page: 页码
per_page: 每页数量
applicant_id: 按申请人筛选 (可选)
status: 按状态筛选 (可选)
Returns:
分页结果
"""
from app.models.outbound import OutboundApproval
from sqlalchemy import desc
query = OutboundApproval.query
if applicant_id:
query = query.filter(OutboundApproval.applicant_id == applicant_id)
if status is not None:
query = query.filter(OutboundApproval.status == status)
query = query.order_by(desc(OutboundApproval.created_at))
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
return {
'items': [item.to_dict() for item in pagination.items],
'total': pagination.total,
'pages': pagination.pages,
'current_page': page
}
@staticmethod
def get_request_by_id(request_id):
"""根据ID获取审批单"""
from app.models.outbound import OutboundApproval
return OutboundApproval.query.get(request_id)