import uuid # .material -> .base refactor checked from datetime import datetime, timezone, timedelta from sqlalchemy import or_, func, desc, and_ from app.extensions import db from app.models.outbound import TransOutbound, OutboundApproval # 引入所有库存模型以进行查询 from app.models.inbound.buy import StockBuy from app.models.inbound.semi import StockSemi from app.models.inbound.product import StockProduct # 引入基础信息表 from app.models.base import MaterialBase # 引入维修单表 from app.models.transaction import TransRepair # 引入系统用户表 from app.models.system import SysUser class OutboundService: @staticmethod def generate_outbound_no(): """ 生成出库单号: OUT-yyyyMMdd-HHmm-当日流水(4位) 例如: OUT-20260205-1558-0001 """ beijing_tz = timezone(timedelta(hours=8)) now = datetime.now(beijing_tz) date_str = now.strftime('%Y%m%d') time_str = now.strftime('%H%M') prefix = f"OUT-{date_str}-" existing_count = db.session.query(func.count(func.distinct(TransOutbound.outbound_no))) \ .filter(TransOutbound.outbound_no.like(f"{prefix}%")).scalar() sequence = existing_count + 1 return f"OUT-{date_str}-{time_str}-{sequence:04d}" @staticmethod def get_stock_by_barcode(barcode): """ 根据扫码内容查找对应的库存物品,并附带价格信息 """ if not barcode: return None clean_code = barcode.strip() def get_price(item, table_type): if table_type == 'stock_product': return float(item.sale_price) if item.sale_price else 0 elif table_type == 'stock_buy': return float(item.pre_tax_unit_price) if item.pre_tax_unit_price else 0 return 0 prod = StockProduct.query.filter( or_(StockProduct.barcode == clean_code, StockProduct.sku == clean_code) ).first() if prod: res = OutboundService._format_scan_result(prod, 'stock_product') res['price'] = get_price(prod, 'stock_product') return res semi = StockSemi.query.filter( or_(StockSemi.barcode == clean_code, StockSemi.sku == clean_code) ).first() if semi: res = OutboundService._format_scan_result(semi, 'stock_semi') res['price'] = 0 return res buy = StockBuy.query.filter( or_(StockBuy.barcode == clean_code, StockBuy.sku == clean_code) ).first() if buy: res = OutboundService._format_scan_result(buy, 'stock_buy') res['price'] = get_price(buy, 'stock_buy') return res # 查询维修单表 (按SKU或序列号查询,排除已出库状态) repair = TransRepair.query.filter( or_(TransRepair.sku == clean_code, TransRepair.serial_number == clean_code) ).filter( TransRepair.repair_status != '已出库' ).first() if repair: res = { 'id': repair.id, 'sku': repair.sku, 'name': repair.material_name or "维修件", 'spec_model': "", 'category': "", 'material_type': "", 'source_table': 'trans_repair', 'stock_quantity': 1, 'available_quantity': 1, 'batch_number': repair.serial_number or '', 'serial_number': repair.serial_number or '', 'warehouse_location': repair.customer_location or '', 'barcode': repair.sku, 'price': float(repair.sale_price) if repair.sale_price else 0 } return res return None @staticmethod def _format_scan_result(item, table_name): base_name = "" base_spec = "" base_cat = "" base_type = "" if hasattr(item, 'base') and item.base: base_name = item.base.name base_spec = item.base.spec_model base_cat = item.base.category base_type = item.base.material_type if not base_name and hasattr(item, 'base_id') and item.base_id: try: base_info = MaterialBase.query.get(item.base_id) if base_info: base_name = base_info.name base_spec = base_info.spec_model base_cat = base_info.category base_type = base_info.material_type except Exception: pass if not base_name and hasattr(item, 'base') and item.base: base_name = item.base.name stock_qty = float(item.stock_quantity) if item.stock_quantity else 0 avail_qty = float(item.available_quantity) if item.available_quantity else 0 return { 'id': item.id, 'sku': item.sku, 'name': base_name or "未知物品", 'spec_model': base_spec or "", 'category': base_cat or "", 'material_type': base_type or "", 'source_table': table_name, 'stock_quantity': stock_qty, 'available_quantity': avail_qty, 'batch_number': getattr(item, 'batch_number', ''), 'warehouse_location': getattr(item, 'warehouse_location', ''), 'barcode': getattr(item, 'barcode', '') } @staticmethod def create_outbound_batch(data, operator_name='System'): items = data.get('items', []) if not items: raise ValueError("出库商品列表不能为空") outbound_no = OutboundService.generate_outbound_no() common_data = { 'outbound_no': outbound_no, 'consumer_name': data.get('consumer_name'), 'outbound_type': data.get('outbound_type', 'SALES'), 'signature_path': data.get('signature_path'), 'operator_name': operator_name, 'remark': data.get('remark') } beijing_tz = timezone(timedelta(hours=8)) current_time = datetime.now(beijing_tz).replace(tzinfo=None) # ★ 审批单相关逻辑 request_id = data.get('request_id') approval = None if request_id: # 根据 request_id 查询审批单 approval = OutboundApproval.query.get(request_id) if not approval: raise ValueError(f"关联的审批单不存在 (ID: {request_id})") if approval.status != 1: status_map = {0: '待审批', 1: '已通过', 2: '已驳回', 3: '已完成'} current_status = status_map.get(approval.status, str(approval.status)) raise ValueError( f"关联的审批单状态不允许出库 (当前状态: {current_status})," f"仅已通过的审批单方可执行出库" ) model_map = { 'stock_buy': StockBuy, 'stock_semi': StockSemi, 'stock_product': StockProduct } try: for item in items: source_table = item.get('source_table') stock_id = item.get('stock_id') quantity = float(item.get('quantity', 0)) unit_price = float(item.get('price', 0)) if quantity <= 0: raise ValueError(f"SKU {item.get('sku')} 的出库数量必须大于0") # 处理维修单出库 if source_table == 'trans_repair': repair = TransRepair.query.with_for_update().get(stock_id) if not repair: raise ValueError(f"维修单不存在 (ID: {stock_id})") # 更新维修单状态为已出库 repair.repair_status = '已出库' repair.shipping_date = current_time # 创建出库记录 new_record = TransOutbound( sku=item.get('sku'), source_table=source_table, stock_id=stock_id, barcode=item.get('barcode'), quantity=quantity, unit_price=unit_price, outbound_time=current_time, **common_data ) db.session.add(new_record) continue ModelClass = model_map.get(source_table) if not ModelClass: continue stock_record = ModelClass.query.with_for_update().get(stock_id) if not stock_record: raise ValueError(f"库存记录不存在 (ID: {stock_id})") if float(stock_record.available_quantity) < quantity: raise ValueError(f"SKU {stock_record.sku} 库存不足,当前可用: {stock_record.available_quantity}") stock_record.stock_quantity = float(stock_record.stock_quantity) - quantity stock_record.available_quantity = float(stock_record.available_quantity) - quantity new_record = TransOutbound( sku=item.get('sku'), source_table=source_table, stock_id=stock_id, barcode=item.get('barcode'), quantity=quantity, unit_price=unit_price, outbound_time=current_time, **common_data ) db.session.add(new_record) # ★ 如果关联了审批单,出库成功后更新审批单状态为"已完成" if approval: approval.status = 3 # 3-已完成 # updated_at 会在 commit 时由 SQLAlchemy 自动更新 db.session.commit() return outbound_no except Exception as e: db.session.rollback() raise e @staticmethod def get_grouped_list(page=1, per_page=10, keyword=None, search_type='all', start_date=None, end_date=None): """ 查询出库记录(按出库单号分组),包含详细物品信息 支持跨表搜索:单号、领用人、SKU、物料名称、规格型号 search_type: all, no, name, sku, material_name, spec_model """ # 日期补全:解决零点截断问题 if end_date and len(str(end_date).strip()) == 10: end_date = f"{str(end_date).strip()} 23:59:59" if start_date and len(str(start_date).strip()) == 10: start_date = f"{str(start_date).strip()} 00:00:00" # 1. 构建基础查询 # 如果有关键词,需要联表搜索物料名称和规格型号 if keyword: # 根据 search_type 构建不同的搜索条件 if search_type == 'all': # 原有逻辑:or_ 联表全局模糊搜索 # 查询 stock_buy 路径匹配的名称/规格 buy_match = db.session.query(TransOutbound.outbound_no).join( StockBuy, and_( TransOutbound.stock_id == StockBuy.id, TransOutbound.source_table == 'stock_buy' ) ).join( MaterialBase, StockBuy.base_id == MaterialBase.id ).filter( or_( MaterialBase.name.ilike(f'%{keyword}%'), MaterialBase.spec_model.ilike(f'%{keyword}%') ) ).subquery() # 查询 stock_semi 路径匹配的名称/规格 semi_match = db.session.query(TransOutbound.outbound_no).join( StockSemi, and_( TransOutbound.stock_id == StockSemi.id, TransOutbound.source_table == 'stock_semi' ) ).join( MaterialBase, StockSemi.base_id == MaterialBase.id ).filter( or_( MaterialBase.name.ilike(f'%{keyword}%'), MaterialBase.spec_model.ilike(f'%{keyword}%') ) ).subquery() # 查询 stock_product 路径匹配的名称/规格 product_match = db.session.query(TransOutbound.outbound_no).join( StockProduct, and_( TransOutbound.stock_id == StockProduct.id, TransOutbound.source_table == 'stock_product' ) ).join( MaterialBase, StockProduct.base_id == MaterialBase.id ).filter( or_( MaterialBase.name.ilike(f'%{keyword}%'), MaterialBase.spec_model.ilike(f'%{keyword}%') ) ).subquery() # 合并三种来源的匹配单号 all_matches = db.session.query(buy_match.c.outbound_no).union( db.session.query(semi_match.c.outbound_no), db.session.query(product_match.c.outbound_no) ).subquery() keyword_conditions = or_( TransOutbound.outbound_no.ilike(f'%{keyword}%'), TransOutbound.consumer_name.ilike(f'%{keyword}%'), TransOutbound.sku.ilike(f'%{keyword}%'), TransOutbound.outbound_no.in_(all_matches) ) elif search_type == 'no': keyword_conditions = TransOutbound.outbound_no.ilike(f'%{keyword}%') elif search_type == 'name': keyword_conditions = TransOutbound.consumer_name.ilike(f'%{keyword}%') elif search_type == 'sku': keyword_conditions = TransOutbound.sku.ilike(f'%{keyword}%') elif search_type == 'material_name': # 联表查询物料名称 buy_match = db.session.query(TransOutbound.outbound_no).join( StockBuy, and_( TransOutbound.stock_id == StockBuy.id, TransOutbound.source_table == 'stock_buy' ) ).join( MaterialBase, StockBuy.base_id == MaterialBase.id ).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery() semi_match = db.session.query(TransOutbound.outbound_no).join( StockSemi, and_( TransOutbound.stock_id == StockSemi.id, TransOutbound.source_table == 'stock_semi' ) ).join( MaterialBase, StockSemi.base_id == MaterialBase.id ).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery() product_match = db.session.query(TransOutbound.outbound_no).join( StockProduct, and_( TransOutbound.stock_id == StockProduct.id, TransOutbound.source_table == 'stock_product' ) ).join( MaterialBase, StockProduct.base_id == MaterialBase.id ).filter(MaterialBase.name.ilike(f'%{keyword}%')).subquery() all_matches = db.session.query(buy_match.c.outbound_no).union( db.session.query(semi_match.c.outbound_no), db.session.query(product_match.c.outbound_no) ).subquery() keyword_conditions = TransOutbound.outbound_no.in_(all_matches) elif search_type == 'spec_model': # 联表查询规格型号 buy_match = db.session.query(TransOutbound.outbound_no).join( StockBuy, and_( TransOutbound.stock_id == StockBuy.id, TransOutbound.source_table == 'stock_buy' ) ).join( MaterialBase, StockBuy.base_id == MaterialBase.id ).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery() semi_match = db.session.query(TransOutbound.outbound_no).join( StockSemi, and_( TransOutbound.stock_id == StockSemi.id, TransOutbound.source_table == 'stock_semi' ) ).join( MaterialBase, StockSemi.base_id == MaterialBase.id ).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery() product_match = db.session.query(TransOutbound.outbound_no).join( StockProduct, and_( TransOutbound.stock_id == StockProduct.id, TransOutbound.source_table == 'stock_product' ) ).join( MaterialBase, StockProduct.base_id == MaterialBase.id ).filter(MaterialBase.spec_model.ilike(f'%{keyword}%')).subquery() all_matches = db.session.query(buy_match.c.outbound_no).union( db.session.query(semi_match.c.outbound_no), db.session.query(product_match.c.outbound_no) ).subquery() keyword_conditions = TransOutbound.outbound_no.in_(all_matches) else: keyword_conditions = None else: keyword_conditions = None stmt = db.session.query( TransOutbound.outbound_no, func.max(TransOutbound.outbound_time).label('max_time') ).group_by(TransOutbound.outbound_no) if keyword_conditions is not None: stmt = stmt.filter(keyword_conditions) if start_date and end_date: stmt = stmt.filter(TransOutbound.outbound_time.between(start_date, end_date)) stmt = stmt.order_by(desc('max_time')) # 使用 distinct 确保跨表查询不重复 stmt = stmt.distinct() pagination = stmt.paginate(page=page, per_page=per_page, error_out=False) outbound_nos = [row.outbound_no for row in pagination.items] if not outbound_nos: return { 'items': [], 'total': 0, 'pages': 0, 'current_page': page } # 2. 查询详细记录 details = TransOutbound.query.filter(TransOutbound.outbound_no.in_(outbound_nos)).all() # 3. 组装数据并查询物品详情 grouped_map = {} # 映射表模型以便查询 model_map = { 'stock_buy': StockBuy, 'stock_semi': StockSemi, 'stock_product': StockProduct } for d in details: ono = d.outbound_no if ono not in grouped_map: grouped_map[ono] = { 'outbound_no': ono, 'outbound_time': d.outbound_time.strftime('%Y-%m-%d %H:%M:%S'), 'outbound_type': d.outbound_type, 'consumer_name': d.consumer_name, 'operator_name': d.operator_name, 'signature_path': d.signature_path, 'remark': d.remark, 'total_amount': 0.0, 'items': [] } # --- 查询物品详细信息 (名称, 规格, 类型, 类别, 批号/SN) --- item_name = "未知物品" item_spec = "" item_cat = "" item_type = "" batch_sn = "-" ModelClass = model_map.get(d.source_table) if ModelClass and d.stock_id: # 注意:这里在循环中查询可能会有N+1问题,但考虑到单页数据量(通常每单条目不多),暂时可接受 # 生产环境建议优化为预加载或批量查询 try: stock_item = ModelClass.query.get(d.stock_id) if stock_item: # 获取批号/序列号用于追溯 batch_sn = getattr(stock_item, 'batch_number', None) or getattr(stock_item, 'serial_number', None) or '-' if stock_item.base: item_name = stock_item.base.name item_spec = stock_item.base.spec_model item_cat = stock_item.base.category item_type = stock_item.base.material_type elif stock_item and hasattr(stock_item, 'base_id') and stock_item.base_id: base_info = MaterialBase.query.get(stock_item.base_id) if base_info: item_name = base_info.name item_spec = base_info.spec_model item_cat = base_info.category item_type = base_info.material_type except Exception as e: print(f"Error fetching detail for stock_id {d.stock_id}: {e}") # 计算金额 price = float(d.unit_price) if d.unit_price else 0 qty = float(d.quantity) subtotal = price * qty grouped_map[ono]['total_amount'] += subtotal grouped_map[ono]['items'].append({ 'sku': d.sku, 'name': item_name, 'spec_model': item_spec, 'category': item_cat, 'material_type': item_type, 'quantity': qty, 'unit_price': price, 'subtotal': subtotal, 'batch_sn': batch_sn }) # 4. 排序输出 result_list = [] for ono in outbound_nos: if ono in grouped_map: obj = grouped_map[ono] obj['items'].sort(key=lambda x: x['unit_price'], reverse=True) obj['total_amount'] = round(obj['total_amount'], 2) result_list.append(obj) return { 'items': result_list, 'total': pagination.total, 'pages': pagination.pages, 'current_page': page } class OutboundApprovalService: """出库审批服务""" @staticmethod def generate_request_no(): """ 生成审批单号: APR-OUT-yyyyMMdd-HHmm-当日流水(4位) """ beijing_tz = timezone(timedelta(hours=8)) now = datetime.now(beijing_tz) date_str = now.strftime('%Y%m%d') time_str = now.strftime('%H%M') prefix = f"APR-OUT-{date_str}-" from app.models.outbound import OutboundApproval latest = db.session.query(OutboundApproval.request_no).filter( OutboundApproval.request_no.like(f"{prefix}%") ).order_by(OutboundApproval.id.desc()).first() if latest: last_seq = int(latest[0].split('-')[-1]) sequence = last_seq + 1 else: sequence = 1 return f"APR-OUT-{date_str}-{time_str}-{sequence:04d}" @staticmethod def create_request(applicant_id, items, allowed_approvers, remark=None, approver_id=None): """ 创建出库审批单(申请阶段,直接存储前端传来的物料信息快照,不关联具体库存记录) Args: applicant_id: 申请人ID items: 出库物品明细列表,每个物品应包含: - name: 物料名称 (必填) - spec_model: 规格型号 (必填) - quantity: 计划出库数量 (必填) - warehouse_location: 库位 (可选) - remark: 物品备注 (可选) allowed_approvers: 允许审批的人员/角色列表 approver_id: 指定审批人ID(可选,传则覆盖 allowed_approvers) remark: 申请说明 Returns: OutboundApproval 实例 Raises: ValueError: 当 items 为空或缺少必填字段时抛出 """ from app.models.outbound import OutboundApproval # 校验 items 非空 if not items: raise ValueError("出库物品列表不能为空") # 校验每个物品的宏观字段 (name, spec_model, quantity) required_fields = ['name', 'spec_model', 'quantity'] for idx, item in enumerate(items): missing_fields = [f for f in required_fields if f not in item or str(item.get(f) or '').strip() == ''] if missing_fields: raise ValueError( f"第 {idx + 1} 条物品缺少必填字段: {', '.join(missing_fields)}。" f"必须包含: name, spec_model, quantity" ) try: qty = float(item.get('quantity', 0)) if qty <= 0: raise ValueError(f"第 {idx + 1} 条物品的出库数量必须大于0") except (TypeError, ValueError) as e: raise ValueError(f"第 {idx + 1} 条物品的 quantity 格式无效: {str(e)}") # ★ 校验 allowed_approvers 非空 if not allowed_approvers: raise ValueError("必须指定至少一位审批人") # ★ 指定审批人模式:approver_id 覆盖 allowed_approvers if approver_id: allowed_approvers = [{"type": "user", "value": int(approver_id)}] request_no = OutboundApprovalService.generate_request_no() approval = OutboundApproval( request_no=request_no, applicant_id=applicant_id, remark=remark, status=0, # 待审批 ) # 直接存储前端传来的物料信息快照,不查询/不关联具体库存记录 approval.set_items(items) approval.set_allowed_approvers(allowed_approvers) db.session.add(approval) db.session.commit() # ★ 创建成功后,发送邮件通知审批人(精确通知 approver_id 对应的邮箱) OutboundApprovalService._notify_new_request(approval, applicant_id, approver_id=approver_id) return approval @staticmethod def _get_emails_by_identifiers(applicant_id=None, role_codes=None): """ 根据用户ID或角色列表查询邮箱地址 Args: applicant_id: 用户ID (按 SysUser.id 查找) role_codes: 角色代码列表,如 ['ADMIN', 'WAREHOUSE_ADMIN'] Returns: 去重后的邮箱地址列表 """ emails = [] if applicant_id: user = SysUser.query.get(int(applicant_id)) if user and user.email: emails.append(user.email) if role_codes: for code in role_codes: users = SysUser.query.filter_by(role=code).all() for u in users: if u.email: emails.append(u.email) return list(set(emails)) @staticmethod def _notify_new_request(approval, applicant_id, approver_id=None): """发送新申请通知邮件给审批人(静默处理,不阻断主流程)""" try: from flask import current_app from app.utils.email_service import send_new_request_notify emails = [] if approver_id: # ★ 精准通知模式:直接查询指定审批人 user = SysUser.query.get(int(approver_id)) if user and user.email: emails.append(user.email) else: # 兜底:按角色查询 approvers = approval.get_allowed_approvers() role_codes = [] for a in approvers: if a.get('type') == 'role': role_codes.append(a.get('value', '')) emails = OutboundApprovalService._get_emails_by_identifiers(role_codes=role_codes) if not emails: current_app.logger.info(f"[Email] 审批单 {approval.request_no} 无审批人邮箱,跳过通知") return # 获取申请人姓名 applicant_name = '' if applicant_id: u = SysUser.query.get(applicant_id) if u: # username 格式为 "姓名/账号",取姓名部分 applicant_name = str(u.username).split('/')[0] if '/' in u.username else (u.username or str(applicant_id)) send_new_request_notify( to_emails=emails, request_no=approval.request_no, applicant_name=applicant_name, remark=approval.remark or '' ) except Exception as e: # ★ 捕获所有异常,确保邮件发送失败不阻断主流程 try: from flask import current_app current_app.logger.error(f"[Email] 发送新申请通知邮件失败: {e}") except RuntimeError: # 如果不在 Flask 应用上下文内,降级为标准日志 logging.getLogger(__name__).error(f"[Email] 发送新申请通知邮件失败: {e}") @staticmethod def can_approve(approval, user_id, user_role): """ 检查用户是否有权限审批 Args: approval: OutboundApproval 实例 user_id: 用户ID user_role: 用户角色 Returns: bool, 是否有权限 """ approvers = approval.get_allowed_approvers() # 超级管理员可以直接审批 if user_role and user_role.upper() == 'SUPER_ADMIN': return True for approver in approvers: approver_type = approver.get('type', '') approver_value = approver.get('value', '') if approver_type == 'user' and str(approver_value) == str(user_id): return True if approver_type == 'role' and approver_value == user_role: return True return False @staticmethod def approve(request_id, user_id, user_role, action='approve', reject_reason=None): """ 执行审批操作 Args: request_id: 审批单ID user_id: 审批人ID user_role: 审批人角色 action: 'approve' 通过, 'reject' 驳回 reject_reason: 驳回原因 Returns: (success: bool, message: str, approval: OutboundApproval or None) """ from app.models.outbound import OutboundApproval beijing_tz = timezone(timedelta(hours=8)) current_time = datetime.now(beijing_tz).replace(tzinfo=None) approval = OutboundApproval.query.get(request_id) if not approval: return False, "审批单不存在", None if approval.status != 0: return False, f"审批单状态已更新,无法重复审批 (当前状态: {approval.status})", None if not OutboundApprovalService.can_approve(approval, user_id, user_role): return False, "您没有审批此单的权限", None try: if action == 'approve': approval.status = 1 # 已通过 approval.actual_approver_id = user_id approval.approved_at = current_time elif action == 'reject': approval.status = 2 # 已驳回 approval.reject_reason = reject_reason else: return False, "无效的审批操作", None db.session.commit() # ★ 审批成功后,发送邮件通知仓库管理员 OutboundApprovalService._notify_approval_result(approval, user_id, action) return True, "审批成功", approval except Exception as e: db.session.rollback() return False, f"审批失败: {str(e)}", None @staticmethod def _notify_approval_result(approval, approver_id, action): """发送审批结果通知邮件(静默处理,不阻断主流程)""" try: from app.utils.email_service import send_approval_result_notify # 仓库管理员角色代码 warehouse_role_codes = ['WAREHOUSE_MGR', 'OUTBOUND'] # 查询库管邮箱 + 申请人本人邮箱 emails = OutboundApprovalService._get_emails_by_identifiers( applicant_id=approval.applicant_id, role_codes=warehouse_role_codes ) if not emails: return send_approval_result_notify( to_emails=emails, request_no=approval.request_no, is_passed=(action == 'approve'), reject_reason=approval.reject_reason or '' ) except Exception as e: logging.getLogger(__name__).warning(f"[Email] 发送审批结果通知邮件失败: {e}") @staticmethod def get_request_list(page=1, per_page=10, applicant_id=None, status=None): """ 获取审批单列表 Args: page: 页码 per_page: 每页数量 applicant_id: 按申请人筛选 (可选) status: 按状态筛选 (可选) Returns: 分页结果 """ from app.models.outbound import OutboundApproval from sqlalchemy import desc query = OutboundApproval.query if applicant_id: query = query.filter(OutboundApproval.applicant_id == applicant_id) if status is not None: query = query.filter(OutboundApproval.status == status) query = query.order_by(desc(OutboundApproval.created_at)) pagination = query.paginate(page=page, per_page=per_page, error_out=False) return { 'items': [item.to_dict() for item in pagination.items], 'total': pagination.total, 'pages': pagination.pages, 'current_page': page } @staticmethod def get_request_by_id(request_id): """根据ID获取审批单""" from app.models.outbound import OutboundApproval return OutboundApproval.query.get(request_id)