feat: 实现异步导出骨架(Threading + Redis 状态流转),支持 POST 提交/轮询状态/下载文件

This commit is contained in:
DXC
2026-05-19 10:35:33 +08:00
parent 6e1e1aa998
commit 4d81056075
4 changed files with 523 additions and 0 deletions

View File

@ -234,6 +234,17 @@ def create_app():
except Exception as e:
print(f"❌ 错误: Scan 模块注册失败: {e}")
# -----------------------------------------------------
# 2.x 注册异步导出模块 (Export)
# -----------------------------------------------------
try:
from app.api.v1.export import export_bp
app.register_blueprint(export_bp, url_prefix='/api/v1/export')
app.register_blueprint(export_bp, url_prefix='/api/export', name='export_legacy')
print("✅ Export 模块注册成功")
except Exception as e:
print(f"❌ 错误: Export 模块注册失败: {e}")
# =========================================================
# 3. 预加载数据模型
# =========================================================

View File

@ -0,0 +1,10 @@
"""
app/api/v1/export/__init__.py
导出模块 Blueprint 注册文件
"""
from flask import Blueprint
export_bp = Blueprint('export', __name__, url_prefix='/api/v1/export')
from app.api.v1.export import inventory_export

View File

@ -0,0 +1,144 @@
"""
app/api/v1/export/inventory_export.py
异步导出核心接口
提供三个端点:
POST /api/v1/export/inventory → 提交导出任务,返回 task_id
GET /api/v1/export/status/<task_id> → 查询任务状态(轮询)
GET /api/v1/export/download/<task_id> → 下载已生成的 Excel 文件
"""
import os
from flask import Blueprint, request, jsonify, send_file, current_app
from flask_jwt_extended import jwt_required, get_jwt
from app.services.export_service.excel_task import (
submit_export_task,
get_task_status,
get_export_filepath,
)
from app.utils.decorators import permission_required
export_bp = Blueprint('export', __name__, url_prefix='/api/v1/export')
# =============================================================================
# 任务提交接口
# =============================================================================
@export_bp.route('/inventory', methods=['POST'])
@jwt_required()
@permission_required('inventory_manage')
def submit_export():
"""
接收前端导出请求,生成 task_id立即返回。
请求体JSON:
{
"keyword": "螺丝",
"category": "原材料",
"status": "在库"
}
响应:
{ "code": 200, "msg": "success", "data": { "task_id": "xxx" } }
"""
try:
filters = request.get_json() or {}
# 生成 task_id 并启动后台任务(同步返回,不阻塞)
task_id = submit_export_task(filters)
current_app.logger.info(
f"[Export] 用户 {get_jwt().get('username')} 提交导出任务 task_id={task_id}"
)
return jsonify({
'code': 200,
'msg': '导出任务已创建',
'data': {
'task_id': task_id, # 前端用此 ID 轮询 /export/status/<task_id>
}
})
except Exception as e:
current_app.logger.error(f"[Export] 提交导出任务失败: {e}")
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
# =============================================================================
# 进度查询接口(前端轮询)
# =============================================================================
@export_bp.route('/status/<task_id>', methods=['GET'])
@jwt_required()
def get_export_status(task_id: str):
"""
从 Redis 读取任务状态,供前端轮询。
响应示例(处理中):
{
"code": 200,
"data": { "status": "processing", "progress": 45, "url": "", "error": "" }
}
响应示例(已完成):
{
"code": 200,
"data": { "status": "completed", "progress": 100, "url": "/api/v1/export/download/xxx", "error": "" }
}
"""
try:
status = get_task_status(task_id)
if status.get('status') == 'not_found':
return jsonify({'code': 404, 'msg': '任务不存在或已过期'}), 404
return jsonify({
'code': 200,
'msg': 'success',
'data': status
})
except Exception as e:
current_app.logger.error(f"[Export] 查询任务状态失败: task_id={task_id}, err={e}")
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
# =============================================================================
# 文件下载接口
# =============================================================================
@export_bp.route('/download/<task_id>', methods=['GET'])
@jwt_required()
def download_export_file(task_id: str):
"""
下载已生成的 Excel 文件。
前端轮询发现 status=completed 后,
取 data.url 拼接完整下载地址,发起下载请求。
安全只允许下载已完成且未过期的文件TTL=1h
"""
try:
# 再次确认任务状态,防止下载不存在的文件
status = get_task_status(task_id)
if status.get('status') != 'completed':
return jsonify({'code': 400, 'msg': '文件未就绪,请稍后'}), 400
filepath = get_export_filepath(task_id)
if not filepath:
return jsonify({'code': 404, 'msg': '文件不存在或已过期'}), 404
current_app.logger.info(f"[Export] 用户 {get_jwt().get('username')} 下载 task_id={task_id}")
# send_file 自动设置 Content-Disposition: attachment触发浏览器下载
return send_file(
filepath,
as_attachment=True,
download_name=f"库存导出_{task_id[:8]}.xlsx",
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
)
except Exception as e:
current_app.logger.error(f"[Export] 下载失败: task_id={task_id}, err={e}")
return jsonify({'code': 500, 'msg': '下载失败,请重试'}), 500

