From 4d8105607538cc53d6aa3a0ad1966c02838cfdf8 Mon Sep 17 00:00:00 2001 From: DXC Date: Tue, 19 May 2026 10:35:33 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E5=AF=BC=E5=87=BA=E9=AA=A8=E6=9E=B6=EF=BC=88Threading=20+=20Re?= =?UTF-8?q?dis=20=E7=8A=B6=E6=80=81=E6=B5=81=E8=BD=AC=EF=BC=89=EF=BC=8C?= =?UTF-8?q?=E6=94=AF=E6=8C=81=20POST=20=E6=8F=90=E4=BA=A4/=E8=BD=AE?= =?UTF-8?q?=E8=AF=A2=E7=8A=B6=E6=80=81/=E4=B8=8B=E8=BD=BD=E6=96=87?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- inventory-backend/app/__init__.py | 11 + .../app/api/v1/export/__init__.py | 10 + .../app/api/v1/export/inventory_export.py | 144 +++++++ .../app/services/export_service/excel_task.py | 358 ++++++++++++++++++ 4 files changed, 523 insertions(+) create mode 100644 inventory-backend/app/api/v1/export/__init__.py create mode 100644 inventory-backend/app/api/v1/export/inventory_export.py create mode 100644 inventory-backend/app/services/export_service/excel_task.py diff --git a/inventory-backend/app/__init__.py b/inventory-backend/app/__init__.py index 79168f0..5053b9a 100644 --- a/inventory-backend/app/__init__.py +++ b/inventory-backend/app/__init__.py @@ -234,6 +234,17 @@ def create_app(): except Exception as e: print(f"❌ 错误: Scan 模块注册失败: {e}") + # ----------------------------------------------------- + # 2.x 注册异步导出模块 (Export) + # ----------------------------------------------------- + try: + from app.api.v1.export import export_bp + app.register_blueprint(export_bp, url_prefix='/api/v1/export') + app.register_blueprint(export_bp, url_prefix='/api/export', name='export_legacy') + print("✅ Export 模块注册成功") + except Exception as e: + print(f"❌ 错误: Export 模块注册失败: {e}") + # ========================================================= # 3. 预加载数据模型 # ========================================================= diff --git a/inventory-backend/app/api/v1/export/__init__.py b/inventory-backend/app/api/v1/export/__init__.py new file mode 100644 index 0000000..dd7fcc6 --- /dev/null +++ b/inventory-backend/app/api/v1/export/__init__.py @@ -0,0 +1,10 @@ +""" +app/api/v1/export/__init__.py +导出模块 Blueprint 注册文件 +""" + +from flask import Blueprint + +export_bp = Blueprint('export', __name__, url_prefix='/api/v1/export') + +from app.api.v1.export import inventory_export \ No newline at end of file diff --git a/inventory-backend/app/api/v1/export/inventory_export.py b/inventory-backend/app/api/v1/export/inventory_export.py new file mode 100644 index 0000000..f9dc34f --- /dev/null +++ b/inventory-backend/app/api/v1/export/inventory_export.py @@ -0,0 +1,144 @@ +""" +app/api/v1/export/inventory_export.py +异步导出核心接口 + +提供三个端点: + POST /api/v1/export/inventory → 提交导出任务,返回 task_id + GET /api/v1/export/status/ → 查询任务状态(轮询) + GET /api/v1/export/download/ → 下载已生成的 Excel 文件 +""" + +import os +from flask import Blueprint, request, jsonify, send_file, current_app +from flask_jwt_extended import jwt_required, get_jwt +from app.services.export_service.excel_task import ( + submit_export_task, + get_task_status, + get_export_filepath, +) +from app.utils.decorators import permission_required + +export_bp = Blueprint('export', __name__, url_prefix='/api/v1/export') + + +# ============================================================================= +# 任务提交接口 +# ============================================================================= + +@export_bp.route('/inventory', methods=['POST']) +@jwt_required() +@permission_required('inventory_manage') +def submit_export(): + """ + 接收前端导出请求,生成 task_id,立即返回。 + + 请求体(JSON): + { + "keyword": "螺丝", + "category": "原材料", + "status": "在库" + } + + 响应: + { "code": 200, "msg": "success", "data": { "task_id": "xxx" } } + """ + try: + filters = request.get_json() or {} + + # 生成 task_id 并启动后台任务(同步返回,不阻塞) + task_id = submit_export_task(filters) + + current_app.logger.info( + f"[Export] 用户 {get_jwt().get('username')} 提交导出任务 task_id={task_id}" + ) + + return jsonify({ + 'code': 200, + 'msg': '导出任务已创建', + 'data': { + 'task_id': task_id, # 前端用此 ID 轮询 /export/status/ + } + }) + + except Exception as e: + current_app.logger.error(f"[Export] 提交导出任务失败: {e}") + return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500 + + +# ============================================================================= +# 进度查询接口(前端轮询) +# ============================================================================= + +@export_bp.route('/status/', methods=['GET']) +@jwt_required() +def get_export_status(task_id: str): + """ + 从 Redis 读取任务状态,供前端轮询。 + + 响应示例(处理中): + { + "code": 200, + "data": { "status": "processing", "progress": 45, "url": "", "error": "" } + } + + 响应示例(已完成): + { + "code": 200, + "data": { "status": "completed", "progress": 100, "url": "/api/v1/export/download/xxx", "error": "" } + } + """ + try: + status = get_task_status(task_id) + + if status.get('status') == 'not_found': + return jsonify({'code': 404, 'msg': '任务不存在或已过期'}), 404 + + return jsonify({ + 'code': 200, + 'msg': 'success', + 'data': status + }) + + except Exception as e: + current_app.logger.error(f"[Export] 查询任务状态失败: task_id={task_id}, err={e}") + return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500 + + +# ============================================================================= +# 文件下载接口 +# ============================================================================= + +@export_bp.route('/download/', methods=['GET']) +@jwt_required() +def download_export_file(task_id: str): + """ + 下载已生成的 Excel 文件。 + + 前端轮询发现 status=completed 后, + 取 data.url 拼接完整下载地址,发起下载请求。 + + 安全:只允许下载已完成且未过期的文件(TTL=1h)。 + """ + try: + # 再次确认任务状态,防止下载不存在的文件 + status = get_task_status(task_id) + if status.get('status') != 'completed': + return jsonify({'code': 400, 'msg': '文件未就绪,请稍后'}), 400 + + filepath = get_export_filepath(task_id) + if not filepath: + return jsonify({'code': 404, 'msg': '文件不存在或已过期'}), 404 + + current_app.logger.info(f"[Export] 用户 {get_jwt().get('username')} 下载 task_id={task_id}") + + # send_file 自动设置 Content-Disposition: attachment(触发浏览器下载) + return send_file( + filepath, + as_attachment=True, + download_name=f"库存导出_{task_id[:8]}.xlsx", + mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' + ) + + except Exception as e: + current_app.logger.error(f"[Export] 下载失败: task_id={task_id}, err={e}") + return jsonify({'code': 500, 'msg': '下载失败,请重试'}), 500 \ No newline at end of file diff --git a/inventory-backend/app/services/export_service/excel_task.py b/inventory-backend/app/services/export_service/excel_task.py new file mode 100644 index 0000000..21949d9 --- /dev/null +++ b/inventory-backend/app/services/export_service/excel_task.py @@ -0,0 +1,358 @@ +""" +app/services/export_service/excel_task.py +异步导出核心任务逻辑 + +Redis 中的任务状态键格式:export:task:{task_id} +TTL = 1 小时(3600 秒),超时自动清理 + +状态流转: + 提交任务 → status=processing, progress=0 + 写入中 → status=processing, progress=N (10~90) + 完成 → status=completed, progress=100, url=下载路径 + 失败 → status=failed, error=具体原因 +""" + +import os +import uuid +import json +import time +import logging +from threading import Thread +from datetime import datetime + +from openpyxl import Workbook +from openpyxl.styles import Font, PatternFill, Alignment, Border, Side + +logger = logging.getLogger(__name__) + +# 导出文件存放根目录(相对于项目根目录) +EXPORT_DIR = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), + 'uploads', 'exports' +) + +# Redis 键前缀 + TTL +TASK_KEY_PREFIX = 'export:task:' +TASK_TTL = 3600 # 1小时 + + +def _redis(): + """获取 Redis 客户端,带容错保护。""" + try: + from app.extensions import redis_client + return redis_client + except Exception: + return None + + +def _update_task(task_id: str, **kwargs): + """ + 原子更新 Redis 中的任务状态。 + + 使用 setex 分两步: + 1. 保存最新状态 JSON + 2. 重置 TTL 为 1 小时 + + 即使 Redis 写入失败也不阻断业务流程。 + """ + client = _redis() + if not client: + return + key = f"{TASK_KEY_PREFIX}{task_id}" + try: + client.setex(key, TASK_TTL, json.dumps(kwargs, ensure_ascii=False)) + logger.debug(f"[Export] 更新任务状态 task_id={task_id} → {kwargs}") + except Exception as e: + logger.warning(f"[Export] Redis 更新任务状态失败: task_id={task_id}, err={e}") + + +# ============================================================================= +# 对外入口:提交导出任务(启动后台线程) +# ============================================================================= + +def submit_export_task(filters: dict) -> str: + """ + 接收前端过滤参数,生成 task_id,写入 Redis 初始状态, + 然后启动后台线程执行 Excel 生成。 + + 参数: + filters: dict,任意查询参数(category, keyword, status 等) + + 返回: + str: task_id(UUID),前端用此 ID 轮询进度 + """ + task_id = str(uuid.uuid4()) + + # 写入 Redis:初始状态 + _update_task(task_id, status='processing', progress=0, url='', error='') + + # 立即启动后台线程执行(daemon=True 使主进程退出时自动终止) + t = Thread( + target=generate_excel_task, + args=(task_id, filters), + daemon=True + ) + t.start() + logger.info(f"[Export] 任务已提交 task_id={task_id}, filters={filters}") + + return task_id + + +# ============================================================================= +# 后台任务:生成 Excel 文件 +# ============================================================================= + +def generate_excel_task(task_id: str, filters: dict): + """ + 在后台线程中执行 Excel 生成。 + + 流程: + 1. 更新进度 10% → 开始查询 + 2. 根据 filters 查询数据库(可复用现有 Service) + 3. 用 openpyxl 构建 Workbook,写入数据 + 4. 保存到 uploads/exports/{task_id}.xlsx + 5. 更新进度 100% + status=completed + url + + 任何异常被捕获,不会导致主进程崩溃。 + """ + logger.info(f"[Export] 任务开始 task_id={task_id}") + try: + # ===== 阶段1:查询数据(模拟 + 实际) ===== + _update_task(task_id, status='processing', progress=10) + + # 延迟导入:在子线程中加载 App Context,避免主线程时序问题 + from flask import current_app + from app.extensions import db + from app.models.inbound.buy import StockBuy + from app.models.inbound.semi import StockSemi + from app.models.inbound.product import StockProduct + + records = _query_inventory(filters) + + # ===== 阶段2:写入 Excel ===== + _update_task(task_id, status='processing', progress=40) + + filename = f"{task_id}.xlsx" + filepath = os.path.join(EXPORT_DIR, filename) + _write_excel(filepath, records, task_id) + + # ===== 阶段3:完成 ===== + _update_task( + task_id, + status='completed', + progress=100, + url=f"/api/v1/export/download/{task_id}", + error='' + ) + logger.info(f"[Export] 任务完成 task_id={task_id}, file={filename}") + + except Exception as e: + logger.error(f"[Export] 任务失败 task_id={task_id}, err={e}") + _update_task(task_id, status='failed', error=str(e)) + + +# ============================================================================= +# 查询层:根据 filters 聚合库存数据 +# ============================================================================= + +def _query_inventory(filters: dict) -> list: + """ + 根据过滤条件查询三张库存表,返回标准化记录列表。 + 进度更新策略:在主线程(后台线程)内,每处理 1000 条回调一次 Redis。 + """ + from app.extensions import db + from app.models.inbound.buy import StockBuy + from app.models.inbound.semi import StockSemi + from app.models.inbound.product import StockProduct + from app.models.base import MaterialBase + + results = [] + + # ---------- 采购件 ---------- + query = db.session.query( + MaterialBase.name.label('material_name'), + MaterialBase.spec_model.label('spec_model'), + StockBuy.barcode, + StockBuy.sku, + StockBuy.status, + StockBuy.warehouse_location, + StockBuy.available_quantity, + StockBuy.supplier_name, + StockBuy.in_date, + ).join(MaterialBase, StockBuy.base_id == MaterialBase.id) + + if filters.get('keyword'): + kw = f"%{filters['keyword']}%" + query = query.filter( + (MaterialBase.name.ilike(kw)) | + (MaterialBase.spec_model.ilike(kw)) | + (StockBuy.barcode.ilike(kw)) | + (StockBuy.sku.ilike(kw)) + ) + if filters.get('status'): + query = query.filter(StockBuy.status == filters['status']) + + all_rows = query.order_by(StockBuy.id.desc()).limit(10000).all() + total = len(all_rows) + + for idx, row in enumerate(all_rows): + results.append({ + 'type': '采购件', + 'material_name': row.material_name or '', + 'spec_model': row.spec_model or '', + 'barcode': row.barcode or '', + 'sku': row.sku or '', + 'status': row.status or '', + 'warehouse_location': row.warehouse_location or '', + 'available_quantity': float(row.available_quantity or 0), + 'supplier_name': row.supplier_name or '', + 'in_date': row.in_date.strftime('%Y-%m-%d') if row.in_date else '', + }) + + # 每 1000 条更新一次 Redis 进度(40%~80% 之间) + if idx > 0 and idx % 1000 == 0: + pct = 40 + int(40 * idx / total) if total else 80 + # 注意:这里的 task_id 由外层 generate_excel_task 持有, + # 进度更新在 _write_excel 中进行,此处仅做占位说明 + logger.debug(f"[Export] 采购件已处理 {idx}/{total} 条, 估算进度 {pct}%") + + # ---------- 半成品 + 成品(可同理扩展) ---------- + + return results + + +# ============================================================================= +# Excel 写入层:使用 openpyxl 构建 .xlsx 文件 +# ============================================================================= + +def _write_excel(filepath: str, records: list, task_id: str): + """ + 使用 openpyxl 将记录列表写入 Excel 文件。 + 包含表头样式(加粗、背景色)、自适应列宽、边框。 + + 参数: + filepath: 完整保存路径(含 .xlsx 后缀) + records: 标准化后的数据列表(dict) + task_id: 用于增量进度更新 + """ + os.makedirs(os.path.dirname(filepath), exist_ok=True) + + wb = Workbook() + ws = wb.active + ws.title = "库存导出" + + if not records: + ws.append(['暂无数据']) + wb.save(filepath) + return + + # ---------- 表头 ---------- + headers = [ + '类型', '物料名称', '规格型号', '条码', 'SKU', + '状态', '库位', '可用数量', '供应商', '入库日期' + ] + ws.append(headers) + + # 表头样式:深蓝色背景 + 白色加粗字体 + header_fill = PatternFill("solid", fgColor="1F4E79") + header_font = Font(bold=True, color="FFFFFF", size=11) + header_align = Alignment(horizontal='center', vertical='center', wrap_text=True) + thin = Side(style='thin', color='BFBFBF') + border = Border(left=thin, right=thin, top=thin, bottom=thin) + + for col_idx, _ in enumerate(headers, start=1): + cell = ws.cell(row=1, column=col_idx) + cell.fill = header_fill + cell.font = header_font + cell.alignment = header_align + cell.border = border + + # ---------- 数据行 ---------- + even_fill = PatternFill("solid", fgColor="DEEAF1") # 浅蓝隔行底色 + data_align = Alignment(horizontal='left', vertical='center') + data_font = Font(size=10) + + total = len(records) + for idx, rec in enumerate(records): + ws.append([ + rec.get('type', ''), + rec.get('material_name', ''), + rec.get('spec_model', ''), + rec.get('barcode', ''), + rec.get('sku', ''), + rec.get('status', ''), + rec.get('warehouse_location', ''), + rec.get('available_quantity', 0), + rec.get('supplier_name', ''), + rec.get('in_date', ''), + ]) + + # 每 1000 行更新一次 Redis 进度(80%~95%) + row_num = idx + 2 + for col_idx in range(1, len(headers) + 1): + cell = ws.cell(row=row_num, column=col_idx) + cell.alignment = data_align + cell.font = data_font + cell.border = border + if idx % 2 == 1: + cell.fill = even_fill + + if idx > 0 and idx % 1000 == 0: + pct = 80 + int(15 * idx / total) + _update_task(task_id, status='processing', progress=min(pct, 95)) + + # ---------- 自适应列宽 ---------- + for col in ws.columns: + max_len = 0 + col_letter = col[0].column_letter + for cell in col: + if cell.value: + max_len = max(max_len, len(str(cell.value))) + ws.column_dimensions[col_letter].width = min(max_len + 4, 40) + + # ---------- 冻结首行 ---------- + ws.freeze_panes = 'A2' + + # ---------- 保存 ---------- + wb.save(filepath) + logger.info(f"[Export] Excel 已保存: {filepath}, 共 {total} 行") + + +# ============================================================================= +# 查询任务状态(供 API 层调用) +# ============================================================================= + +def get_task_status(task_id: str) -> dict: + """ + 从 Redis 读取任务状态字典。 + 若任务不存在或 Redis 不可用,返回默认 pending 状态。 + """ + client = _redis() + if not client: + return {'status': 'unknown', 'progress': 0, 'url': '', 'error': ''} + + key = f"{TASK_KEY_PREFIX}{task_id}" + try: + raw = client.get(key) + if raw: + return json.loads(raw) + return {'status': 'not_found', 'progress': 0, 'url': '', 'error': ''} + except Exception as e: + logger.warning(f"[Export] 读取任务状态失败: task_id={task_id}, err={e}") + return {'status': 'unknown', 'progress': 0, 'url': '', 'error': ''} + + +# ============================================================================= +# 获取导出文件路径(供下载接口调用) +# ============================================================================= + +def get_export_filepath(task_id: str) -> str | None: + """ + 根据 task_id 返回已生成文件的完整路径。 + 未完成或不存在返回 None。 + """ + filename = f"{task_id}.xlsx" + filepath = os.path.join(EXPORT_DIR, filename) + if os.path.exists(filepath): + return filepath + return None \ No newline at end of file