From af56c94efe2c822d7ec21a6e9a785dab5ed1e7fa Mon Sep 17 00:00:00 2001
From: zhanghuilai
Date: Wed, 11 Feb 2026 16:28:51 +0800
Subject: [PATCH] Scheduled deletion of failed tasks
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 PROJECT_README.md                  | 205 ++++++++++++++++++++++
 debug_cleanup.py                   |  97 +++++++++++
 gasflux.ini                        |  14 +-
 gasflux.ini.example                |   3 +-
 src/gasflux/app.py                 |  17 +-
 src/gasflux/blueprints/config.py   |   3 +
 src/gasflux/blueprints/download.py |  11 +-
 src/gasflux/blueprints/stats.py    |   5 +-
 src/gasflux/blueprints/upload.py   | 269 +++++++++++++++++++++--------
 src/gasflux/config_reader.py       |  10 ++
 src/gasflux/data_processor.py      |  59 ++++---
 src/gasflux/janitor.py             | 258 +++++++++++++++++++++------
 web_api_data/outputs/gasflux.db    | Bin 0 -> 49152 bytes
 13 files changed, 782 insertions(+), 169 deletions(-)
 create mode 100644 PROJECT_README.md
 create mode 100644 debug_cleanup.py
 create mode 100644 web_api_data/outputs/gasflux.db

diff --git a/PROJECT_README.md b/PROJECT_README.md
new file mode 100644
index 0000000..7dd094f
--- /dev/null
+++ b/PROJECT_README.md
@@ -0,0 +1,205 @@
+# GasFlux Project Guide
+
+## Overview
+
+GasFlux is a Flask-based web API for gas-flux analysis. It processes gas-concentration data collected by drones and other aircraft and computes fluxes for greenhouse gases such as CO₂ and CH₄.
+
+## Core Features
+
+### 🧪 Data Processing
+- **Preprocessing**: Excel/CSV parsing, data validation and cleaning
+- **Background correction**: background subtraction with the integrated FastChrom algorithm
+- **Spatial interpolation**: kriging with several semivariogram models
+- **Flux calculation**: mass-balance gas-flux computation
+
+### 🌐 Web API Service
+- **RESTful API**: a complete HTTP interface
+- **Asynchronous processing**: large files are processed in the background
+- **Live monitoring**: task-status queries and progress tracking
+- **File management**: automatic upload, processing, and download handling
+
+### 🔐 Security & Authentication
+- **API-key authentication**: secure key management
+- **Access control**: API keys can be scoped to different permissions
+- **Bootstrap key**: initial administrator key configuration
+
+### 🧹 Automatic Maintenance
+- **Task cleanup**: expired tasks and files are removed automatically
+- **Storage management**: disk space is reclaimed on a schedule
+- **Logging**: full operation logs and performance monitoring
+
+## Architecture
+
+### Backend Stack
+- **Framework**: Flask (Python web framework)
+- **Database**: SQLite (task state and configuration)
+- **Deployment**: Waitress (production WSGI server)
+- **Authentication**: custom API-key system
+
+### Processing Engine
+- **Core algorithms**: the GasFlux gas-flux analysis engine
+- **Input formats**: Excel, CSV, YAML configuration
+- **Output formats**: CSV data, HTML reports, PNG charts
+
+### Frontend
+- **Web console**: built-in HTML/CSS/JavaScript console
+- **API clients**: works with cURL, Python requests, and similar tools
+- **Live updates**: periodic polling of task status
+
+## Quick Start
+
+### 1. Prerequisites
+```bash
+# Python 3.8+ environment
+python --version
+
+# Install dependencies
+pip install -r requirements.txt
+```
+
+### 2. Configure the System
+```bash
+# Edit the configuration file
+notepad gasflux.ini
+
+# Create an initial API key
+python create_api_key.py
+```
+
+### 3. Start the Service
+```bash
+# Start the web API service
+python -m src.gasflux.app
+
+# Once running, open: http://localhost:5001
+```
+
+### 4. Use the API
+```bash
+# Upload a file for processing
+curl -X POST \
+  -H "X-API-Key: your-api-key" \
+  -F "file=@data.xlsx" \
+  http://localhost:5001/upload
+```
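+
+The same flow works from Python. This is an illustrative sketch rather than a bundled client: it assumes the envelope returned by `/upload` carries the `task_id` under a `data` key and that `failed` / `completed` are the terminal status names; print the raw response once and adjust if your deployment differs.
+
+```python
+import time
+
+import requests
+
+BASE_URL = "http://localhost:5001"
+HEADERS = {"X-API-Key": "your-api-key"}
+
+# Upload the data file; the server responds 202 Accepted with a task_id.
+with open("data.xlsx", "rb") as fh:
+    resp = requests.post(f"{BASE_URL}/upload", headers=HEADERS,
+                         files={"file": ("data.xlsx", fh)})
+resp.raise_for_status()
+task_id = resp.json()["data"]["task_id"]  # envelope layout assumed
+
+# Poll /task/{task_id} until processing ends.
+while True:
+    info = requests.get(f"{BASE_URL}/task/{task_id}", headers=HEADERS).json()
+    print(info)
+    if info.get("data", {}).get("status") in ("completed", "failed"):
+        break
+    time.sleep(5)
+```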
+
+## Configuration Files
+
+### gasflux.ini (main configuration)
+```ini
+[server]
+host = 0.0.0.0
+port = 5001
+debug = false
+
+[paths]
+uploads = ./web_api_data/uploads
+outputs = ./web_api_data/outputs
+
+[cleanup]
+task_cleanup_interval = 30
+successful_task_cleanup_age = 60
+failed_task_cleanup_age = 60
+janitor_dry_run = false
+
+[security]
+admin_bootstrap_key = bootstrap_key_2024
+```
+
+### gasflux_config.yaml (processing configuration)
+```yaml
+output_dir: ./output
+gases:
+  ch4: [1.5, 10.0]
+  co2: [300, 500]
+strategies:
+  background: "algorithm"
+  spatial: "curtain"
+  interpolation: "kriging"
+```
+
+## API Endpoints
+
+### Core
+- `POST /upload` - upload a file for processing
+- `GET /task/{task_id}` - query task status
+- `GET /download/{path}` - download results
+- `GET /health` - health check
+
+### Administration
+- `POST /api-keys` - create an API key
+- `GET /api-keys` - list API keys
+- `DELETE /api-keys/{key_id}` - revoke an API key
+
+### Monitoring
+- `GET /stats` - system statistics
+- `GET /config` - configuration query
+
+## Deployment Options
+
+### Development
+```bash
+# Run directly
+python -m src.gasflux.app
+```
+
+### Production (Waitress)
+```bash
+# Use the Waitress server
+python server_waitress.py
+```
+
+### Docker
+```bash
+# Build the image
+docker build -t gasflux .
+
+# Run the container
+docker run -p 5001:5001 gasflux
+```
+
+## Directory Layout
+
+```
+gasflux-develop/
+├── src/gasflux/          # core source code
+│   ├── app.py            # Flask application entry point
+│   ├── auth.py           # authentication
+│   ├── db.py             # database access
+│   ├── janitor.py        # automatic cleanup
+│   └── blueprints/       # API blueprints
+├── gasflux.ini           # main configuration file
+├── create_api_key.py     # API-key creation tool
+├── requirements.txt      # Python dependencies
+├── web_api_data/         # data directory
+│   ├── uploads/          # uploaded files
+│   └── outputs/          # output files
+└── logs/                 # log files
+```
+
+## FAQ
+
+### Q: How do I create an API key?
+A: Run `python create_api_key.py` and enter a description when prompted.
+
+### Q: How do I check a task's status?
+A: Call `GET /task/{task_id}`, or watch it in the web console.
+
+### Q: How do I change the cleanup policy?
+A: Edit the `[cleanup]` section of `gasflux.ini`.
+
+### Q: Which gases are supported?
+A: CO₂, CH₄, and other major greenhouse gases; more can be added in the configuration.
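+
+All cleanup ages are plain seconds fed straight into SQLite datetime arithmetic. A minimal sketch of the scheduling rule, mirroring the SQL that `download.py` and `janitor.py` use in this patch (the service stores UTC+8 timestamps, hence the `'+8 hours'` shift; throwaway in-memory database):
+
+```python
+import sqlite3
+
+conn = sqlite3.connect(":memory:")
+successful_task_cleanup_age = 60  # seconds, from [cleanup]
+
+# On download, delete_after_at is set this far past "now"; the janitor
+# later removes any task whose delete_after_at lies in the past.
+downloaded_at, delete_after_at = conn.execute(
+    "SELECT datetime('now', '+8 hours'),"
+    "       datetime('now', '+8 hours', '+' || ? || ' seconds')",
+    (successful_task_cleanup_age,),
+).fetchone()
+print(f"downloaded_at={downloaded_at}  delete_after_at={delete_after_at}")
+```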
+
+## Support
+
+- **Project home**: [GitHub Repository]
+- **Documentation**: [API_DOCUMENTATION.md](API_DOCUMENTATION.md)
+- **Configuration guide**: [CONFIG_README.md](CONFIG_README.md)
+- **Deployment guide**: [WAITRESS_DEPLOYMENT.md](WAITRESS_DEPLOYMENT.md)
+
+## Version
+
+- **Current version**: v0.2.1
+- **Python**: 3.8+
+- **License**: AGPL v3.0
\ No newline at end of file
diff --git a/debug_cleanup.py b/debug_cleanup.py
new file mode 100644
index 0000000..67567b6
--- /dev/null
+++ b/debug_cleanup.py
@@ -0,0 +1,97 @@
+#!/usr/bin/env python3
+"""
+Debug script to test the failed-task cleanup logic
+"""
+
+import sqlite3
+from pathlib import Path
+
+def test_cleanup_logic():
+    """Exercise the cleanup queries against a throwaway SQLite database"""
+
+    # Create a temporary database for testing
+    db_path = Path("test_cleanup.db")
+    if db_path.exists():
+        db_path.unlink()
+
+    conn = sqlite3.connect(str(db_path))
+    conn.row_factory = sqlite3.Row
+
+    # Create the tasks table
+    conn.execute("""
+        CREATE TABLE tasks (
+            task_id TEXT PRIMARY KEY,
+            status TEXT,
+            created_at TEXT,
+            started_at TEXT,
+            finished_at TEXT,
+            deleted_at TEXT,
+            output_dir TEXT
+        )
+    """)
+
+    # Insert test data - a failed task from 2 minutes ago (should be cleaned)
+    conn.execute("""
+        INSERT INTO tasks (task_id, status, created_at, started_at, finished_at)
+        VALUES (?, 'failed', datetime('now', '-2 minutes'), datetime('now', '-2 minutes'), datetime('now', '-2 minutes'))
+    """, ('test_task_123',))
+
+    # Insert test data - a failed task from 30 seconds ago (should not be cleaned)
+    conn.execute("""
+        INSERT INTO tasks (task_id, status, created_at, started_at, finished_at)
+        VALUES (?, 'failed', datetime('now', '-30 seconds'), datetime('now', '-30 seconds'), datetime('now', '-30 seconds'))
+    """, ('test_task_456',))
+
+    conn.commit()
+
+    # Test the cleanup query with a 60-second threshold
+    failed_task_cleanup_age = 60
+
+    print("Testing cleanup logic...")
+    print(f"Cleanup age: {failed_task_cleanup_age} seconds")
+
+    # Show all failed tasks
+    all_failed = conn.execute("""
+        SELECT task_id, status, created_at, finished_at
+        FROM tasks
+        WHERE status = 'failed' AND deleted_at IS NULL
+    """).fetchall()
+
+    print(f"\nTotal failed tasks: {len(all_failed)}")
+    for row in all_failed:
+        print(f"  Task {row['task_id']}: finished_at={row['finished_at']}")
+
+    # Test the cleanup query
+    cleanup_rows = conn.execute("""
+        SELECT task_id, output_dir
+        FROM tasks
+        WHERE status = 'failed'
+        AND deleted_at IS NULL
+        AND COALESCE(finished_at, started_at, created_at) <= datetime('now', '-' || ? || ' seconds')
+    """, (failed_task_cleanup_age,)).fetchall()
+
+    print(f"\nTasks to be cleaned up: {len(cleanup_rows)}")
+    for row in cleanup_rows:
+        print(f"  Task {row['task_id']} will be cleaned up")
+
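+    # Illustrative aside (not required by the checks above): SQLite's
+    # datetime('now') is UTC while the service stores UTC+8 stamps, so print
+    # both clocks to make the timezone-adjusted query below easier to read.
+    now_utc, now_cst = conn.execute(
+        "SELECT datetime('now'), datetime('now', '+8 hours')"
+    ).fetchone()
+    print(f"\nSQLite clocks: UTC={now_utc}, UTC+8={now_cst}")
+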
+    # Test with timezone adjustment (+8 hours)
+    print("\nTesting with timezone adjustment (+8 hours):")
+    cleanup_rows_tz = conn.execute("""
+        SELECT task_id, output_dir
+        FROM tasks
+        WHERE status = 'failed'
+        AND deleted_at IS NULL
+        AND COALESCE(finished_at, started_at, created_at) <= datetime('now', '+8 hours', '-' || ? || ' seconds')
+    """, (failed_task_cleanup_age,)).fetchall()
+
+    print(f"Tasks to be cleaned up (with TZ): {len(cleanup_rows_tz)}")
+    for row in cleanup_rows_tz:
+        print(f"  Task {row['task_id']} will be cleaned up")
+
+    conn.close()
+    db_path.unlink()
+
+if __name__ == "__main__":
+    test_cleanup_logic()
\ No newline at end of file
diff --git a/gasflux.ini b/gasflux.ini
index 754b2b3..9a3456c 100644
--- a/gasflux.ini
+++ b/gasflux.ini
@@ -1,9 +1,9 @@
 [server]
 # Server configuration
 host = 0.0.0.0
-port = 5000
+port = 5001
 debug = false
-base_url = http://localhost:5000
+base_url = http://localhost:5001
 
 [paths]
 # Directory paths (relative to project root or absolute)
@@ -25,8 +25,14 @@ admin_bootstrap_key = bootstrap_key_2024
 
 [cleanup]
 # Task cleanup settings
-task_cleanup_interval = 3600  # 1 hour in seconds
-max_task_age = 86400  # 24 hours in seconds
+# Cleanup cycle: 30 seconds
+task_cleanup_interval = 30
+# 24 hours in seconds
+max_task_age = 86400
+# 1 minute in seconds for successful (downloaded) tasks
+successful_task_cleanup_age = 60
+# 1 minute in seconds for failed tasks
+failed_task_cleanup_age = 60
 janitor_dry_run = false
 
 [performance]
diff --git a/gasflux.ini.example b/gasflux.ini.example
index 87bc281..7b1f693 100644
--- a/gasflux.ini.example
+++ b/gasflux.ini.example
@@ -28,9 +28,8 @@ admin_bootstrap_key =
 task_cleanup_interval = 3600  # 1 hour in seconds
 max_task_age = 86400  # 24 hours in seconds
 janitor_dry_run = false
-
 [performance]
 # Performance tuning
 threads = 8
 connection_limit = 100
-channel_timeout = 300
\ No newline at end of file
+channel_timeout = 300
diff --git a/src/gasflux/app.py b/src/gasflux/app.py
index 14981a2..197c02b 100644
--- a/src/gasflux/app.py
+++ b/src/gasflux/app.py
@@ -56,6 +56,8 @@ class Config:
     # Task management
     TASK_CLEANUP_INTERVAL = config_reader.task_cleanup_interval
     MAX_TASK_AGE = config_reader.max_task_age
+    SUCCESSFUL_TASK_CLEANUP_AGE = config_reader.successful_task_cleanup_age
+    FAILED_TASK_CLEANUP_AGE = config_reader.failed_task_cleanup_age
 
     # Performance tuning
     THREADS = config_reader.threads
@@ -151,6 +153,8 @@ class Config:
             'cors_origins': cls.CORS_ORIGINS,
             'task_cleanup_interval': cls.TASK_CLEANUP_INTERVAL,
             'max_task_age': cls.MAX_TASK_AGE,
+            'successful_task_cleanup_age': cls.SUCCESSFUL_TASK_CLEANUP_AGE,
+            'failed_task_cleanup_age': cls.FAILED_TASK_CLEANUP_AGE,
             'threads': cls.THREADS,
             'connection_limit': cls.CONNECTION_LIMIT,
             'channel_timeout': cls.CHANNEL_TIMEOUT
@@ -672,6 +676,15 @@ app.config['JANITOR_DRY_RUN'] = str(Config.JANITOR_DRY_RUN).lower()
 if Config.ADMIN_BOOTSTRAP_KEY:
     app.config['ADMIN_BOOTSTRAP_KEY'] = Config.ADMIN_BOOTSTRAP_KEY
 
+# Task cleanup configuration
+app.config['SUCCESSFUL_TASK_CLEANUP_AGE'] = Config.SUCCESSFUL_TASK_CLEANUP_AGE
+app.config['FAILED_TASK_CLEANUP_AGE'] = Config.FAILED_TASK_CLEANUP_AGE
+app.config['TASK_CLEANUP_INTERVAL'] = Config.TASK_CLEANUP_INTERVAL
+
+# Debug logging for cleanup configuration
+logger.info(f"App config: FAILED_TASK_CLEANUP_AGE = {app.config['FAILED_TASK_CLEANUP_AGE']}")
+logger.info(f"App config: TASK_CLEANUP_INTERVAL = {app.config['TASK_CLEANUP_INTERVAL']}")
+
 # Log current configuration
 logger.info(f"Upload folder: {Config.UPLOAD_FOLDER}")
 logger.info(f"Output folder: {Config.OUTPUT_FOLDER}")
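With the gasflux.ini above (`failed_task_cleanup_age = 60`, `task_cleanup_interval = 30`), the two debug lines added here should appear at startup roughly as follows (logger prefix omitted; shown for orientation only):

```
App config: FAILED_TASK_CLEANUP_AGE = 60
App config: TASK_CLEANUP_INTERVAL = 30
```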
diff --git a/src/gasflux/blueprints/config.py b/src/gasflux/blueprints/config.py
index 3305639..db98f5e 100644
--- a/src/gasflux/blueprints/config.py
+++ b/src/gasflux/blueprints/config.py
@@ -40,6 +40,8 @@ def get_config():
         "GASFLUX_CORS_ORIGINS",
         "GASFLUX_TASK_CLEANUP_INTERVAL",
         "GASFLUX_MAX_TASK_AGE",
+        "GASFLUX_SUCCESSFUL_TASK_CLEANUP_AGE",
+        "GASFLUX_FAILED_TASK_CLEANUP_AGE",
         "GASFLUX_THREADS",
         "GASFLUX_CONNECTION_LIMIT",
         "GASFLUX_CHANNEL_TIMEOUT"
@@ -52,6 +54,7 @@ def get_config():
         "GASFLUX_MAX_CONTENT_LENGTH", "GASFLUX_LOG_LEVEL", "GASFLUX_LOG_FILE",
         "GASFLUX_CORS_ORIGINS", "GASFLUX_TASK_CLEANUP_INTERVAL", "GASFLUX_MAX_TASK_AGE",
+        "GASFLUX_SUCCESSFUL_TASK_CLEANUP_AGE", "GASFLUX_FAILED_TASK_CLEANUP_AGE",
         "GASFLUX_THREADS", "GASFLUX_CONNECTION_LIMIT", "GASFLUX_CHANNEL_TIMEOUT"
     ]
diff --git a/src/gasflux/blueprints/download.py b/src/gasflux/blueprints/download.py
index 80ad0f2..894c054 100644
--- a/src/gasflux/blueprints/download.py
+++ b/src/gasflux/blueprints/download.py
@@ -19,21 +19,24 @@ def _mark_task_downloaded(task_id):
     from ..db import get_db_path as get_config_db_path
     db_path = get_config_db_path(current_app)
 
+    # Get the cleanup age for successful tasks from config (in seconds)
+    successful_task_cleanup_age = current_app.config.get('SUCCESSFUL_TASK_CLEANUP_AGE', 3600)
+
     try:
         conn = sqlite3.connect(str(db_path), check_same_thread=False)
         conn.execute("PRAGMA foreign_keys=ON")
         conn.execute("PRAGMA busy_timeout=3000")
 
-        # Update downloaded timestamp and set deletion time (10 minutes later)
+        # Update the downloaded timestamp and schedule deletion per config
+        # (timestamps are stored as UTC+8, hence the '+8 hours' shift)
         conn.execute("""
             UPDATE tasks
             SET downloaded_at = datetime('now', '+8 hours'),
-                delete_after_at = datetime('now', '+8 hours', '+10 minutes')
+                delete_after_at = datetime('now', '+8 hours', '+' || ? || ' seconds')
             WHERE task_id = ?
-        """, (task_id,))
+        """, (successful_task_cleanup_age, task_id))
 
         conn.commit()
-        logger.info(f"Task {task_id} marked as downloaded, scheduled for deletion in 10 minutes")
+        logger.info(f"Task {task_id} marked as downloaded, scheduled for deletion in {successful_task_cleanup_age} seconds")
 
     except Exception as e:
         logger.error(f"Failed to mark task {task_id} as downloaded: {str(e)}", exc_info=True)
diff --git a/src/gasflux/blueprints/stats.py b/src/gasflux/blueprints/stats.py
index 020fba3..5545efa 100644
--- a/src/gasflux/blueprints/stats.py
+++ b/src/gasflux/blueprints/stats.py
@@ -54,10 +54,11 @@ def get_stats():
     current_time = time.time()
 
     rows = db.execute("""
-        SELECT task_id, status, message, updated_at
+        SELECT task_id, status, message,
+               COALESCE(finished_at, started_at, created_at) as updated_at
         FROM tasks
         WHERE deleted_at IS NULL
-        ORDER BY updated_at DESC
+        ORDER BY COALESCE(finished_at, started_at, created_at) DESC
         LIMIT 20
     """).fetchall()
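Both the query above and the janitor below lean on `COALESCE(finished_at, started_at, created_at)` to pick the most recent lifecycle timestamp a task actually has. A standalone illustration with made-up timestamps:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# COALESCE returns its first non-NULL argument: a task that never started
# falls back to created_at; one that started but never finished, to started_at.
print(conn.execute(
    "SELECT COALESCE(NULL, NULL, '2026-02-11 08:00:00'),"
    "       COALESCE(NULL, '2026-02-11 09:00:00', '2026-02-11 08:00:00')"
).fetchone())
# -> ('2026-02-11 08:00:00', '2026-02-11 09:00:00')
```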
diff --git a/src/gasflux/blueprints/upload.py b/src/gasflux/blueprints/upload.py
index 1057c38..f5d4e2a 100644
--- a/src/gasflux/blueprints/upload.py
+++ b/src/gasflux/blueprints/upload.py
@@ -16,6 +16,141 @@
 from ..app import process_data_async
 from ..shared import _format_response, log_performance, logger, ALLOWED_DATA_EXTENSIONS, ALLOWED_CONFIG_EXTENSIONS, allowed_file, update_task_status, TASK_STATUS_PENDING, TASK_STATUS_FAILED
 from ..auth import require_api_key
 
+
+def get_file_extension(filename):
+    """Return the file extension, consistent with allowed_file"""
+    if '.' not in filename:
+        return ''
+    return '.' + filename.rsplit('.', 1)[1].lower()
+
+
+def validate_upload_files(data_file, config_file=None):
+    """Validate uploaded files and return detailed error information"""
+    errors = []
+
+    # Check the data file
+    if not data_file or data_file.filename == '':
+        errors.append({
+            'field': 'file',
+            'error': 'Data file must not be empty',
+            'details': 'Please upload a valid Excel file'
+        })
+    elif not allowed_file(data_file.filename, ALLOWED_DATA_EXTENSIONS):
+        errors.append({
+            'field': 'file',
+            'error': f'Unsupported file type: {get_file_extension(data_file.filename)}',
+            'details': f'Supported formats: {", ".join(ALLOWED_DATA_EXTENSIONS)}'
+        })
+
+    # Check the config file (if provided)
+    if config_file and config_file.filename != '':
+        if not allowed_file(config_file.filename, ALLOWED_CONFIG_EXTENSIONS):
+            errors.append({
+                'field': 'config',
+                'error': f'Unsupported config file type: {get_file_extension(config_file.filename)}',
+                'details': f'Supported formats: {", ".join(ALLOWED_CONFIG_EXTENSIONS)}'
+            })
+
+    return errors
+
+
+def parse_config_file(config_file):
+    """Parse the config file, returning (config, error) with detailed errors"""
+    if not config_file or config_file.filename == '':
+        # Fall back to the default configuration
+        try:
+            default_config_path = Path(__file__).parent.parent / "gasflux_config.yaml"
+            with open(default_config_path, 'r', encoding='utf-8') as f:
+                return yaml.safe_load(f), None
+        except Exception as e:
+            return None, f"Failed to load the default config file: {str(e)}"
+
+    try:
+        config_file.stream.seek(0)
+        config_text = config_file.read().decode('utf-8', errors='strict')
+
+        if not config_text.strip():
+            return None, "Config file is empty"
+
+        config_data = yaml.safe_load(config_text)
+
+        # Basic validation
+        if not isinstance(config_data, dict):
+            return None, "Malformed config file: expected a mapping"
+
+        # Reset the stream so the file can be saved later
+        config_file.stream = BytesIO(config_text.encode('utf-8'))
+        return config_data, None
+
+    except UnicodeDecodeError as e:
+        return None, f"Config file encoding error, please use UTF-8: {str(e)}"
+    except yaml.YAMLError as e:
+        return None, f"YAML syntax error: {str(e)}"
+    except Exception as e:
+        return None, f"Failed to parse config file: {str(e)}"
+
+
+def save_upload_file(file_obj, save_path, task_id, file_type="file"):
+    """Safely save an uploaded file, returning (success, error) with details"""
+    try:
+        save_path.parent.mkdir(parents=True, exist_ok=True)
+
+        # Check the file size (when content_length is available)
+        if hasattr(file_obj, 'content_length') and file_obj.content_length:
+            max_size = current_app.config.get('MAX_CONTENT_LENGTH', 104857600)  # 100 MB
+            if file_obj.content_length > max_size:
+                return False, f"{file_type} too large ({file_obj.content_length} bytes), limit: {max_size} bytes"
+
+        # Save the file
+        file_obj.seek(0)
+        file_obj.save(str(save_path))
+
+        # Verify the file was actually written
+        if not save_path.exists():
+            return False, f"Failed to save {file_type}: file does not exist"
+
+        if save_path.stat().st_size == 0:
+            return False, f"Failed to save {file_type}: file is empty"
+
+        logger.info(f"Job {task_id}: {file_type} saved successfully - {save_path} ({save_path.stat().st_size} bytes)")
+        return True, None
+
+    except PermissionError:
+        return False, f"Failed to save {file_type}: no write permission - {save_path.parent}"
+    except OSError as e:
+        if e.errno == 28:  # No space left on device
+            return False, f"Failed to save {file_type}: disk full"
+        elif e.errno == 36:  # File name too long
+            return False, f"{file_type} name too long"
+        else:
+            return False, f"Failed to save {file_type}: disk error ({e.errno})"
+    except Exception as e:
+        logger.error(f"Job {task_id}: Failed to save {file_type}: {str(e)}", exc_info=True)
+        return False, f"Failed to save {file_type}: {str(e)}"
+
+
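+# The helpers above report failures through format_upload_error below. The
+# resulting JSON is roughly {"error": ..., "code": ..., "details": ...,
+# "task_id": ...} - illustrative only; the exact envelope is whatever
+# _format_response in shared.py wraps around this payload.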
+def format_upload_error(error_code, message, details=None, task_id=None):
+    """Uniform error-response format"""
+    response = {
+        'error': message,
+        'code': error_code
+    }
+
+    if details:
+        response['details'] = details
+
+    if task_id:
+        response['task_id'] = task_id
+
+    # Log the full error: client errors as warnings, server errors as errors
+    log_level = 'warning' if error_code < 500 else 'error'
+    log_func = getattr(logger, log_level)
+    log_func(f"Upload error [{error_code}]: {message}" + (f" - Details: {details}" if details else ""))
+
+    # Attach the payload whenever it carries more than the bare message
+    return _format_response(error_code, message, response if (details or task_id) else None)
+
+
 # Create blueprint
 upload_bp = Blueprint('upload', __name__, url_prefix='/upload')
@@ -25,97 +160,80 @@ upload_bp = Blueprint('upload', __name__, url_prefix='/upload')
 @log_performance
 def upload_file():
     logger.info("Received upload request")
-    logger.info(f"Request content length: {request.content_length} bytes")
 
-    # Check if data file is present
+    # 1. Basic validation
     if 'file' not in request.files:
-        logger.warning("Upload failed: No data file part in request")
-        return _format_response(400, "No data file part found")
+        return format_upload_error(400, "No data file found", "The request is missing the 'file' field")
 
     data_file = request.files['file']
     config_file = request.files.get('config')
 
-    # Log file details
-    logger.info(f"Data file: {data_file.filename} (size: {getattr(data_file, 'content_length', 'unknown')} bytes)")
-    if config_file:
-        logger.info(f"Config file: {config_file.filename} (size: {getattr(config_file, 'content_length', 'unknown')} bytes)")
-    else:
-        logger.info("No custom config file provided, will use default")
+    # 2. File validation
+    validation_errors = validate_upload_files(data_file, config_file)
+    if validation_errors:
+        # Return the first error
+        error = validation_errors[0]
+        return format_upload_error(400, error['error'], error['details'])
 
-    if data_file.filename == '':
-        logger.warning("Upload failed: No data file selected (empty filename)")
-        return _format_response(400, "No data file selected")
-
-    if not allowed_file(data_file.filename, ALLOWED_DATA_EXTENSIONS):
-        logger.warning(f"Upload failed: Invalid data file type {data_file.filename} - allowed: {ALLOWED_DATA_EXTENSIONS}")
-        return _format_response(400, "Invalid data file type. Only .xlsx and .xls are allowed.")
-
-    # Generate unique job ID
+    # 3. Generate the task ID
     task_id = str(uuid.uuid4())
     logger.info(f"Generated job ID: {task_id}")
 
-    # 1) Parse config content (parse in memory without saving first)
-    if config_file and config_file.filename != '':
-        if not allowed_file(config_file.filename, ALLOWED_CONFIG_EXTENSIONS):
-            return _format_response(400, "Invalid config file type. Only .yaml and .yml are allowed.")
-        config_file.stream.seek(0)
-        config_text = config_file.read().decode('utf-8', errors='ignore')
-        try:
-            active_config = yaml.safe_load(config_text)
-        except Exception:
-            return _format_response(400, "Failed to parse config file")
-        # Reset stream for saving
-        config_file.stream = BytesIO(config_text.encode('utf-8'))
-    else:
-        default_config_path = Path(__file__).parent.parent / "gasflux_config.yaml"
-        with open(default_config_path, 'r', encoding='utf-8') as f:
-            active_config = yaml.safe_load(f)
+    # 4. Parse the configuration
+    active_config, config_error = parse_config_file(config_file)
+    if config_error:
+        return format_upload_error(400, "Config file error", config_error, task_id)
 
-    # 2) Create job directories based on INI configuration
-    upload_base = Path(current_app.config['UPLOAD_FOLDER'])
-    output_base = Path(current_app.config['OUTPUT_FOLDER'])
-    job_upload_dir = upload_base / task_id
-    job_output_dir = output_base / task_id
-    job_upload_dir.mkdir(parents=True, exist_ok=True)
-    job_output_dir.mkdir(parents=True, exist_ok=True)
-    logger.info(f"Job {task_id}: Created directories - Upload: {job_upload_dir}, Output: {job_output_dir}")
+    # 5. Create directories
+    try:
+        upload_base = Path(current_app.config['UPLOAD_FOLDER'])
+        output_base = Path(current_app.config['OUTPUT_FOLDER'])
+        job_upload_dir = upload_base / task_id
+        job_output_dir = output_base / task_id
 
-    # 3) Save data file to job_upload_dir
+        job_upload_dir.mkdir(parents=True, exist_ok=True)
+        job_output_dir.mkdir(parents=True, exist_ok=True)
+        logger.info(f"Job {task_id}: Directories created successfully")
+
+    except Exception as e:
+        return format_upload_error(500, "Failed to create directories", str(e), task_id)
+
+    # 6. Save the data file
     data_filename = secure_filename(data_file.filename)
     data_path = job_upload_dir / data_filename
-    try:
-        data_file.seek(0)
-        data_file.save(str(data_path))
-        logger.info(f"Job {task_id}: Data file saved successfully - Path: {data_path}")
-    except Exception as e:
-        logger.error(f"Job {task_id}: Failed to save data file {data_filename}: {str(e)}")
-        return _format_response(500, "Failed to save data file")
 
-    # 4) Save config file to job_upload_dir
+    success, error_msg = save_upload_file(data_file, data_path, task_id, "data file")
+    if not success:
+        return format_upload_error(500, error_msg, task_id=task_id)
+
+    # 7. Save the config file
     if config_file and config_file.filename != '':
         config_filename = secure_filename(config_file.filename)
         config_path = job_upload_dir / config_filename
-        try:
-            config_file.seek(0)
-            config_file.save(str(config_path))
-            active_config_path = config_path
-            logger.info(f"Job {task_id}: Custom config saved successfully - Path: {config_path}")
-        except Exception as e:
-            logger.error(f"Job {task_id}: Failed to save config file {config_filename}: {str(e)}")
-            return _format_response(500, "Failed to save config file")
-    else:
-        # Copy default config for record keeping
-        config_path = job_upload_dir / "config.yaml"
-        with open(config_path, 'w', encoding='utf-8') as f:
-            yaml.safe_dump(active_config, f, allow_unicode=True)
         active_config_path = config_path
-        logger.info(f"Job {task_id}: Default config saved for record - Path: {config_path}")
 
-    # Initialize task status
-    update_task_status(task_id, TASK_STATUS_PENDING, "Task queued for processing", output_dir=str(job_output_dir))
-    logger.info(f"Job {task_id}: Task status initialized as PENDING")
+        success, error_msg = save_upload_file(config_file, config_path, task_id, "config file")
+        if not success:
+            return format_upload_error(500, error_msg, task_id=task_id)
+    else:
+        # Save the default config for record keeping
+        config_path = job_upload_dir / "config.yaml"
+        try:
+            with open(config_path, 'w', encoding='utf-8') as f:
+                yaml.safe_dump(active_config, f, allow_unicode=True)
+            active_config_path = config_path
+            logger.info(f"Job {task_id}: Default config saved")
+        except Exception as e:
+            return format_upload_error(500, f"Failed to save default config: {str(e)}", task_id=task_id)
 
-    # Start background processing
+    # 8. Initialize the task status
+    try:
+        update_task_status(task_id, TASK_STATUS_PENDING, "Task queued for processing", output_dir=str(job_output_dir))
+    except Exception as e:
+        logger.error(f"Job {task_id}: Failed to update task status: {str(e)}")
+        # Continue; a status-update failure is not fatal to the upload
+
+    # 9. Start background processing
     try:
         thread = threading.Thread(
             target=process_data_async,
@@ -123,15 +241,16 @@ def upload_file():
         )
         thread.daemon = True
         thread.start()
-        logger.info(f"Job {task_id}: Background processing thread started successfully")
+        logger.info(f"Job {task_id}: Background processing started")
     except Exception as e:
-        logger.error(f"Job {task_id}: Failed to start background processing thread: {str(e)}")
+        logger.error(f"Job {task_id}: Failed to start processing thread: {str(e)}", exc_info=True)
         update_task_status(task_id, TASK_STATUS_FAILED, error=str(e))
-        return _format_response(500, "Failed to start processing")
+        return format_upload_error(500, "Failed to start the processing thread", str(e), task_id)
 
-    logger.info(f"Job {task_id}: Upload process completed successfully, returning job ID to client")
+    # 10. Return the accepted response
     return _format_response(202, "Task accepted and queued for processing", {
         "status": "accepted",
         "task_id": task_id,
-        "task_status_url": f"/task/{task_id}"
+        "task_status_url": f"/task/{task_id}",
+        "message": f"Data file '{data_filename}' uploaded; processing started"
     })
\ No newline at end of file
diff --git a/src/gasflux/config_reader.py b/src/gasflux/config_reader.py
index 086352a..7142666 100644
--- a/src/gasflux/config_reader.py
+++ b/src/gasflux/config_reader.py
@@ -56,6 +56,8 @@
         self.config.add_section('cleanup')
         self.config.set('cleanup', 'task_cleanup_interval', os.getenv('GASFLUX_TASK_CLEANUP_INTERVAL', '3600'))
         self.config.set('cleanup', 'max_task_age', os.getenv('GASFLUX_MAX_TASK_AGE', '86400'))
+        self.config.set('cleanup', 'successful_task_cleanup_age', os.getenv('GASFLUX_SUCCESSFUL_TASK_CLEANUP_AGE', '3600'))
+        self.config.set('cleanup', 'failed_task_cleanup_age', os.getenv('GASFLUX_FAILED_TASK_CLEANUP_AGE', '604800'))
         self.config.set('cleanup', 'janitor_dry_run', os.getenv('JANITOR_DRY_RUN', 'false'))
 
         # Performance defaults
@@ -192,6 +194,14 @@ class ConfigReader:
     def max_task_age(self) -> int:
         return self.getint('cleanup', 'max_task_age', 86400)
 
+    @property
+    def successful_task_cleanup_age(self) -> int:
+        return self.getint('cleanup', 'successful_task_cleanup_age', 3600)
+
+    @property
+    def failed_task_cleanup_age(self) -> int:
+        return self.getint('cleanup', 'failed_task_cleanup_age', 604800)
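+    # The fallbacks above apply only when [cleanup] omits these keys:
+    # 3600 s (1 h) for successful tasks, 604800 s (7 days) for failed ones.
+    # The gasflux.ini in this patch overrides both to 60 s.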
+
     @property
     def janitor_dry_run(self) -> bool:
         return self.getboolean('cleanup', 'janitor_dry_run', False)
diff --git a/src/gasflux/data_processor.py b/src/gasflux/data_processor.py
index 4b46387..b58ccc1 100644
--- a/src/gasflux/data_processor.py
+++ b/src/gasflux/data_processor.py
@@ -85,7 +85,7 @@
 try:
     from .qiya import get_pressure_at_location
 except ImportError as e:
-    print(f"❌ Failed to import the qiya module: {e}")
+    print(f"Failed to import the qiya module: {e}")
     print("Make sure the GasFlux package structure is intact")
     sys.exit(1)
@@ -101,17 +101,18 @@ def load_excel_data(file_path):
         pd.DataFrame: the loaded data
     """
     try:
-        print(f"Reading file: {file_path}")
+        print(f"Reading Excel file: {Path(file_path).name}")
 
         # Read the Excel file
         df = pd.read_excel(file_path)
 
-        print(f"SUCCESS: Successfully loaded data: {len(df)} rows, {len(df.columns)} columns")
-        print(f"Columns: {list(df.columns)}")
+        print(f"Loaded: {len(df):,} rows × {len(df.columns)} columns")
+        print(f"   Detected columns: {', '.join(df.columns[:5])}{'...' if len(df.columns) > 5 else ''}")
 
         return df
 
     except Exception as e:
-        print(f"ERROR: Failed to read file: {e}")
+        print(f"Failed to read file: {Path(file_path).name}")
+        print(f"   Error: {e}")
         sys.exit(1)
@@ -126,7 +127,7 @@ def remove_columns(df, columns_to_remove):
     Returns:
         pd.DataFrame: the DataFrame with the columns removed
     """
-    print(f"\nDropping columns: {columns_to_remove}")
+    print(f"\nDropping unneeded columns: {columns_to_remove}")
 
     # Check whether the columns to remove actually exist
     existing_columns = [col for col in columns_to_remove if col in df.columns]
@@ -174,7 +175,7 @@ def fix_time_column(df, filename):
     Returns:
         pd.DataFrame: the corrected DataFrame
     """
-    print("Fixing the time format...")
+    print("Fixing the time format from the filename...")
 
     # Extract the hour information
     hour_prefix = extract_hour_from_filename(filename)
@@ -230,23 +231,23 @@ def convert_coordinates(df):
     Returns:
         pd.DataFrame: the converted DataFrame
     """
-    print("Converting GPS coordinates...")
+    print("Converting GPS coordinates (raw values divided by 1e7)...")
 
     if 'stGPSPositionX' in df.columns:
         original_lon = df['stGPSPositionX'].head(3).tolist()
         df['stGPSPositionX'] = df['stGPSPositionX'] / 1e7
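+        # Illustration (hypothetical reading): a raw stGPSPositionX of
+        # 1163974650 becomes 1163974650 / 1e7 = 116.3974650 decimal degrees.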
         converted_lon = df['stGPSPositionX'].head(3).tolist()
 
-        print("Longitude conversion examples:")
-        for orig, conv in zip(original_lon, converted_lon):
-            print(".6f")
+        print("   Longitude examples:")
+        for i, (orig, conv) in enumerate(zip(original_lon, converted_lon)):
+            print(f"      row {i+1}: {orig:.6f} → {conv:.6f}")
 
     if 'stGPSPositionY' in df.columns:
         original_lat = df['stGPSPositionY'].head(3).tolist()
         df['stGPSPositionY'] = df['stGPSPositionY'] / 1e7
         converted_lat = df['stGPSPositionY'].head(3).tolist()
 
-        print("Latitude conversion examples:")
-        for orig, conv in zip(original_lat, converted_lat):
-            print(".6f")
+        print("   Latitude examples:")
+        for i, (orig, conv) in enumerate(zip(original_lat, converted_lat)):
+            print(f"      row {i+1}: {orig:.6f} → {conv:.6f}")
 
     return df
@@ -264,7 +265,7 @@ def calculate_pressure(df, max_samples=None, height_tolerance=10.0, height_bin_s
     Returns:
         pd.DataFrame: the DataFrame with a pressure column added
     """
-    print("Computing air pressure...")
+    print("Computing air pressure (using the qiya.py atmosphere model)...")
 
     # Check that the required columns exist (supports original and renamed columns)
     # Column names in the raw data
@@ -554,11 +555,11 @@ def adjust_altitude(df):
     Returns:
         pd.DataFrame: the DataFrame with an adjusted-height field added
     """
-    print("Creating the adjusted height field...")
+    print("Creating the adjusted height field (above takeoff)...")
 
     if 'fAltitudeFused' in df.columns:
         min_altitude = df['fAltitudeFused'].min()
-        print(".2f")
+        print(f"   Minimum altitude: {min_altitude:.2f} m")
 
         # Create a new adjusted-height field instead of modifying the original
         df['height_ato'] = df['fAltitudeFused'] - min_altitude
 
         original_alt = df['fAltitudeFused'].head(3).tolist()
         adjusted_alt = df['height_ato'].head(3).tolist()
 
-        print("Height adjustment examples:")
-        for orig, adj in zip(original_alt, adjusted_alt):
-            print(".2f")
+        print("   Height adjustment examples:")
+        for i, (orig, adj) in enumerate(zip(original_alt, adjusted_alt)):
+            print(f"      row {i+1}: {orig:.2f}m → {adj:.2f}m")
+    else:
+        print("   Column 'fAltitudeFused' not found; skipping height adjustment")
 
     return df
@@ -583,7 +586,7 @@ def merge_timestamp(df):
     Returns:
         pd.DataFrame: the merged DataFrame
     """
-    print("Merging date and time (fixing the time format)...")
+    print("Merging date and time into a timestamp...")
 
     if 'qStrDate' in df.columns and 'qStrTime' in df.columns:
         timestamps = []
@@ -639,7 +642,7 @@ def rename_columns(df):
     Returns:
         pd.DataFrame: the renamed DataFrame
     """
-    print("Renaming fields...")
+    print("Renaming fields to the standard GasFlux schema...")
 
     # Field mapping
     column_mapping = {
@@ -681,7 +684,7 @@ def ensure_float64_types(df, config=None):
     Returns:
         pd.DataFrame: the type-converted DataFrame
     """
-    print("Ensuring numeric fields are float64...")
+    print("Ensuring numeric fields are float64 (required by GasFlux)...")
 
     # Base set of fields to convert to float64
     float64_columns = [
@@ -726,7 +729,7 @@ def process_excel_file(file_path, config_path=None):
         file_path: path to the Excel file
         config_path: path to the config file, used to read the gases section
     """
-    print(f"=== Processing file: {file_path} ===\n")
+    print(f"=== Processing file: {Path(file_path).name} ===\n")
 
     # 1. Load the data
     df = load_excel_data(file_path)
@@ -767,10 +770,10 @@ def process_excel_file(file_path, config_path=None):
     output_path = Path(file_path).with_suffix('.processed.csv')
     df.to_csv(output_path, index=False, encoding='utf-8-sig')
 
-    print(f"\nDone!")
-    print(f"Output file: {output_path}")
-    print(f"Final data shape: {df.shape[0]} rows × {df.shape[1]} columns")
-    print(f"Final columns: {list(df.columns)}")
+    print("\nDone!")
+    print(f"   Output file: {output_path}")
+    print(f"   Final data: {df.shape[0]:,} rows × {df.shape[1]} columns")
+    print(f"   Final fields: {', '.join(df.columns)}")
 
     return df
diff --git a/src/gasflux/janitor.py b/src/gasflux/janitor.py
index 972150b..501cbe3 100644
--- a/src/gasflux/janitor.py
+++ b/src/gasflux/janitor.py
@@ -21,8 +21,9 @@ def start_janitor(app):
             except Exception as e:
                 app.logger.error(f"Janitor cleanup error: {str(e)}", exc_info=True)
             finally:
-                # Sleep for 30 seconds before next cleanup cycle
-                time.sleep(30)
+                # Sleep for the configured interval before the next cycle; read
+                # via app.config so no application context is required here
+                cleanup_interval = int(app.config.get('TASK_CLEANUP_INTERVAL', 30))
+                time.sleep(cleanup_interval)
 
     # Create daemon thread so it doesn't prevent app shutdown
     thread = threading.Thread(target=worker, daemon=True, name="janitor")
@@ -39,6 +40,10 @@ def cleanup_expired_tasks():
     else:
         dry_run = bool(dry_run_config)
 
+    # Get cleanup intervals from config (in seconds)
+    failed_task_cleanup_age = current_app.config.get('FAILED_TASK_CLEANUP_AGE', 86400)  # default: 24 hours for failed tasks
+    current_app.logger.info(f"Janitor: FAILED_TASK_CLEANUP_AGE = {failed_task_cleanup_age} seconds")
+
     try:
         conn = sqlite3.connect(str(db_path), check_same_thread=False)
         conn.row_factory = sqlite3.Row
@@ -48,6 +53,10 @@
         current_time = conn.execute("SELECT datetime('now', '+8 hours')").fetchone()[0]
         current_app.logger.info(f"Janitor cleanup check at: {current_time}")
 
+        # Base directories used by both cleanup passes below
+        upload_base = Path(current_app.config.get('UPLOAD_FOLDER') or '')
+        output_base = Path(current_app.config.get('OUTPUT_FOLDER') or '')
+
         # Debug: Check for tasks with delete_after_at set
         debug_rows = conn.execute("""
             SELECT task_id, delete_after_at, downloaded_at
@@ -60,7 +69,7 @@
         for row in debug_rows:
             current_app.logger.info(f"  Task {row['task_id']}: delete_at={row['delete_after_at']}, downloaded_at={row['downloaded_at']}")
 
-        # Find tasks that need to be deleted
+        # 1) Scheduled deletions (delete_after_at)
         rows = conn.execute("""
             SELECT task_id, output_dir
             FROM tasks
@@ -69,63 +78,133 @@
             AND delete_after_at <= datetime('now', '+8 hours')
         """).fetchall()
 
-        if not rows:
-            return  # No tasks to clean up
+        if rows:
+            current_app.logger.info(f"Janitor found {len(rows)} expired tasks to clean up")
 
-        current_app.logger.info(f"Janitor found {len(rows)} expired tasks to clean up")
+            for row in rows:
+                task_id = row['task_id']
+                output_dir = row['output_dir']
 
-        upload_base = Path(current_app.config.get('UPLOAD_FOLDER') or '')
-        output_base = Path(current_app.config.get('OUTPUT_FOLDER') or '')
+                try:
+                    delete_targets = []
 
-        for row in rows:
-            task_id = row['task_id']
-            output_dir = row['output_dir']
+                    # output_dir as recorded in the database
+                    if output_dir:
+                        p = Path(output_dir)
+                        delete_targets.append(p)
-            try:
-                delete_targets = []
+                    # Fallback: the conventional outputs/<task_id> directory
+                    derived_output_dir = output_base / task_id if output_base else None
+                    if derived_output_dir and derived_output_dir not in delete_targets:
+                        delete_targets.append(derived_output_dir)
 
-                # output_dir as recorded in the database
-                if output_dir:
-                    p = Path(output_dir)
-                    delete_targets.append(p)
+                    # Also delete uploads/<task_id>
+                    derived_upload_dir = upload_base / task_id if upload_base else None
+                    if derived_upload_dir:
+                        delete_targets.append(derived_upload_dir)
 
-                # Fallback: the conventional outputs/<task_id> directory
-                derived_output_dir = output_base / task_id if output_base else None
-                if derived_output_dir and derived_output_dir not in delete_targets:
-                    delete_targets.append(derived_output_dir)
+                    if dry_run:
+                        for tgt in delete_targets:
+                            if tgt:
+                                current_app.logger.info(f"[DRY RUN] Would delete task {task_id} path: {tgt}")
+                    else:
+                        # Actually delete
+                        for tgt in delete_targets:
+                            try:
+                                if tgt and tgt.exists():
+                                    shutil.rmtree(tgt, ignore_errors=True)
+                                    current_app.logger.info(f"Deleted path for task {task_id}: {tgt}")
+                                else:
+                                    current_app.logger.warning(f"Path not found for task {task_id}: {tgt}")
+                            except Exception as e:
+                                current_app.logger.error(f"Failed to delete path {tgt} for task {task_id}: {e}")
 
-                # Also delete uploads/<task_id>
-                derived_upload_dir = upload_base / task_id if upload_base else None
-                if derived_upload_dir:
-                    delete_targets.append(derived_upload_dir)
+                        # Hard delete from the database (kept out of dry runs)
+                        conn.execute(
+                            "DELETE FROM tasks WHERE task_id = ?",
+                            (task_id,)
+                        )
+                        conn.commit()
+                        current_app.logger.info(f"Hard deleted task {task_id} from database")
 
-            if dry_run:
-                for tgt in delete_targets:
-                    if tgt:
-                        current_app.logger.info(f"[DRY RUN] Would delete task {task_id} path: {tgt}")
-            else:
-                # Actually delete
-                for tgt in delete_targets:
-                    try:
-                        if tgt and tgt.exists():
-                            shutil.rmtree(tgt, ignore_errors=True)
-                            current_app.logger.info(f"Deleted path for task {task_id}: {tgt}")
-                        else:
-                            current_app.logger.warning(f"Path not found for task {task_id}: {tgt}")
-                    except Exception as e:
-                        current_app.logger.error(f"Failed to delete path {tgt} for task {task_id}: {e}")
+                except Exception as e:
+                    current_app.logger.error(f"Failed to delete task {task_id}: {str(e)}", exc_info=True)
+                    # Continue with other tasks even if one fails
 
-            # Hard delete from database
-            conn.execute(
-                "DELETE FROM tasks WHERE task_id = ?",
-                (task_id,)
-            )
-            conn.commit()
-            current_app.logger.info(f"Hard deleted task {task_id} from database")
-
-        except Exception as e:
-            current_app.logger.error(f"Failed to delete task {task_id}: {str(e)}", exc_info=True)
-            # Continue with other tasks even if one fails
+        # Debug: Check all failed tasks
+        debug_failed = conn.execute("""
+            SELECT task_id, status, created_at, started_at, finished_at, deleted_at
+            FROM tasks
+            WHERE status = 'failed' AND deleted_at IS NULL
+        """).fetchall()
+        if debug_failed:
+            current_app.logger.info(f"Janitor found {len(debug_failed)} total failed tasks:")
+            for row in debug_failed:
+                current_app.logger.info(f"  Task {row['task_id']}: created={row['created_at']}, started={row['started_at']}, finished={row['finished_at']}")
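+
+        # Eligibility rule for the pass below: a failed task's most recent
+        # lifecycle timestamp (finished_at, else started_at, else created_at)
+        # must be at least failed_task_cleanup_age seconds old; '+8 hours'
+        # matches the UTC+8 timestamps this service writes.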
+        # 2) Clean up failed tasks that are older than the configured age
+        failed_rows = conn.execute("""
+            SELECT task_id, output_dir
+            FROM tasks
+            WHERE status = 'failed'
+            AND deleted_at IS NULL
+            AND COALESCE(finished_at, started_at, created_at) <= datetime('now', '+8 hours', '-' || ? || ' seconds')
+        """, (failed_task_cleanup_age,)).fetchall()
+
+        if failed_rows:
+            current_app.logger.info(f"Janitor found {len(failed_rows)} failed tasks older than {failed_task_cleanup_age} seconds")
+            for row in failed_rows:
+                current_app.logger.info(f"  Failed task: {row['task_id']}")
+
+        for row in failed_rows:
+            task_id = row['task_id']
+            output_dir = row['output_dir']
+
+            try:
+                delete_targets = []
+
+                # output_dir as recorded in the database
+                if output_dir:
+                    p = Path(output_dir)
+                    delete_targets.append(p)
+
+                # Fallback: the conventional outputs/<task_id> directory
+                derived_output_dir = output_base / task_id if output_base else None
+                if derived_output_dir and derived_output_dir not in delete_targets:
+                    delete_targets.append(derived_output_dir)
+
+                # Also delete uploads/<task_id>
+                derived_upload_dir = upload_base / task_id if upload_base else None
+                if derived_upload_dir:
+                    delete_targets.append(derived_upload_dir)
+
+                if dry_run:
+                    for tgt in delete_targets:
+                        if tgt:
+                            current_app.logger.info(f"[DRY RUN] Would delete failed task {task_id} path: {tgt}")
+                else:
+                    # Actually delete
+                    for tgt in delete_targets:
+                        try:
+                            if tgt and tgt.exists():
+                                shutil.rmtree(tgt, ignore_errors=True)
+                                current_app.logger.info(f"Deleted failed task {task_id} path: {tgt}")
+                            else:
+                                current_app.logger.warning(f"Path not found for failed task {task_id}: {tgt}")
+                        except Exception as e:
+                            current_app.logger.error(f"Failed to delete path {tgt} for failed task {task_id}: {e}")
+
+                    # Hard delete from the database (kept out of dry runs)
+                    conn.execute(
+                        "DELETE FROM tasks WHERE task_id = ?",
+                        (task_id,)
+                    )
+                    conn.commit()
+                    current_app.logger.info(f"Auto-deleted failed task {task_id} from database (older than {failed_task_cleanup_age} seconds)")
+
+            except Exception as e:
+                current_app.logger.error(f"Failed to delete failed task {task_id}: {str(e)}", exc_info=True)
+                # Continue with other tasks even if one fails
 
     except Exception as e:
         current_app.logger.error(f"Database error during cleanup: {str(e)}", exc_info=True)
@@ -138,24 +217,99 @@ def reconcile_tasks_on_startup():
     """Reconcile task states on application startup."""
     db_path = _get_db_path()
 
+    # Get cleanup ages from config (in seconds)
+    successful_task_cleanup_age = current_app.config.get('SUCCESSFUL_TASK_CLEANUP_AGE', 3600)
+    failed_task_cleanup_age = current_app.config.get('FAILED_TASK_CLEANUP_AGE', 86400)
+
     try:
         conn = sqlite3.connect(str(db_path), check_same_thread=False)
+        conn.row_factory = sqlite3.Row
         conn.execute("PRAGMA foreign_keys=ON")
 
         # Fix tasks that were downloaded but don't have delete_after_at set
         # This handles cases where the app crashed after marking downloaded but before setting delete time
         conn.execute("""
             UPDATE tasks
-            SET delete_after_at = COALESCE(delete_after_at, datetime(downloaded_at, '+10 minutes'))
+            SET delete_after_at = COALESCE(delete_after_at, datetime(downloaded_at, '+' || ? || ' seconds'))
             WHERE downloaded_at IS NOT NULL
             AND deleted_at IS NULL
             AND delete_after_at IS NULL
-        """)
+        """, (successful_task_cleanup_age,))
 
         updated_count = conn.total_changes
         if updated_count > 0:
             current_app.logger.info(f"Startup reconciliation: Fixed delete_after_at for {updated_count} downloaded tasks")
 
+        # Clean up failed tasks that are older than the configured age
+        failed_rows = conn.execute("""
+            SELECT task_id, output_dir
+            FROM tasks
+            WHERE status = 'failed'
+            AND deleted_at IS NULL
+            AND COALESCE(finished_at, started_at, created_at) <= datetime('now', '+8 hours', '-' || ? || ' seconds')
+        """, (failed_task_cleanup_age,)).fetchall()
+
+        if failed_rows:
+            current_app.logger.info(f"Startup reconciliation: Found {len(failed_rows)} failed tasks older than {failed_task_cleanup_age} seconds")
+
+            upload_base = Path(current_app.config.get('UPLOAD_FOLDER') or '')
+            output_base = Path(current_app.config.get('OUTPUT_FOLDER') or '')
+            dry_run_config = current_app.config.get('JANITOR_DRY_RUN', 'false')
+
+            if isinstance(dry_run_config, str):
+                dry_run = dry_run_config.lower() == 'true'
+            else:
+                dry_run = bool(dry_run_config)
+
+            for row in failed_rows:
+                task_id = row['task_id']
+                output_dir = row['output_dir']
+
+                try:
+                    delete_targets = []
+
+                    # output_dir as recorded in the database
+                    if output_dir:
+                        p = Path(output_dir)
+                        delete_targets.append(p)
+
+                    # Fallback: the conventional outputs/<task_id> directory
+                    derived_output_dir = output_base / task_id if output_base else None
+                    if derived_output_dir and derived_output_dir not in delete_targets:
+                        delete_targets.append(derived_output_dir)
+
+                    # Also delete uploads/<task_id>
+                    derived_upload_dir = upload_base / task_id if upload_base else None
+                    if derived_upload_dir:
+                        delete_targets.append(derived_upload_dir)
+
+                    if dry_run:
+                        for tgt in delete_targets:
+                            if tgt:
+                                current_app.logger.info(f"[DRY RUN] Would delete failed task {task_id} path: {tgt}")
+                    else:
+                        # Actually delete (shutil is imported at module level)
+                        for tgt in delete_targets:
+                            try:
+                                if tgt and tgt.exists():
+                                    shutil.rmtree(tgt, ignore_errors=True)
+                                    current_app.logger.info(f"Deleted failed task {task_id} path: {tgt}")
+                                else:
+                                    current_app.logger.warning(f"Path not found for failed task {task_id}: {tgt}")
+                            except Exception as e:
+                                current_app.logger.error(f"Failed to delete path {tgt} for failed task {task_id}: {e}")
+
+                        # Hard delete from the database (commit per task, as in the janitor loop)
+                        conn.execute(
+                            "DELETE FROM tasks WHERE task_id = ?",
+                            (task_id,)
+                        )
+                        conn.commit()
+                        current_app.logger.info(f"Startup reconciliation: Auto-deleted failed task {task_id} from database (older than {failed_task_cleanup_age} seconds)")
+
+                except Exception as e:
+                    current_app.logger.error(f"Failed to delete failed task {task_id}: {str(e)}", exc_info=True)
+
         # Check for tasks with output directories that no longer exist
         # This helps clean up database entries for manually deleted directories
         rows = conn.execute("""
diff --git a/web_api_data/outputs/gasflux.db b/web_api_data/outputs/gasflux.db
new file mode 100644
index 0000000000000000000000000000000000000000..2a117ebae036bfdc2d51ba2262d5bb94137a1bb8
GIT binary patch
literal 49152
zcmeI5&u<&Y6~{^amK=)#ik3~$G#g$dMOtZQe=mVFRV6wWBFnKXIlmMvhP$&8Es9I;
zl9C+PFc`Q-;HH3c=&2|I6walGplJgXK~eNC$f3Qo$6gfYT%bUJ0!82Qm$j5+tz-s;
z(?^-&aQ5w+`F>{h&F;>~d->90p~3~%^S4+=FAn|r5qsh$B!C2v
z0225=AaGkt4yM!T<`(9z5Rd9rr
z-0LgCtIG=)W|!X(Udg>N6|n+um1@v1TDrI*EUhjs3iG*hv#X0M!dRJ?oI+`REV|DY
z4+6H%>pMlaaNqa*L-K8(vkG@C7)e(aF61t+%wBl4UCOuv+bV@EK0a3RcE+ZJu}MSN
z@Tz_=HZc+P54H6>{X{L?La7jJbQBdealD;U(PPe|jX2Qe7IQ1yk5s$ESKTZsENbCZ
zE9Gj%atci@g3p6$vC=>uHJv-3n|nnVZ#K!ok}&?T0iZ2>&j#@I<`BkgZ>t>E&~j~FgbqLBP4@tA;^NRiT1+Pk?W)xKtJM%%PY)j`b-zjIzW%+*W}>DI4yMg?
z^4*CJm(IRstM4&%gya2&ok|)-?Y9U;M66yrpIgoe9VWuUWubix%`VM1x7T||xCsue zci-$EOixZGZ*SI*=MH8aUk4(`c*n-hqdjcXV~+JuS34rKDjMyA4;yXYfqJ-Uw~py^ z7ngGjFE53mX}np(31K;R4gfFBnpqZwd_ zHsK>oWRV3Gs~Q{=R|DHGlq&_V(OVz3-b~*>dhXeyTCc_1qODYa>Z_i~`q(&wViXJ;ttS!)#qz&_$Fn!Ne??u};dv`OlIlA)&QXH3ICZYw8XjGJCh?M7 zR|-NMrpbh_b7KR$EgIM{S&-?A^cp;NG3R`=5OI=|@>%tmu{2 z3ze$F$D-O4ElZSGS!&k_#1T`wHPk`C)l^rL6;Y;=A}UnoBFpQVsB%|R4Bcf+g(eSA z(P9#im8@h&2^^j!lkO9UA*Oa~FiJqtxM8R^h2E_K0V%JDrVBt!N8+xkFq7!{lMx_U zMeZPQL{`@c#1T`wH4r7>Dk`@for%L)T?$OA@6gBS5l})Ik8UI=Y1vW_6uF z95HvV^`%yyqY(1Lm|xrZU`IDhn$NqcXhTR86y0NwFg|-rzR*0eznP@tNPpY>KN_-s*M15K#L{(%} zQo)n1C~X6)e$N4H$TT%zY&9gs&}7pv4Mn|qa|-HFV)a^t)wFzG_WX(!VkJ-)iyG9B zwdsw@Rxy;|^CBpyY=FB=90oHne6}w$s(vx_JJZUT=~f9r;hj2WLnVZoMntsjm0W0+ zPQlmNDFq+nucgg8o!CRYABZ%DEJ>;XM;K8Z4MrPV6HOu!(WVT74V-r5b>#$+hGvPD zg|4~poU4(g9+K7NNb|FFJBC0M`CrD64y|TZjRabn4NpYBhCmx5WMZU|OpG+mP~D1L zUq__Trd!@yAr%5mB%C(y=58}TI1p2>243k@gGd~wk101!5iQLGhGH|Kx_O-nH+I6y z#RI~3i^1-xgva#znA&5O(v&YNs-kK)-yR9C*rSqxVKX8~rSGrup;ta1*|e z01`j~NB{{S0VIF~kN^@u0!RP}oPfZ1?`ks6wChOfPU7=8vy;PMc;AUJi8lm>H{#57 z4T9lvoGGCJFr0`pbJ8EGj&t`rTvh*$BPn?%QBBlc7pB&9(-j$mNj6<`WyXl?N~Rl5 z+h58{vO`>^h?2`;7LvG*XmTcr%yt!1W*jEu%&_D*_mBI-`Tu05jkN^@u z0!RP}AOR$R1dsp{KmthMPy)4=k|TEQ?1iC}EPrdOYWl%yVPPS7b!S6;{rtJ>iyQO# z*UlG|(q*xr&dlXFb0eMT$|)|!{M%MNVaOg>UyGc*MY^O6xQLw%>$XZrb}GI&cCpKnG?xU3acmg zXTicRGb`z^IzF8LPp1BofPeTx0!RP}AOR$R1dsp{Kmter2_OL^fCQct0@J;Kf3PfCrko=5-*AOR$R1dsp{Kmter2_OL^fCP}hQ3TSx zax(fq0Ms9J($~KJ|7+@Bse93%qkLgL5(BnKh+okns1l4lyp{$G z`{CZl??Yn$f>(G7mY!AV@mU5uD=U{>vpt8e&0UlnR$*)FEO3j}UD4s&yy%rXzl$YL zbUb^L`&ZO8c&L`e$_1;wqgSnztCe6aj*z!*WT2c(R>(lHnJFQ&=@-_mGV|G1n0q6$ z?G>wA9FkO;Hi$x0Q&l8NR7s~`-Pl_8LR*PUD@4(VOf;RsC)L{qCB6;@qCTw=qAIc~ zso+Uhl(vDzec$uJhD^gl#|+Jo6ho6u!!#82=FKUnM~T&I5mwXkdD-(TR*02AT`X!) zL)NA@DqF=+g3pVfpt1q(GI1Ep#PI2OJEfw>9INUVL%%bvjG1ng5ES02V>VPmsA)t* z+g{0qX6Y1sot;wfG5%WGPAT>n?=7K{q(($Gx~;X%{NRAntASTKRnx|C`j~R#6w%U5 z-~cfrs>{A-^B~|({n$Mqe76|vo=SL3zmKUsW+_eivZ5-gcJuAl|NlSwpKr(@f-;Z* z5x}}ZsXqlPrm&0qpUDi^h)c6O4Z?G(L0Q)o>laCSzW*B7)Q*10SbvL A=l}o! literal 0 HcmV?d00001