""" app/services/export_service/excel_task.py 异步导出核心任务逻辑 Redis 中的任务状态键格式:export:task:{task_id} TTL = 1 小时(3600 秒),超时自动清理 状态流转: 提交任务 → status=processing, progress=0 写入中 → status=processing, progress=N (10~90) 完成 → status=completed, progress=100, url=下载路径 失败 → status=failed, error=具体原因 """ import os import uuid import json import time import logging from threading import Thread from datetime import datetime from openpyxl import Workbook from openpyxl.styles import Font, PatternFill, Alignment, Border, Side logger = logging.getLogger(__name__) # 导出文件存放根目录(相对于项目根目录) EXPORT_DIR = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), 'uploads', 'exports' ) # Redis 键前缀 + TTL TASK_KEY_PREFIX = 'export:task:' TASK_TTL = 3600 # 1小时 def _redis(): """获取 Redis 客户端,带容错保护。""" try: from app.extensions import redis_client return redis_client except Exception: return None def _update_task(task_id: str, **kwargs): """ 原子更新 Redis 中的任务状态。 使用 setex 分两步: 1. 保存最新状态 JSON 2. 重置 TTL 为 1 小时 即使 Redis 写入失败也不阻断业务流程。 """ client = _redis() if not client: return key = f"{TASK_KEY_PREFIX}{task_id}" try: client.setex(key, TASK_TTL, json.dumps(kwargs, ensure_ascii=False)) logger.debug(f"[Export] 更新任务状态 task_id={task_id} → {kwargs}") except Exception as e: logger.warning(f"[Export] Redis 更新任务状态失败: task_id={task_id}, err={e}") # ============================================================================= # 对外入口:提交导出任务(启动后台线程) # ============================================================================= def submit_export_task(filters: dict) -> str: """ 接收前端过滤参数,生成 task_id,写入 Redis 初始状态, 然后启动后台线程执行 Excel 生成。 参数: filters: dict,任意查询参数(category, keyword, status 等) 返回: str: task_id(UUID),前端用此 ID 轮询进度 """ task_id = str(uuid.uuid4()) # 写入 Redis:初始状态 _update_task(task_id, status='processing', progress=0, url='', error='') # 立即启动后台线程执行(daemon=True 使主进程退出时自动终止) t = Thread( target=generate_excel_task, args=(task_id, filters), daemon=True ) t.start() logger.info(f"[Export] 任务已提交 task_id={task_id}, filters={filters}") return task_id # ============================================================================= # 后台任务:生成 Excel 文件 # ============================================================================= def generate_excel_task(task_id: str, filters: dict): """ 在后台线程中执行 Excel 生成。 流程: 1. 更新进度 10% → 开始查询 2. 根据 filters 查询数据库(可复用现有 Service) 3. 用 openpyxl 构建 Workbook,写入数据 4. 保存到 uploads/exports/{task_id}.xlsx 5. 更新进度 100% + status=completed + url 任何异常被捕获,不会导致主进程崩溃。 """ logger.info(f"[Export] 任务开始 task_id={task_id}") try: # ===== 阶段1:查询数据(模拟 + 实际) ===== _update_task(task_id, status='processing', progress=10) # 延迟导入:在子线程中加载 App Context,避免主线程时序问题 from flask import current_app from app.extensions import db from app.models.inbound.buy import StockBuy from app.models.inbound.semi import StockSemi from app.models.inbound.product import StockProduct records = _query_inventory(filters) # ===== 阶段2:写入 Excel ===== _update_task(task_id, status='processing', progress=40) filename = f"{task_id}.xlsx" filepath = os.path.join(EXPORT_DIR, filename) _write_excel(filepath, records, task_id) # ===== 阶段3:完成 ===== _update_task( task_id, status='completed', progress=100, url=f"/api/v1/export/download/{task_id}", error='' ) logger.info(f"[Export] 任务完成 task_id={task_id}, file={filename}") except Exception as e: logger.error(f"[Export] 任务失败 task_id={task_id}, err={e}") _update_task(task_id, status='failed', error=str(e)) # ============================================================================= # 查询层:根据 filters 聚合库存数据 # ============================================================================= def _query_inventory(filters: dict) -> list: """ 根据过滤条件查询三张库存表,返回标准化记录列表。 进度更新策略:在主线程(后台线程)内,每处理 1000 条回调一次 Redis。 """ from app.extensions import db from app.models.inbound.buy import StockBuy from app.models.inbound.semi import StockSemi from app.models.inbound.product import StockProduct from app.models.base import MaterialBase results = [] # ---------- 采购件 ---------- query = db.session.query( MaterialBase.name.label('material_name'), MaterialBase.spec_model.label('spec_model'), StockBuy.barcode, StockBuy.sku, StockBuy.status, StockBuy.warehouse_location, StockBuy.available_quantity, StockBuy.supplier_name, StockBuy.in_date, ).join(MaterialBase, StockBuy.base_id == MaterialBase.id) if filters.get('keyword'): kw = f"%{filters['keyword']}%" query = query.filter( (MaterialBase.name.ilike(kw)) | (MaterialBase.spec_model.ilike(kw)) | (StockBuy.barcode.ilike(kw)) | (StockBuy.sku.ilike(kw)) ) if filters.get('status'): query = query.filter(StockBuy.status == filters['status']) all_rows = query.order_by(StockBuy.id.desc()).limit(10000).all() total = len(all_rows) for idx, row in enumerate(all_rows): results.append({ 'type': '采购件', 'material_name': row.material_name or '', 'spec_model': row.spec_model or '', 'barcode': row.barcode or '', 'sku': row.sku or '', 'status': row.status or '', 'warehouse_location': row.warehouse_location or '', 'available_quantity': float(row.available_quantity or 0), 'supplier_name': row.supplier_name or '', 'in_date': row.in_date.strftime('%Y-%m-%d') if row.in_date else '', }) # 每 1000 条更新一次 Redis 进度(40%~80% 之间) if idx > 0 and idx % 1000 == 0: pct = 40 + int(40 * idx / total) if total else 80 # 注意:这里的 task_id 由外层 generate_excel_task 持有, # 进度更新在 _write_excel 中进行,此处仅做占位说明 logger.debug(f"[Export] 采购件已处理 {idx}/{total} 条, 估算进度 {pct}%") # ---------- 半成品 + 成品(可同理扩展) ---------- return results # ============================================================================= # Excel 写入层:使用 openpyxl 构建 .xlsx 文件 # ============================================================================= def _write_excel(filepath: str, records: list, task_id: str): """ 使用 openpyxl 将记录列表写入 Excel 文件。 包含表头样式(加粗、背景色)、自适应列宽、边框。 参数: filepath: 完整保存路径(含 .xlsx 后缀) records: 标准化后的数据列表(dict) task_id: 用于增量进度更新 """ os.makedirs(os.path.dirname(filepath), exist_ok=True) wb = Workbook() ws = wb.active ws.title = "库存导出" if not records: ws.append(['暂无数据']) wb.save(filepath) return # ---------- 表头 ---------- headers = [ '类型', '物料名称', '规格型号', '条码', 'SKU', '状态', '库位', '可用数量', '供应商', '入库日期' ] ws.append(headers) # 表头样式:深蓝色背景 + 白色加粗字体 header_fill = PatternFill("solid", fgColor="1F4E79") header_font = Font(bold=True, color="FFFFFF", size=11) header_align = Alignment(horizontal='center', vertical='center', wrap_text=True) thin = Side(style='thin', color='BFBFBF') border = Border(left=thin, right=thin, top=thin, bottom=thin) for col_idx, _ in enumerate(headers, start=1): cell = ws.cell(row=1, column=col_idx) cell.fill = header_fill cell.font = header_font cell.alignment = header_align cell.border = border # ---------- 数据行 ---------- even_fill = PatternFill("solid", fgColor="DEEAF1") # 浅蓝隔行底色 data_align = Alignment(horizontal='left', vertical='center') data_font = Font(size=10) total = len(records) for idx, rec in enumerate(records): ws.append([ rec.get('type', ''), rec.get('material_name', ''), rec.get('spec_model', ''), rec.get('barcode', ''), rec.get('sku', ''), rec.get('status', ''), rec.get('warehouse_location', ''), rec.get('available_quantity', 0), rec.get('supplier_name', ''), rec.get('in_date', ''), ]) # 每 1000 行更新一次 Redis 进度(80%~95%) row_num = idx + 2 for col_idx in range(1, len(headers) + 1): cell = ws.cell(row=row_num, column=col_idx) cell.alignment = data_align cell.font = data_font cell.border = border if idx % 2 == 1: cell.fill = even_fill if idx > 0 and idx % 1000 == 0: pct = 80 + int(15 * idx / total) _update_task(task_id, status='processing', progress=min(pct, 95)) # ---------- 自适应列宽 ---------- for col in ws.columns: max_len = 0 col_letter = col[0].column_letter for cell in col: if cell.value: max_len = max(max_len, len(str(cell.value))) ws.column_dimensions[col_letter].width = min(max_len + 4, 40) # ---------- 冻结首行 ---------- ws.freeze_panes = 'A2' # ---------- 保存 ---------- wb.save(filepath) logger.info(f"[Export] Excel 已保存: {filepath}, 共 {total} 行") # ============================================================================= # 查询任务状态(供 API 层调用) # ============================================================================= def get_task_status(task_id: str) -> dict: """ 从 Redis 读取任务状态字典。 若任务不存在或 Redis 不可用,返回默认 pending 状态。 """ client = _redis() if not client: return {'status': 'unknown', 'progress': 0, 'url': '', 'error': ''} key = f"{TASK_KEY_PREFIX}{task_id}" try: raw = client.get(key) if raw: return json.loads(raw) return {'status': 'not_found', 'progress': 0, 'url': '', 'error': ''} except Exception as e: logger.warning(f"[Export] 读取任务状态失败: task_id={task_id}, err={e}") return {'status': 'unknown', 'progress': 0, 'url': '', 'error': ''} # ============================================================================= # 获取导出文件路径(供下载接口调用) # ============================================================================= def get_export_filepath(task_id: str) -> str | None: """ 根据 task_id 返回已生成文件的完整路径。 未完成或不存在返回 None。 """ filename = f"{task_id}.xlsx" filepath = os.path.join(EXPORT_DIR, filename) if os.path.exists(filepath): return filepath return None