from datetime import datetime import pytz from app.extensions import db from app.models.borrow import BorrowApproval from app.models.system import SysUser class BorrowApprovalService: """借库审批服务""" @staticmethod def generate_request_no(): """ 生成审批单号: APR-BOR-yyyyMMdd-HHmm-当日流水(4位) """ beijing_tz = pytz.timezone('Asia/Shanghai') now = datetime.now(beijing_tz) date_str = now.strftime('%Y%m%d') time_str = now.strftime('%H%M') prefix = f"APR-BOR-{date_str}-" latest = db.session.query(BorrowApproval.request_no).filter( BorrowApproval.request_no.like(f"{prefix}%") ).order_by(BorrowApproval.id.desc()).first() if latest: last_seq = int(latest[0].split('-')[-1]) sequence = last_seq + 1 else: sequence = 1 return f"APR-BOR-{date_str}-{time_str}-{sequence:04d}" @staticmethod def submit_approval(applicant_id, items, allowed_approvers, remark=None, approver_id=None, borrower_name=None): """ 提交借库申请(仅存储意向,不扣库存) Args: applicant_id: 申请人ID items: 借库物品明细列表,每个物品应包含: - name: 物料名称 (必填) - spec_model: 规格型号 (必填) - quantity: 计划借库数量 (必填) - warehouse_location: 库位 (可选) - remark: 物品备注 (可选) allowed_approvers: 允许审批的人员/角色列表 approver_id: 指定审批人ID(可选) remark: 申请说明 borrower_name: 借库人姓名(必填) Returns: BorrowApproval 实例 Raises: ValueError: 当 items 为空或缺少必填字段时抛出 """ if not items: raise ValueError("借库物品列表不能为空") required_fields = ['name', 'spec_model', 'quantity'] for idx, item in enumerate(items): missing_fields = [f for f in required_fields if f not in item or str(item.get(f) or '').strip() == ''] if missing_fields: raise ValueError( f"第 {idx + 1} 条物品缺少必填字段: {', '.join(missing_fields)}。" f"必须包含: name, spec_model, quantity" ) try: qty = float(item.get('quantity', 0)) if qty <= 0: raise ValueError(f"第 {idx + 1} 条物品的借库数量必须大于0") except (TypeError, ValueError) as e: raise ValueError(f"第 {idx + 1} 条物品的 quantity 格式无效: {str(e)}") if not allowed_approvers: raise ValueError("必须指定至少一位审批人") if approver_id: allowed_approvers = [{"type": "user", "value": int(approver_id)}] request_no = BorrowApprovalService.generate_request_no() approval = BorrowApproval( request_no=request_no, applicant_id=applicant_id, remark=remark, borrower_name=borrower_name, status=0, # 待审批 ) approval.set_items(items) approval.set_allowed_approvers(allowed_approvers) db.session.add(approval) db.session.commit() # ★ 创建成功后,发送邮件通知审批人(静默处理,不阻断主流程) BorrowApprovalService._notify_new_request(approval, applicant_id, approver_id=approver_id) return approval @staticmethod def _get_emails_by_identifiers(applicant_id=None, role_codes=None): """ 根据用户ID或角色列表查询邮箱地址 Args: applicant_id: 用户ID (按 SysUser.id 查找) role_codes: 角色代码列表,如 ['SUPERVISOR', 'SUPER_ADMIN'] Returns: 去重后的邮箱地址列表 """ emails = [] if applicant_id: user = SysUser.query.get(int(applicant_id)) if user and user.email: emails.append(user.email) if role_codes: for code in role_codes: users = SysUser.query.filter_by(role=code).all() for u in users: if u.email: emails.append(u.email) return list(set(emails)) @staticmethod def _notify_new_request(approval, applicant_id, approver_id=None): """发送新借库申请通知邮件给审批人和申请人(静默处理,不阻断主流程)""" try: from flask import current_app from app.utils.email_service import send_new_request_notify from app.models.system import SysUser applicant_name = '' applicant_emails = [] # 1. 收集申请人信息 if applicant_id: user = SysUser.query.get(int(applicant_id)) if user and user.email: applicant_emails.append(user.email) applicant_name = str(user.username).split('/')[0] if '/' in (user.username or '') else (user.username or str(applicant_id)) # 2. 收集审批人信息 approver_emails = [] if approver_id: user = SysUser.query.get(int(approver_id)) if user and user.email: approver_emails.append(user.email) else: # 兜底:按角色查询 approvers = approval.get_allowed_approvers() role_codes = [] for a in approvers: if a.get('type') == 'role': role_codes.append(a.get('value', '')) approver_emails = BorrowApprovalService._get_emails_by_identifiers(role_codes=role_codes) # 去重 all_emails = list(set(applicant_emails + approver_emails)) if not all_emails: current_app.logger.info(f"[Email] 借库审批单 {approval.request_no} 无收件人邮箱,跳过通知") return # 3. 获取物料明细 items = approval.get_items() # 4. 分别发送邮件 if applicant_emails: try: send_new_request_notify( to_emails=applicant_emails, request_no=approval.request_no, applicant_name=applicant_name, remark=f"您的借库申请已提交,等待审批。{approval.remark or ''}", items=items, is_applicant_notify=True ) except Exception as e: current_app.logger.error(f"[Email] 通知申请人失败: {e}") if approver_emails: try: send_new_request_notify( to_emails=approver_emails, request_no=approval.request_no, applicant_name=applicant_name, remark=approval.remark or '', items=items, is_applicant_notify=False ) except Exception as e: current_app.logger.error(f"[Email] 通知审批人失败: {e}") except Exception as e: try: from flask import current_app current_app.logger.error(f"[Email] 发送新借库申请通知邮件失败: {e}") except RuntimeError: import traceback traceback.print_exc() @staticmethod def _notify_approval_result(approval, approver_id, action): """发送借库审批结果通知邮件(静默处理,不阻断主流程)""" import logging logger = logging.getLogger(__name__) try: from app.utils.email_service import send_approval_result_notify, send_warehouse_dispatch_notify from app.models.system import SysUser as SU # 1. 提取申请人信息 applicant_name = '' applicant_emails = [] if approval.applicant_id: user = SU.query.get(approval.applicant_id) if user: applicant_name = str(user.username).split('/')[0] if '/' in (user.username or '') else (user.username or '') if user.email: applicant_emails.append(user.email) # 2. 提取物料明细 items = approval.items_json if approval.items_json else [] # 3. 分支逻辑 if action == 'approve': # 3.1 通知库管(带明细) warehouse_role_codes = ['WAREHOUSE_MGR', 'OUTBOUND'] warehouse_emails = BorrowApprovalService._get_emails_by_identifiers(role_codes=warehouse_role_codes) if warehouse_emails: try: send_warehouse_dispatch_notify( to_emails=warehouse_emails, request_no=approval.request_no, applicant_name=applicant_name, items=items ) except Exception as e: logger.error(f"[Email] 通知库管失败: {e}") # 3.2 通知申请人(审批通过,带完整物料清单) if applicant_emails: try: send_warehouse_dispatch_notify( to_emails=applicant_emails, request_no=approval.request_no, applicant_name=applicant_name, items=items ) except Exception as e: logger.error(f"[Email] 通知申请人(通过)失败: {e}") elif action == 'reject': # 3.3 通知申请人(已驳回) if applicant_emails: try: send_approval_result_notify( to_emails=applicant_emails, request_no=approval.request_no, is_passed=False, reject_reason=approval.reject_reason or '未说明原因', applicant_name=applicant_name ) except Exception as e: logger.error(f"[Email] 通知申请人驳回失败: {e}") else: logger.warning("[Email] 申请人无邮箱,无法发送驳回通知") except Exception as e: import traceback traceback.print_exc() logger.error(f"[Email] 外层发送异常: {e}") @staticmethod def can_approve(approval, user_id, user_role): """ 检查用户是否有权限审批 """ approvers = approval.get_allowed_approvers() if user_role and user_role.upper() == 'SUPER_ADMIN': return True for approver in approvers: approver_type = approver.get('type', '') approver_value = approver.get('value', '') if approver_type == 'user' and str(approver_value) == str(user_id): return True if approver_type == 'role' and approver_value == user_role: return True return False @staticmethod def approve(request_id, user_id, user_role, action='approve', reject_reason=None): """ 执行审批操作 Returns: (success: bool, message: str, approval: BorrowApproval or None) """ beijing_tz = pytz.timezone('Asia/Shanghai') current_time = datetime.now(beijing_tz).replace(tzinfo=None) approval = BorrowApproval.query.get(request_id) if not approval: return False, "审批单不存在", None if approval.status != 0: return False, f"审批单状态已更新,无法重复审批 (当前状态: {approval.status})", None if not BorrowApprovalService.can_approve(approval, user_id, user_role): return False, "您没有审批此单的权限", None try: if action == 'approve': approval.status = 1 # 已通过 approval.actual_approver_id = user_id approval.approved_at = current_time elif action == 'reject': approval.status = 2 # 已驳回 approval.reject_reason = reject_reason else: return False, "无效的审批操作", None db.session.commit() # ★ 审批后,发送邮件通知(静默处理,不阻断主流程) BorrowApprovalService._notify_approval_result(approval, user_id, action) return True, "审批成功", approval except Exception as e: db.session.rollback() return False, f"审批失败: {str(e)}", None @staticmethod def get_request_list(page=1, per_page=10, applicant_id=None, status=None): """ 获取审批单列表 """ from sqlalchemy import desc query = BorrowApproval.query if applicant_id: query = query.filter(BorrowApproval.applicant_id == applicant_id) if status is not None: query = query.filter(BorrowApproval.status == status) query = query.order_by(desc(BorrowApproval.created_at)) pagination = query.paginate(page=page, per_page=per_page, error_out=False) return { 'items': [item.to_dict() for item in pagination.items], 'total': pagination.total, 'pages': pagination.pages, 'current_page': page } @staticmethod def get_request_by_id(request_id): """根据ID获取审批单""" return BorrowApproval.query.get(request_id) @staticmethod def mark_completed(request_id): """标记审批单为已完成(借库执行完成后调用)""" approval = BorrowApproval.query.get(request_id) if not approval: return False, "审批单不存在", None if approval.status != 1: return False, f"只有已通过的审批单才能标记为完成 (当前状态: {approval.status})", None try: approval.status = 3 # 已完成 db.session.commit() return True, "审批单已完成", approval except Exception as e: db.session.rollback() return False, f"操作失败: {str(e)}", None