View File

@ -0,0 +1,358 @@
"""
app/services/export_service/excel_task.py
异步导出核心任务逻辑
Redis 中的任务状态键格式export:task:{task_id}
TTL = 1 小时3600 秒),超时自动清理
状态流转:
提交任务 → status=processing, progress=0
写入中 → status=processing, progress=N (10~90)
完成 → status=completed, progress=100, url=下载路径
失败 → status=failed, error=具体原因
"""
import os
import uuid
import json
import time
import logging
from threading import Thread
from datetime import datetime
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
logger = logging.getLogger(__name__)
# 导出文件存放根目录(相对于项目根目录)
EXPORT_DIR = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
'uploads', 'exports'
)
# Redis 键前缀 + TTL
TASK_KEY_PREFIX = 'export:task:'
TASK_TTL = 3600 # 1小时
def _redis():
"""获取 Redis 客户端,带容错保护。"""
try:
from app.extensions import redis_client
return redis_client
except Exception:
return None
def _update_task(task_id: str, **kwargs):
"""
原子更新 Redis 中的任务状态。
使用 setex 分两步:
1. 保存最新状态 JSON
2. 重置 TTL 为 1 小时
即使 Redis 写入失败也不阻断业务流程。
"""
client = _redis()
if not client:
return
key = f"{TASK_KEY_PREFIX}{task_id}"
try:
client.setex(key, TASK_TTL, json.dumps(kwargs, ensure_ascii=False))
logger.debug(f"[Export] 更新任务状态 task_id={task_id}{kwargs}")
except Exception as e:
logger.warning(f"[Export] Redis 更新任务状态失败: task_id={task_id}, err={e}")
# =============================================================================
# 对外入口:提交导出任务(启动后台线程)
# =============================================================================
def submit_export_task(filters: dict) -> str:
"""
接收前端过滤参数,生成 task_id写入 Redis 初始状态,
然后启动后台线程执行 Excel 生成。
参数:
filters: dict任意查询参数category, keyword, status 等)
返回:
str: task_idUUID前端用此 ID 轮询进度
"""
task_id = str(uuid.uuid4())
# 写入 Redis初始状态
_update_task(task_id, status='processing', progress=0, url='', error='')
# 立即启动后台线程执行daemon=True 使主进程退出时自动终止)
t = Thread(
target=generate_excel_task,
args=(task_id, filters),
daemon=True
)
t.start()
logger.info(f"[Export] 任务已提交 task_id={task_id}, filters={filters}")
return task_id
# =============================================================================
# 后台任务:生成 Excel 文件
# =============================================================================
def generate_excel_task(task_id: str, filters: dict):
"""
在后台线程中执行 Excel 生成。
流程:
1. 更新进度 10% → 开始查询
2. 根据 filters 查询数据库(可复用现有 Service
3. 用 openpyxl 构建 Workbook写入数据
4. 保存到 uploads/exports/{task_id}.xlsx
5. 更新进度 100% + status=completed + url
任何异常被捕获,不会导致主进程崩溃。
"""
logger.info(f"[Export] 任务开始 task_id={task_id}")
try:
# ===== 阶段1查询数据模拟 + 实际) =====
_update_task(task_id, status='processing', progress=10)
# 延迟导入:在子线程中加载 App Context避免主线程时序问题
from flask import current_app
from app.extensions import db
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
records = _query_inventory(filters)
# ===== 阶段2写入 Excel =====
_update_task(task_id, status='processing', progress=40)
filename = f"{task_id}.xlsx"
filepath = os.path.join(EXPORT_DIR, filename)
_write_excel(filepath, records, task_id)
# ===== 阶段3完成 =====
_update_task(
task_id,
status='completed',
progress=100,
url=f"/api/v1/export/download/{task_id}",
error=''
)
logger.info(f"[Export] 任务完成 task_id={task_id}, file={filename}")
except Exception as e:
logger.error(f"[Export] 任务失败 task_id={task_id}, err={e}")
_update_task(task_id, status='failed', error=str(e))
# =============================================================================
# 查询层:根据 filters 聚合库存数据
# =============================================================================
def _query_inventory(filters: dict) -> list:
"""
根据过滤条件查询三张库存表,返回标准化记录列表。
进度更新策略:在主线程(后台线程)内,每处理 1000 条回调一次 Redis。
"""
from app.extensions import db
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
from app.models.base import MaterialBase
results = []
# ---------- 采购件 ----------
query = db.session.query(
MaterialBase.name.label('material_name'),
MaterialBase.spec_model.label('spec_model'),
StockBuy.barcode,
StockBuy.sku,
StockBuy.status,
StockBuy.warehouse_location,
StockBuy.available_quantity,
StockBuy.supplier_name,
StockBuy.in_date,
).join(MaterialBase, StockBuy.base_id == MaterialBase.id)
if filters.get('keyword'):
kw = f"%{filters['keyword']}%"
query = query.filter(
(MaterialBase.name.ilike(kw)) |
(MaterialBase.spec_model.ilike(kw)) |
(StockBuy.barcode.ilike(kw)) |
(StockBuy.sku.ilike(kw))
)
if filters.get('status'):
query = query.filter(StockBuy.status == filters['status'])
all_rows = query.order_by(StockBuy.id.desc()).limit(10000).all()
total = len(all_rows)
for idx, row in enumerate(all_rows):
results.append({
'type': '采购件',
'material_name': row.material_name or '',
'spec_model': row.spec_model or '',
'barcode': row.barcode or '',
'sku': row.sku or '',
'status': row.status or '',
'warehouse_location': row.warehouse_location or '',
'available_quantity': float(row.available_quantity or 0),
'supplier_name': row.supplier_name or '',
'in_date': row.in_date.strftime('%Y-%m-%d') if row.in_date else '',
})
# 每 1000 条更新一次 Redis 进度40%~80% 之间)
if idx > 0 and idx % 1000 == 0:
pct = 40 + int(40 * idx / total) if total else 80
# 注意:这里的 task_id 由外层 generate_excel_task 持有,
# 进度更新在 _write_excel 中进行,此处仅做占位说明
logger.debug(f"[Export] 采购件已处理 {idx}/{total} 条, 估算进度 {pct}%")
# ---------- 半成品 + 成品(可同理扩展) ----------
return results
# =============================================================================
# Excel 写入层:使用 openpyxl 构建 .xlsx 文件
# =============================================================================
def _write_excel(filepath: str, records: list, task_id: str):
"""
使用 openpyxl 将记录列表写入 Excel 文件。
包含表头样式(加粗、背景色)、自适应列宽、边框。
参数:
filepath: 完整保存路径(含 .xlsx 后缀)
records: 标准化后的数据列表dict
task_id: 用于增量进度更新
"""
os.makedirs(os.path.dirname(filepath), exist_ok=True)
wb = Workbook()
ws = wb.active
ws.title = "库存导出"
if not records:
ws.append(['暂无数据'])
wb.save(filepath)
return
# ---------- 表头 ----------
headers = [
'类型', '物料名称', '规格型号', '条码', 'SKU',
'状态', '库位', '可用数量', '供应商', '入库日期'
]
ws.append(headers)
# 表头样式:深蓝色背景 + 白色加粗字体
header_fill = PatternFill("solid", fgColor="1F4E79")
header_font = Font(bold=True, color="FFFFFF", size=11)
header_align = Alignment(horizontal='center', vertical='center', wrap_text=True)
thin = Side(style='thin', color='BFBFBF')
border = Border(left=thin, right=thin, top=thin, bottom=thin)
for col_idx, _ in enumerate(headers, start=1):
cell = ws.cell(row=1, column=col_idx)
cell.fill = header_fill
cell.font = header_font
cell.alignment = header_align
cell.border = border
# ---------- 数据行 ----------
even_fill = PatternFill("solid", fgColor="DEEAF1") # 浅蓝隔行底色
data_align = Alignment(horizontal='left', vertical='center')
data_font = Font(size=10)
total = len(records)
for idx, rec in enumerate(records):
ws.append([
rec.get('type', ''),
rec.get('material_name', ''),
rec.get('spec_model', ''),
rec.get('barcode', ''),
rec.get('sku', ''),
rec.get('status', ''),
rec.get('warehouse_location', ''),
rec.get('available_quantity', 0),
rec.get('supplier_name', ''),
rec.get('in_date', ''),
])
# 每 1000 行更新一次 Redis 进度80%~95%
row_num = idx + 2
for col_idx in range(1, len(headers) + 1):
cell = ws.cell(row=row_num, column=col_idx)
cell.alignment = data_align
cell.font = data_font
cell.border = border
if idx % 2 == 1:
cell.fill = even_fill
if idx > 0 and idx % 1000 == 0:
pct = 80 + int(15 * idx / total)
_update_task(task_id, status='processing', progress=min(pct, 95))
# ---------- 自适应列宽 ----------
for col in ws.columns:
max_len = 0
col_letter = col[0].column_letter
for cell in col:
if cell.value:
max_len = max(max_len, len(str(cell.value)))
ws.column_dimensions[col_letter].width = min(max_len + 4, 40)
# ---------- 冻结首行 ----------
ws.freeze_panes = 'A2'
# ---------- 保存 ----------
wb.save(filepath)
logger.info(f"[Export] Excel 已保存: {filepath}, 共 {total}")
# =============================================================================
# 查询任务状态(供 API 层调用)
# =============================================================================
def get_task_status(task_id: str) -> dict:
"""
从 Redis 读取任务状态字典。
若任务不存在或 Redis 不可用,返回默认 pending 状态。
"""
client = _redis()
if not client:
return {'status': 'unknown', 'progress': 0, 'url': '', 'error': ''}
key = f"{TASK_KEY_PREFIX}{task_id}"
try:
raw = client.get(key)
if raw:
return json.loads(raw)
return {'status': 'not_found', 'progress': 0, 'url': '', 'error': ''}
except Exception as e:
logger.warning(f"[Export] 读取任务状态失败: task_id={task_id}, err={e}")
return {'status': 'unknown', 'progress': 0, 'url': '', 'error': ''}
# =============================================================================
# 获取导出文件路径(供下载接口调用)
# =============================================================================
def get_export_filepath(task_id: str) -> str | None:
"""
根据 task_id 返回已生成文件的完整路径。
未完成或不存在返回 None。
"""
filename = f"{task_id}.xlsx"
filepath = os.path.join(EXPORT_DIR, filename)
if os.path.exists(filepath):
return filepath
return None