Scheduled deletion of failed tasks

2026-02-11 16:28:51 +08:00
parent b9828a1b13
commit af56c94efe
13 changed files with 782 additions and 169 deletions

PROJECT_README.md (new file, +205 lines)
View File

@ -0,0 +1,205 @@
# GasFlux Project Documentation
## Overview
GasFlux is a Flask-based web API for gas flux analysis. It processes gas concentration data collected by drones or other aircraft and computes fluxes of greenhouse gases such as CO₂ and CH₄.
## Core Features
### 🧪 Data Processing
- **Preprocessing**: Excel/CSV parsing, data validation, and cleaning
- **Background correction**: intelligent background subtraction using the integrated FastChrom algorithm
- **Spatial interpolation**: kriging with a choice of semivariogram models
- **Flux calculation**: gas flux computation based on the mass-balance method
### 🌐 Web API Service
- **RESTful API**: a complete HTTP API
- **Asynchronous processing**: background processing for large files
- **Live monitoring**: real-time task status queries and progress tracking
- **File management**: automatic upload, processing, and download handling
### 🔐 Security & Authentication
- **API key authentication**: secure key management
- **Access control**: API keys with different access scopes
- **Bootstrap key**: initial administrator key configuration
### 🧹 Automatic Maintenance
- **Task cleanup**: automatic removal of expired tasks and files
- **Storage management**: disk-space housekeeping
- **Logging**: complete operation logs and performance monitoring
## Technical Architecture
### Backend Stack
- **Framework**: Flask (Python web framework)
- **Database**: SQLite (task state and configuration storage)
- **Deployment**: Waitress (production WSGI server)
- **Authentication**: custom API key system
### Processing Engine
- **Core algorithm**: the GasFlux gas flux analysis engine
- **Input formats**: Excel, CSV, YAML configuration
- **Output formats**: CSV data, HTML reports, PNG charts
### Front End
- **Web UI**: built-in HTML/CSS/JavaScript console
- **API clients**: works with cURL, Python requests, and similar tools
- **Live updates**: WebSocket-style polling of task status
## Quick Start
### 1. Prepare the Environment
```bash
# Python 3.8+ environment
python --version
# Install dependencies
pip install -r requirements.txt
```
### 2. Configure the System
```bash
# Edit the configuration file
notepad gasflux.ini
# Create the initial API key
python create_api_key.py
```
### 3. Start the Service
```bash
# Start the web API service
python -m src.gasflux.app
# Once the service is running, open: http://localhost:5001
```
### 4. Use the API
```bash
# Upload a file for processing
curl -X POST \
  -H "X-API-Key: your-api-key" \
  -F "file=@data.xlsx" \
  http://localhost:5001/upload
```
## Configuration Files
### gasflux.ini (main configuration)
```ini
[server]
host = 0.0.0.0
port = 5001
debug = false

[paths]
uploads = ./web_api_data/uploads
outputs = ./web_api_data/outputs

[cleanup]
task_cleanup_interval = 30
successful_task_cleanup_age = 60
failed_task_cleanup_age = 60
janitor_dry_run = false

[security]
admin_bootstrap_key = bootstrap_key_2024
```
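These cleanup ages are applied by the background janitor as SQLite time offsets: a failed task becomes eligible for deletion once `COALESCE(finished_at, started_at, created_at)` is older than `failed_task_cleanup_age` seconds. Below is a minimal sketch of that comparison, assuming the simplified `tasks` schema used by `debug_cleanup.py`; the production janitor additionally shifts `now` by `+8 hours` to match the locally stored timestamps.
```python
# Minimal sketch of the failed-task cleanup threshold, on an in-memory SQLite DB.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE tasks (
        task_id TEXT PRIMARY KEY, status TEXT,
        created_at TEXT, started_at TEXT, finished_at TEXT, deleted_at TEXT
    )
""")
# A failed task that finished two minutes ago.
conn.execute(
    "INSERT INTO tasks (task_id, status, finished_at) "
    "VALUES ('demo', 'failed', datetime('now', '-2 minutes'))"
)

failed_task_cleanup_age = 60  # seconds, as in the [cleanup] section above
expired = conn.execute(
    """
    SELECT task_id FROM tasks
    WHERE status = 'failed'
      AND deleted_at IS NULL
      AND COALESCE(finished_at, started_at, created_at)
          <= datetime('now', '-' || ? || ' seconds')
    """,
    (failed_task_cleanup_age,),
).fetchall()
print([row[0] for row in expired])  # ['demo']: older than 60 s, so it would be removed
```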
### gasflux_config.yaml (processing configuration)
```yaml
output_dir: ./output
gases:
  ch4: [1.5, 10.0]
  co2: [300, 500]
strategies:
  background: "algorithm"
  spatial: "curtain"
  interpolation: "kriging"
```
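As a quick illustration, the processing configuration can be loaded with `yaml.safe_load` (the same loader the upload endpoint uses for custom configs); the file name and keys below simply follow the example above.
```python
# Minimal sketch: read the processing config shown above and print the per-gas ranges.
# Assumes gasflux_config.yaml sits in the working directory with the layout above.
import yaml

with open("gasflux_config.yaml", "r", encoding="utf-8") as f:
    config = yaml.safe_load(f)

# config["gases"] is a mapping such as {'ch4': [1.5, 10.0], 'co2': [300, 500]}
for gas, (low, high) in config["gases"].items():
    print(f"{gas}: expected concentration range {low} to {high}")

print("interpolation strategy:", config["strategies"]["interpolation"])
```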
## API Endpoints
### Core Endpoints
- `POST /upload` - upload a file for processing
- `GET /task/{task_id}` - query task status (see the client sketch after this list)
- `GET /download/{path}` - download a result file
- `GET /health` - system health check
### Administration Endpoints
- `POST /api-keys` - create an API key
- `GET /api-keys` - list API keys
- `DELETE /api-keys/{key_id}` - revoke an API key
### Monitoring Endpoints
- `GET /stats` - system statistics
- `GET /config` - query the active configuration
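A minimal Python client sketch that combines `POST /upload` with `GET /task/{task_id}` polling. The `file` form field, the `X-API-Key` header, and the `task_id` in the 202 response follow the endpoints above; the exact JSON envelope and the terminal status names are assumptions and may differ in your deployment.
```python
# Minimal client sketch: upload a data file, then poll the task until it finishes.
# The flat JSON keys and the terminal statuses ("success", "failed") are assumptions.
import time
import requests

BASE_URL = "http://localhost:5001"
HEADERS = {"X-API-Key": "your-api-key"}

with open("data.xlsx", "rb") as f:
    resp = requests.post(f"{BASE_URL}/upload", headers=HEADERS, files={"file": f})
resp.raise_for_status()
task_id = resp.json()["task_id"]
print("accepted task:", task_id)

while True:
    status = requests.get(f"{BASE_URL}/task/{task_id}", headers=HEADERS).json()
    print(status.get("status"), status.get("message"))
    if status.get("status") in ("success", "failed"):
        break
    time.sleep(5)
```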
## Deployment Options
### Development
```bash
# Run directly
python -m src.gasflux.app
```
### Production (Waitress)
```bash
# Use the Waitress server
python server_waitress.py
```
### Docker
```bash
# Build the Docker image
docker build -t gasflux .
# Run the container
docker run -p 5001:5001 gasflux
```
## Directory Layout
```
gasflux-develop/
├── src/gasflux/          # Core source code
│   ├── app.py            # Main Flask application
│   ├── auth.py           # Authentication module
│   ├── db.py             # Database module
│   ├── janitor.py        # Automatic cleanup module
│   └── blueprints/       # API blueprints
├── gasflux.ini           # Main configuration file
├── create_api_key.py     # API key creation tool
├── requirements.txt      # Python dependencies
├── web_api_data/         # Data directory
│   ├── uploads/          # Uploaded files
│   └── outputs/          # Output files
└── logs/                 # Log files
```
## FAQ
### Q: How do I create an API key?
A: Run `python create_api_key.py` and enter a description when prompted.
### Q: How do I check the status of a task?
A: Use the `GET /task/{task_id}` endpoint, or check the web console.
### Q: How do I change the cleanup policy?
A: Edit the `[cleanup]` section of `gasflux.ini`.
### Q: Which gases are supported?
A: CO₂ and CH₄ are supported out of the box; additional gases can be added in the configuration.
## Support
- **Project home**: [GitHub Repository]
- **Documentation**: [API_DOCUMENTATION.md](API_DOCUMENTATION.md)
- **Configuration guide**: [CONFIG_README.md](CONFIG_README.md)
- **Deployment guide**: [WAITRESS_DEPLOYMENT.md](WAITRESS_DEPLOYMENT.md)
## Version Information
- **Current version**: v0.2.1
- **Python**: 3.8+
- **License**: AGPL v3.0

debug_cleanup.py (new file, +97 lines)
View File

@ -0,0 +1,97 @@
#!/usr/bin/env python3
"""
Debug script to test failed task cleanup logic
"""
import sqlite3
import os
from pathlib import Path
def test_cleanup_logic():
"""Test the cleanup logic with a simple SQLite database"""
# Create a temporary database for testing
db_path = Path("test_cleanup.db")
if db_path.exists():
db_path.unlink()
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
# Create tasks table
conn.execute("""
CREATE TABLE tasks (
task_id TEXT PRIMARY KEY,
status TEXT,
created_at TEXT,
started_at TEXT,
finished_at TEXT,
deleted_at TEXT,
output_dir TEXT
)
""")
# Insert test data - a failed task from 2 minutes ago
conn.execute("""
INSERT INTO tasks (task_id, status, created_at, started_at, finished_at)
VALUES (?, 'failed', datetime('now', '-2 minutes'), datetime('now', '-2 minutes'), datetime('now', '-2 minutes'))
""", ('test_task_123',))
# Insert test data - a failed task from 30 seconds ago (should not be cleaned)
conn.execute("""
INSERT INTO tasks (task_id, status, created_at, started_at, finished_at)
VALUES (?, 'failed', datetime('now', '-30 seconds'), datetime('now', '-30 seconds'), datetime('now', '-30 seconds'))
""", ('test_task_456',))
conn.commit()
# Test the cleanup query with 60 seconds threshold
failed_task_cleanup_age = 60
print("Testing cleanup logic...")
print(f"Cleanup age: {failed_task_cleanup_age} seconds")
# Show all failed tasks
all_failed = conn.execute("""
SELECT task_id, status, created_at, finished_at
FROM tasks
WHERE status = 'failed' AND deleted_at IS NULL
""").fetchall()
print(f"\nTotal failed tasks: {len(all_failed)}")
for row in all_failed:
print(f" Task {row['task_id']}: finished_at={row['finished_at']}")
# Test cleanup query
cleanup_rows = conn.execute("""
SELECT task_id, output_dir
FROM tasks
WHERE status = 'failed'
AND deleted_at IS NULL
AND COALESCE(finished_at, started_at, created_at) <= datetime('now', '-' || ? || ' seconds')
""", (failed_task_cleanup_age,)).fetchall()
print(f"\nTasks to be cleaned up: {len(cleanup_rows)}")
for row in cleanup_rows:
print(f" Task {row['task_id']} will be cleaned up")
# Test with timezone adjustment (+8 hours)
print("
Testing with timezone adjustment (+8 hours):")
cleanup_rows_tz = conn.execute("""
SELECT task_id, output_dir
FROM tasks
WHERE status = 'failed'
AND deleted_at IS NULL
AND COALESCE(finished_at, started_at, created_at) <= datetime('now', '+8 hours', '-' || ? || ' seconds')
""", (failed_task_cleanup_age,)).fetchall()
print(f"Tasks to be cleaned up (with TZ): {len(cleanup_rows_tz)}")
for row in cleanup_rows_tz:
print(f" Task {row['task_id']} will be cleaned up")
conn.close()
db_path.unlink()
if __name__ == "__main__":
test_cleanup_logic()

View File

@ -1,9 +1,9 @@
[server]
# Server configuration
host = 0.0.0.0
port = 5000
port = 5001
debug = false
base_url = http://localhost:5000
base_url = http://localhost:5001
[paths]
# Directory paths (relative to project root or absolute)
@ -25,8 +25,14 @@ admin_bootstrap_key = bootstrap_key_2024
[cleanup]
# Task cleanup settings
task_cleanup_interval = 3600 # 1 hour in seconds
max_task_age = 86400 # 24 hours in seconds
# 30 seconds
task_cleanup_interval = 30
# 24 hours in seconds
max_task_age = 86400
# 1 minute in seconds for successful tasks
successful_task_cleanup_age = 60
# 1 minute in seconds for failed tasks
failed_task_cleanup_age = 60
janitor_dry_run = false
[performance]

View File

@ -28,9 +28,8 @@ admin_bootstrap_key =
task_cleanup_interval = 3600 # 1 hour in seconds
max_task_age = 86400 # 24 hours in seconds
janitor_dry_run = false
[performance]
# Performance tuning
threads = 8
connection_limit = 100
channel_timeout = 300
channel_timeout = 300

View File

@ -56,6 +56,8 @@ class Config:
# Task management
TASK_CLEANUP_INTERVAL = config_reader.task_cleanup_interval
MAX_TASK_AGE = config_reader.max_task_age
SUCCESSFUL_TASK_CLEANUP_AGE = config_reader.successful_task_cleanup_age
FAILED_TASK_CLEANUP_AGE = config_reader.failed_task_cleanup_age
# Performance tuning
THREADS = config_reader.threads
@ -151,6 +153,8 @@ class Config:
'cors_origins': cls.CORS_ORIGINS,
'task_cleanup_interval': cls.TASK_CLEANUP_INTERVAL,
'max_task_age': cls.MAX_TASK_AGE,
'successful_task_cleanup_age': cls.SUCCESSFUL_TASK_CLEANUP_AGE,
'failed_task_cleanup_age': cls.FAILED_TASK_CLEANUP_AGE,
'threads': cls.THREADS,
'connection_limit': cls.CONNECTION_LIMIT,
'channel_timeout': cls.CHANNEL_TIMEOUT
@ -672,6 +676,15 @@ app.config['JANITOR_DRY_RUN'] = str(Config.JANITOR_DRY_RUN).lower()
if Config.ADMIN_BOOTSTRAP_KEY:
app.config['ADMIN_BOOTSTRAP_KEY'] = Config.ADMIN_BOOTSTRAP_KEY
# Task cleanup configuration
app.config['SUCCESSFUL_TASK_CLEANUP_AGE'] = Config.SUCCESSFUL_TASK_CLEANUP_AGE
app.config['FAILED_TASK_CLEANUP_AGE'] = Config.FAILED_TASK_CLEANUP_AGE
app.config['TASK_CLEANUP_INTERVAL'] = Config.TASK_CLEANUP_INTERVAL
# Debug logging for cleanup configuration
logger.info(f"App config: FAILED_TASK_CLEANUP_AGE = {app.config['FAILED_TASK_CLEANUP_AGE']}")
logger.info(f"App config: TASK_CLEANUP_INTERVAL = {app.config['TASK_CLEANUP_INTERVAL']}")
# Log current configuration
logger.info(f"Upload folder: {Config.UPLOAD_FOLDER}")
logger.info(f"Output folder: {Config.OUTPUT_FOLDER}")
@ -736,10 +749,10 @@ app.register_blueprint(download_bp)
app.register_blueprint(web_bp)
app.register_blueprint(api_keys_bp)
# Task status persistence now uses SQLite only
# Task status persistence now uses SQLite only
# JSON persistence has been disabled - functions removed from shared.py
# Initialize janitor for background cleanup
# Initialize janitor for background cleanup
try:
from .janitor import start_janitor, reconcile_tasks_on_startup
with app.app_context():

View File

@ -40,6 +40,8 @@ def get_config():
"GASFLUX_CORS_ORIGINS",
"GASFLUX_TASK_CLEANUP_INTERVAL",
"GASFLUX_MAX_TASK_AGE",
"GASFLUX_SUCCESSFUL_TASK_CLEANUP_AGE",
"GASFLUX_FAILED_TASK_CLEANUP_AGE",
"GASFLUX_THREADS",
"GASFLUX_CONNECTION_LIMIT",
"GASFLUX_CHANNEL_TIMEOUT"
@ -52,6 +54,7 @@ def get_config():
"GASFLUX_MAX_CONTENT_LENGTH", "GASFLUX_LOG_LEVEL",
"GASFLUX_LOG_FILE", "GASFLUX_CORS_ORIGINS",
"GASFLUX_TASK_CLEANUP_INTERVAL", "GASFLUX_MAX_TASK_AGE",
"GASFLUX_SUCCESSFUL_TASK_CLEANUP_AGE", "GASFLUX_FAILED_TASK_CLEANUP_AGE",
"GASFLUX_THREADS", "GASFLUX_CONNECTION_LIMIT",
"GASFLUX_CHANNEL_TIMEOUT"
]

View File

@ -19,21 +19,24 @@ def _mark_task_downloaded(task_id):
from ..db import get_db_path as get_config_db_path
db_path = get_config_db_path(current_app)
# Get cleanup age for successful tasks from config (in seconds)
successful_task_cleanup_age = current_app.config.get('SUCCESSFUL_TASK_CLEANUP_AGE', 3600)
try:
conn = sqlite3.connect(str(db_path), check_same_thread=False)
conn.execute("PRAGMA foreign_keys=ON")
conn.execute("PRAGMA busy_timeout=3000")
# Update downloaded timestamp and set deletion time (10 minutes later)
# Update downloaded timestamp and set deletion time based on config
conn.execute("""
UPDATE tasks
SET downloaded_at = datetime('now', '+8 hours'),
delete_after_at = datetime('now', '+8 hours', '+10 minutes')
delete_after_at = datetime('now', '+8 hours', '+' || ? || ' seconds')
WHERE task_id = ?
""", (task_id,))
""", (successful_task_cleanup_age, task_id))
conn.commit()
logger.info(f"Task {task_id} marked as downloaded, scheduled for deletion in 10 minutes")
logger.info(f"Task {task_id} marked as downloaded, scheduled for deletion in {successful_task_cleanup_age} seconds")
except Exception as e:
logger.error(f"Failed to mark task {task_id} as downloaded: {str(e)}", exc_info=True)

View File

@ -54,10 +54,11 @@ def get_stats():
current_time = time.time()
rows = db.execute("""
SELECT task_id, status, message, updated_at
SELECT task_id, status, message,
COALESCE(finished_at, started_at, created_at) as updated_at
FROM tasks
WHERE deleted_at IS NULL
ORDER BY updated_at DESC
ORDER BY COALESCE(finished_at, started_at, created_at) DESC
LIMIT 20
""").fetchall()

View File

@ -16,6 +16,141 @@ from ..app import process_data_async
from ..shared import _format_response, log_performance, logger, ALLOWED_DATA_EXTENSIONS, ALLOWED_CONFIG_EXTENSIONS, allowed_file, update_task_status, TASK_STATUS_PENDING, TASK_STATUS_FAILED
from ..auth import require_api_key
def get_file_extension(filename):
"""获取文件扩展名,与 allowed_file 函数保持一致"""
if '.' not in filename:
return ''
return '.' + filename.rsplit('.', 1)[1].lower()
def validate_upload_files(data_file, config_file=None):
"""验证上传文件并返回详细的错误信息"""
errors = []
# 检查数据文件
if not data_file or data_file.filename == '':
errors.append({
'field': 'file',
'error': '数据文件不能为空',
'details': '请上传一个有效的Excel文件'
})
elif not allowed_file(data_file.filename, ALLOWED_DATA_EXTENSIONS):
errors.append({
'field': 'file',
'error': f'不支持的文件类型: {get_file_extension(data_file.filename)}',
'details': f'只支持以下格式: {", ".join(ALLOWED_DATA_EXTENSIONS)}'
})
# 检查配置文件(如果提供)
if config_file and config_file.filename != '':
if not allowed_file(config_file.filename, ALLOWED_CONFIG_EXTENSIONS):
errors.append({
'field': 'config',
'error': f'不支持的配置文件类型: {get_file_extension(config_file.filename)}',
'details': f'只支持以下格式: {", ".join(ALLOWED_CONFIG_EXTENSIONS)}'
})
return errors
def parse_config_file(config_file):
"""解析配置文件并提供详细错误信息"""
if not config_file or config_file.filename == '':
# 使用默认配置
try:
default_config_path = Path(__file__).parent.parent / "gasflux_config.yaml"
with open(default_config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f), None
except Exception as e:
return None, f"加载默认配置文件失败: {str(e)}"
try:
config_file.stream.seek(0)
config_text = config_file.read().decode('utf-8', errors='strict')
if not config_text.strip():
return None, "配置文件为空"
config_data = yaml.safe_load(config_text)
# 基本验证
if not isinstance(config_data, dict):
return None, "配置文件格式错误:应为字典格式"
# 重置流用于后续保存
config_file.stream = BytesIO(config_text.encode('utf-8'))
return config_data, None
except UnicodeDecodeError as e:
return None, f"配置文件编码错误请使用UTF-8编码: {str(e)}"
except yaml.YAMLError as e:
return None, f"YAML语法错误: {str(e)}"
except Exception as e:
return None, f"配置文件解析失败: {str(e)}"
def save_upload_file(file_obj, save_path, task_id, file_type="file"):
"""安全保存上传文件并提供详细错误信息"""
try:
# 检查磁盘空间
save_path.parent.mkdir(parents=True, exist_ok=True)
# 检查文件大小如果有content_length
if hasattr(file_obj, 'content_length') and file_obj.content_length:
max_size = current_app.config.get('MAX_CONTENT_LENGTH', 104857600) # 100MB
if file_obj.content_length > max_size:
return False, f"{file_type}文件过大 ({file_obj.content_length} bytes),最大限制: {max_size} bytes"
# 保存文件
file_obj.seek(0)
file_obj.save(str(save_path))
# 验证文件是否成功保存
if not save_path.exists():
return False, f"{file_type}文件保存失败:文件不存在"
if save_path.stat().st_size == 0:
return False, f"{file_type}文件保存失败:文件为空"
logger.info(f"Job {task_id}: {file_type} saved successfully - {save_path} ({save_path.stat().st_size} bytes)")
return True, None
except PermissionError:
return False, f"{file_type}保存失败:没有写入权限 - {save_path.parent}"
except OSError as e:
if e.errno == 28: # No space left on device
return False, f"{file_type}保存失败:磁盘空间不足"
elif e.errno == 36: # File name too long
return False, f"{file_type}文件名过长"
else:
return False, f"{file_type}保存失败:磁盘错误 ({e.errno})"
except Exception as e:
logger.error(f"Job {task_id}: Failed to save {file_type}: {str(e)}", exc_info=True)
return False, f"{file_type}保存失败:{str(e)}"
def format_upload_error(error_code, message, details=None, task_id=None):
"""统一的错误响应格式"""
response = {
'error': message,
'code': error_code
}
if details:
response['details'] = details
if task_id:
response['task_id'] = task_id
# 记录详细错误日志
log_level = 'warning' if error_code < 500 else 'error'
log_func = getattr(logger, log_level)
log_func(f"Upload error [{error_code}]: {message}" + (f" - Details: {details}" if details else ""))
return _format_response(error_code, message, response if details else None)
# Create blueprint
upload_bp = Blueprint('upload', __name__, url_prefix='/upload')
@ -25,97 +160,80 @@ upload_bp = Blueprint('upload', __name__, url_prefix='/upload')
@log_performance
def upload_file():
logger.info("Received upload request")
logger.info(f"Request content length: {request.content_length} bytes")
# Check if data file is present
# 1. 基础验证
if 'file' not in request.files:
logger.warning("Upload failed: No data file part in request")
return _format_response(400, "未找到数据文件部分")
return format_upload_error(400, "未找到数据文件", "请求中缺少 'file' 字段")
data_file = request.files['file']
config_file = request.files.get('config')
# Log file details
logger.info(f"Data file: {data_file.filename} (size: {getattr(data_file, 'content_length', 'unknown')} bytes)")
if config_file:
logger.info(f"Config file: {config_file.filename} (size: {getattr(config_file, 'content_length', 'unknown')} bytes)")
else:
logger.info("No custom config file provided, will use default")
# 2. 文件验证
validation_errors = validate_upload_files(data_file, config_file)
if validation_errors:
# 返回第一个错误
error = validation_errors[0]
return format_upload_error(400, error['error'], error['details'])
if data_file.filename == '':
logger.warning("Upload failed: No data file selected (empty filename)")
return _format_response(400, "未选择数据文件")
if not allowed_file(data_file.filename, ALLOWED_DATA_EXTENSIONS):
logger.warning(f"Upload failed: Invalid data file type {data_file.filename} - allowed: {ALLOWED_DATA_EXTENSIONS}")
return _format_response(400, "无效的数据文件类型。只允许 .xlsx 和 .xls 格式。")
# Generate unique job ID
# 3. 生成任务ID
task_id = str(uuid.uuid4())
logger.info(f"Generated job ID: {task_id}")
# 1) Parse config content (parse in memory without saving first)
if config_file and config_file.filename != '':
if not allowed_file(config_file.filename, ALLOWED_CONFIG_EXTENSIONS):
return _format_response(400, "无效的配置文件类型。只允许 .yaml 和 .yml 格式。")
config_file.stream.seek(0)
config_text = config_file.read().decode('utf-8', errors='ignore')
try:
active_config = yaml.safe_load(config_text)
except Exception:
return _format_response(400, "配置文件解析失败")
# Reset stream for saving
config_file.stream = BytesIO(config_text.encode('utf-8'))
else:
default_config_path = Path(__file__).parent.parent / "gasflux_config.yaml"
with open(default_config_path, 'r', encoding='utf-8') as f:
active_config = yaml.safe_load(f)
# 4. 解析配置
active_config, config_error = parse_config_file(config_file)
if config_error:
return format_upload_error(400, "配置文件错误", config_error, task_id)
# 2) Create job directories based on INI configuration
upload_base = Path(current_app.config['UPLOAD_FOLDER'])
output_base = Path(current_app.config['OUTPUT_FOLDER'])
job_upload_dir = upload_base / task_id
job_output_dir = output_base / task_id
job_upload_dir.mkdir(parents=True, exist_ok=True)
job_output_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Job {task_id}: Created directories - Upload: {job_upload_dir}, Output: {job_output_dir}")
# 5. 创建目录
try:
upload_base = Path(current_app.config['UPLOAD_FOLDER'])
output_base = Path(current_app.config['OUTPUT_FOLDER'])
job_upload_dir = upload_base / task_id
job_output_dir = output_base / task_id
# 3) Save data file to job_upload_dir
job_upload_dir.mkdir(parents=True, exist_ok=True)
job_output_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Job {task_id}: Directories created successfully")
except Exception as e:
return format_upload_error(500, "目录创建失败", str(e), task_id)
# 6. 保存数据文件
data_filename = secure_filename(data_file.filename)
data_path = job_upload_dir / data_filename
try:
data_file.seek(0)
data_file.save(str(data_path))
logger.info(f"Job {task_id}: Data file saved successfully - Path: {data_path}")
except Exception as e:
logger.error(f"Job {task_id}: Failed to save data file {data_filename}: {str(e)}")
return _format_response(500, "保存数据文件失败")
# 4) Save config file to job_upload_dir
success, error_msg = save_upload_file(data_file, data_path, task_id, "数据文件")
if not success:
return format_upload_error(500, error_msg, task_id=task_id)
# 7. 保存配置文件
if config_file and config_file.filename != '':
config_filename = secure_filename(config_file.filename)
config_path = job_upload_dir / config_filename
try:
config_file.seek(0)
config_file.save(str(config_path))
active_config_path = config_path
logger.info(f"Job {task_id}: Custom config saved successfully - Path: {config_path}")
except Exception as e:
logger.error(f"Job {task_id}: Failed to save config file {config_filename}: {str(e)}")
return _format_response(500, "保存配置文件失败")
else:
# Copy default config for record keeping
config_path = job_upload_dir / "config.yaml"
with open(config_path, 'w', encoding='utf-8') as f:
yaml.safe_dump(active_config, f, allow_unicode=True)
active_config_path = config_path
logger.info(f"Job {task_id}: Default config saved for record - Path: {config_path}")
# Initialize task status
update_task_status(task_id, TASK_STATUS_PENDING, "任务已加入处理队列", output_dir=str(job_output_dir))
logger.info(f"Job {task_id}: Task status initialized as PENDING")
success, error_msg = save_upload_file(config_file, config_path, task_id, "配置文件")
if not success:
return format_upload_error(500, error_msg, task_id=task_id)
else:
# 保存默认配置
config_path = job_upload_dir / "config.yaml"
try:
with open(config_path, 'w', encoding='utf-8') as f:
yaml.safe_dump(active_config, f, allow_unicode=True)
active_config_path = config_path
logger.info(f"Job {task_id}: Default config saved")
except Exception as e:
return format_upload_error(500, f"保存默认配置失败: {str(e)}", task_id=task_id)
# Start background processing
# 8. 初始化任务状态
try:
update_task_status(task_id, TASK_STATUS_PENDING, "任务已加入处理队列", output_dir=str(job_output_dir))
except Exception as e:
logger.error(f"Job {task_id}: Failed to update task status: {str(e)}")
# 继续处理,不返回错误
# 9. 启动后台处理
try:
thread = threading.Thread(
target=process_data_async,
@ -123,15 +241,16 @@ def upload_file():
)
thread.daemon = True
thread.start()
logger.info(f"Job {task_id}: Background processing thread started successfully")
logger.info(f"Job {task_id}: Background processing started")
except Exception as e:
logger.error(f"Job {task_id}: Failed to start background processing thread: {str(e)}")
logger.error(f"Job {task_id}: Failed to start processing thread: {str(e)}", exc_info=True)
update_task_status(task_id, TASK_STATUS_FAILED, error=str(e))
return _format_response(500, "启动处理失败")
return format_upload_error(500, "启动处理线程失败", str(e), task_id)
logger.info(f"Job {task_id}: Upload process completed successfully, returning job ID to client")
# 10. 返回成功响应
return _format_response(202, "任务已接受并加入处理队列", {
"status": "accepted",
"task_id": task_id,
"task_status_url": f"/task/{task_id}"
"task_status_url": f"/task/{task_id}",
"message": f"数据文件 '{data_filename}' 已上传,开始处理"
})

View File

@ -56,6 +56,8 @@ class ConfigReader:
self.config.add_section('cleanup')
self.config.set('cleanup', 'task_cleanup_interval', os.getenv('GASFLUX_TASK_CLEANUP_INTERVAL', '3600'))
self.config.set('cleanup', 'max_task_age', os.getenv('GASFLUX_MAX_TASK_AGE', '86400'))
self.config.set('cleanup', 'successful_task_cleanup_age', os.getenv('GASFLUX_SUCCESSFUL_TASK_CLEANUP_AGE', '3600'))
self.config.set('cleanup', 'failed_task_cleanup_age', os.getenv('GASFLUX_FAILED_TASK_CLEANUP_AGE', '604800'))
self.config.set('cleanup', 'janitor_dry_run', os.getenv('JANITOR_DRY_RUN', 'false'))
# Performance defaults
@ -192,6 +194,14 @@ class ConfigReader:
def max_task_age(self) -> int:
return self.getint('cleanup', 'max_task_age', 86400)
@property
def successful_task_cleanup_age(self) -> int:
return self.getint('cleanup', 'successful_task_cleanup_age', 3600)
@property
def failed_task_cleanup_age(self) -> int:
return self.getint('cleanup', 'failed_task_cleanup_age', 604800)
@property
def janitor_dry_run(self) -> bool:
return self.getboolean('cleanup', 'janitor_dry_run', False)

View File

@ -85,7 +85,7 @@ try:
from .qiya import get_pressure_at_location
except ImportError as e:
print(f"导入qiya模块失败: {e}")
print(f"导入qiya模块失败: {e}")
print("请确保GasFlux包结构完整")
sys.exit(1)
@ -101,17 +101,18 @@ def load_excel_data(file_path):
pd.DataFrame: 读取的数据
"""
try:
print(f"正在读取文件: {file_path}")
print(f"正在读取Excel文件: {Path(file_path).name}")
# 读取Excel文件
df = pd.read_excel(file_path)
print(f"SUCCESS: Successfully loaded data: {len(df)} rows, {len(df.columns)} columns")
print(f"列名:{list(df.columns)}")
print(f" 数据读取成功: {len(df):,}× {len(df.columns)} ")
print(f" 检测到的列: {', '.join(df.columns[:5])}{'...' if len(df.columns) > 5 else ''}")
return df
except Exception as e:
print(f"ERROR: Failed to read file: {e}")
print(f" 文件读取失败: {Path(file_path).name}")
print(f" 错误详情: {e}")
sys.exit(1)
@ -126,7 +127,7 @@ def remove_columns(df, columns_to_remove):
Returns:
pd.DataFrame: 删除列后的DataFrame
"""
print(f"\n删除列: {columns_to_remove}")
print(f"\n 删除不需要的列: {columns_to_remove}")
# 检查要删除的列是否存在
existing_columns = [col for col in columns_to_remove if col in df.columns]
@ -174,7 +175,7 @@ def fix_time_column(df, filename):
Returns:
pd.DataFrame: 修正后的DataFrame
"""
print("修正时间格式...")
print(" 根据文件名修正时间格式...")
# 提取小时信息
hour_prefix = extract_hour_from_filename(filename)
@ -230,23 +231,23 @@ def convert_coordinates(df):
Returns:
pd.DataFrame: 转换后的DataFrame
"""
print("转换经纬度坐标...")
print(" 转换经纬度坐标GPS原始数据除以10^7...")
if 'stGPSPositionX' in df.columns:
original_lon = df['stGPSPositionX'].head(3).tolist()
df['stGPSPositionX'] = df['stGPSPositionX'] / 1e7
converted_lon = df['stGPSPositionX'].head(3).tolist()
print("经度转换示例:")
for orig, conv in zip(original_lon, converted_lon):
print(".6f")
print(" 经度转换示例:")
for i, (orig, conv) in enumerate(zip(original_lon, converted_lon)):
print(f"{i+1}行: {orig:.6f}{conv:.6f}")
if 'stGPSPositionY' in df.columns:
original_lat = df['stGPSPositionY'].head(3).tolist()
df['stGPSPositionY'] = df['stGPSPositionY'] / 1e7
converted_lat = df['stGPSPositionY'].head(3).tolist()
print("纬度转换示例:")
for orig, conv in zip(original_lat, converted_lat):
print(".6f")
print(" 纬度转换示例:")
for i, (orig, conv) in enumerate(zip(original_lat, converted_lat)):
print(f"{i+1}行: {orig:.6f}{conv:.6f}")
return df
@ -264,7 +265,7 @@ def calculate_pressure(df, max_samples=None, height_tolerance=10.0, height_bin_s
Returns:
pd.DataFrame: 添加气压列的DataFrame
"""
print("计算气压数据...")
print(" 计算气压数据使用qiya.py大气模型...")
# 检查必要列是否存在(支持原始列名和新列名)
# 原始数据中的列名
@ -554,11 +555,11 @@ def adjust_altitude(df):
Returns:
pd.DataFrame: 添加调整后高度字段的DataFrame
"""
print("创建调整后的高度字段...")
print("📏 创建调整后的高度字段...")
if 'fAltitudeFused' in df.columns:
min_altitude = df['fAltitudeFused'].min()
print(".2f")
print(f" 最小高度值: {min_altitude:.2f}")
# 创建新的调整后高度字段,而不是修改原始字段
df['height_ato'] = df['fAltitudeFused'] - min_altitude
@ -566,9 +567,11 @@ def adjust_altitude(df):
original_alt = df['fAltitudeFused'].head(3).tolist()
adjusted_alt = df['height_ato'].head(3).tolist()
print("高度调整示例:")
for orig, adj in zip(original_alt, adjusted_alt):
print(".2f")
print(" 高度调整示例:")
for i, (orig, adj) in enumerate(zip(original_alt, adjusted_alt)):
print(f"{i+1}行: {orig:.2f}m → {adj:.2f}m")
else:
print(" 未找到 'fAltitudeFused' 列,跳过高度调整")
return df
@ -583,7 +586,7 @@ def merge_timestamp(df):
Returns:
pd.DataFrame: 融合后的DataFrame
"""
print("融合日期和时间(修正时间格式)...")
print(" 融合日期和时间为时间戳...")
if 'qStrDate' in df.columns and 'qStrTime' in df.columns:
timestamps = []
@ -639,7 +642,7 @@ def rename_columns(df):
Returns:
pd.DataFrame: 重命名后的DataFrame
"""
print("重命名字段...")
print(" 重命名字段为GasFlux标准格式...")
# 定义字段映射
column_mapping = {
@ -681,7 +684,7 @@ def ensure_float64_types(df, config=None):
Returns:
pd.DataFrame: 数据类型转换后的DataFrame
"""
print("确保数值字段类型为float64...")
print(" 确保数值字段类型为float64GasFlux要求...")
# 定义基础需要转换为float64的字段
float64_columns = [
@ -726,7 +729,7 @@ def process_excel_file(file_path, config_path=None):
file_path: Excel文件路径
config_path: 配置文件路径用于读取gases配置
"""
print(f"=== 开始处理文件: {file_path} ===\n")
print(f" === 开始处理文件: {Path(file_path).name} ===\n")
# 1. 读取数据
df = load_excel_data(file_path)
@ -767,10 +770,10 @@ def process_excel_file(file_path, config_path=None):
output_path = Path(file_path).with_suffix('.processed.csv')
df.to_csv(output_path, index=False, encoding='utf-8-sig')
print(f"\n处理完成!")
print(f"输出文件: {output_path}")
print(f"最终数据形状: {df.shape[0]}× {df.shape[1]}")
print(f"最终列名: {list(df.columns)}")
print(f"\n 处理完成!")
print(f" 输出文件: {output_path}")
print(f" 最终数据: {df.shape[0]:,}× {df.shape[1]}")
print(f" 最终字段: {', '.join(df.columns)}")
return df

View File

@ -21,8 +21,9 @@ def start_janitor(app):
except Exception as e:
app.logger.error(f"Janitor cleanup error: {str(e)}", exc_info=True)
finally:
# Sleep for 30 seconds before next cleanup cycle
time.sleep(30)
# Sleep for configured interval before next cleanup cycle
cleanup_interval = int(current_app.config.get('TASK_CLEANUP_INTERVAL', 30))
time.sleep(cleanup_interval)
# Create daemon thread so it doesn't prevent app shutdown
thread = threading.Thread(target=worker, daemon=True, name="janitor")
@ -39,6 +40,10 @@ def cleanup_expired_tasks():
else:
dry_run = bool(dry_run_config)
# Get cleanup intervals from config (in seconds)
failed_task_cleanup_age = current_app.config.get('FAILED_TASK_CLEANUP_AGE', 86400) # Default 24 hours for failed tasks
current_app.logger.info(f"Janitor: FAILED_TASK_CLEANUP_AGE = {failed_task_cleanup_age} seconds")
try:
conn = sqlite3.connect(str(db_path), check_same_thread=False)
conn.row_factory = sqlite3.Row
@ -48,6 +53,10 @@ def cleanup_expired_tasks():
current_time = conn.execute("SELECT datetime('now', '+8 hours')").fetchone()[0]
current_app.logger.info(f"Janitor cleanup check at: {current_time}")
# Bases used by both sections
upload_base = Path(current_app.config.get('UPLOAD_FOLDER') or '')
output_base = Path(current_app.config.get('OUTPUT_FOLDER') or '')
# Debug: Check for tasks with delete_after_at set
debug_rows = conn.execute("""
SELECT task_id, delete_after_at, downloaded_at
@ -60,7 +69,7 @@ def cleanup_expired_tasks():
for row in debug_rows:
current_app.logger.info(f" Task {row['task_id']}: delete_at={row['delete_after_at']}, downloaded_at={row['downloaded_at']}")
# Find tasks that need to be deleted
# 1) Scheduled deletions (delete_after_at)
rows = conn.execute("""
SELECT task_id, output_dir
FROM tasks
@ -69,63 +78,133 @@ def cleanup_expired_tasks():
AND delete_after_at <= datetime('now', '+8 hours')
""").fetchall()
if not rows:
return # No tasks to clean up
if rows:
current_app.logger.info(f"Janitor found {len(rows)} expired tasks to clean up")
current_app.logger.info(f"Janitor found {len(rows)} expired tasks to clean up")
for row in rows:
task_id = row['task_id']
output_dir = row['output_dir']
upload_base = Path(current_app.config.get('UPLOAD_FOLDER') or '')
output_base = Path(current_app.config.get('OUTPUT_FOLDER') or '')
try:
delete_targets = []
for row in rows:
task_id = row['task_id']
output_dir = row['output_dir']
# 记录在库中的 output_dir
if output_dir:
p = Path(output_dir)
delete_targets.append(p)
try:
delete_targets = []
# 兜底:按约定 outputs/<task_id>
derived_output_dir = output_base / task_id if output_base else None
if derived_output_dir and derived_output_dir not in delete_targets:
delete_targets.append(derived_output_dir)
# 记录在库中的 output_dir
if output_dir:
p = Path(output_dir)
delete_targets.append(p)
# 同时删除 uploads/<task_id>
derived_upload_dir = upload_base / task_id if upload_base else None
if derived_upload_dir:
delete_targets.append(derived_upload_dir)
# 兜底:按约定 outputs/<task_id>
derived_output_dir = output_base / task_id if output_base else None
if derived_output_dir and derived_output_dir not in delete_targets:
delete_targets.append(derived_output_dir)
if dry_run:
for tgt in delete_targets:
if tgt:
current_app.logger.info(f"[DRY RUN] Would delete task {task_id} path: {tgt}")
else:
# 实际删除
for tgt in delete_targets:
try:
if tgt and tgt.exists():
shutil.rmtree(tgt, ignore_errors=True)
current_app.logger.info(f"Deleted path for task {task_id}: {tgt}")
else:
current_app.logger.warning(f"Path not found for task {task_id}: {tgt}")
except Exception as e:
current_app.logger.error(f"Failed to delete path {tgt} for task {task_id}: {e}")
# 同时删除 uploads/<task_id>
derived_upload_dir = upload_base / task_id if upload_base else None
if derived_upload_dir:
delete_targets.append(derived_upload_dir)
# Hard delete from database
conn.execute(
"DELETE FROM tasks WHERE task_id = ?",
(task_id,)
)
conn.commit()
current_app.logger.info(f"Hard deleted task {task_id} from database")
if dry_run:
for tgt in delete_targets:
if tgt:
current_app.logger.info(f"[DRY RUN] Would delete task {task_id} path: {tgt}")
else:
# 实际删除
for tgt in delete_targets:
try:
if tgt and tgt.exists():
shutil.rmtree(tgt, ignore_errors=True)
current_app.logger.info(f"Deleted path for task {task_id}: {tgt}")
else:
current_app.logger.warning(f"Path not found for task {task_id}: {tgt}")
except Exception as e:
current_app.logger.error(f"Failed to delete path {tgt} for task {task_id}: {e}")
except Exception as e:
current_app.logger.error(f"Failed to delete task {task_id}: {str(e)}", exc_info=True)
# Continue with other tasks even if one fails
# Hard delete from database
conn.execute(
"DELETE FROM tasks WHERE task_id = ?",
(task_id,)
)
conn.commit()
current_app.logger.info(f"Hard deleted task {task_id} from database")
# Debug: Check all failed tasks
debug_failed = conn.execute("""
SELECT task_id, status, created_at, started_at, finished_at, deleted_at
FROM tasks
WHERE status = 'failed' AND deleted_at IS NULL
""").fetchall()
if debug_failed:
current_app.logger.info(f"Janitor found {len(debug_failed)} total failed tasks:")
for row in debug_failed:
current_app.logger.info(f" Task {row['task_id']}: created={row['created_at']}, started={row['started_at']}, finished={row['finished_at']}")
except Exception as e:
current_app.logger.error(f"Failed to delete task {task_id}: {str(e)}", exc_info=True)
# Continue with other tasks even if one fails
# Clean up failed tasks that are older than configured age
failed_rows = conn.execute("""
SELECT task_id, output_dir
FROM tasks
WHERE status = 'failed'
AND deleted_at IS NULL
AND COALESCE(finished_at, started_at, created_at) <= datetime('now', '+8 hours', '-' || ? || ' seconds')
""", (failed_task_cleanup_age,)).fetchall()
if failed_rows:
current_app.logger.info(f"Janitor found {len(failed_rows)} failed tasks older than {failed_task_cleanup_age} seconds")
for row in failed_rows:
current_app.logger.info(f" Failed task: {row['task_id']}")
for row in failed_rows:
task_id = row['task_id']
output_dir = row['output_dir']
try:
delete_targets = []
# 记录在库中的 output_dir
if output_dir:
p = Path(output_dir)
delete_targets.append(p)
# 兜底:按约定 outputs/<task_id>
derived_output_dir = output_base / task_id if output_base else None
if derived_output_dir and derived_output_dir not in delete_targets:
delete_targets.append(derived_output_dir)
# 同时删除 uploads/<task_id>
derived_upload_dir = upload_base / task_id if upload_base else None
if derived_upload_dir:
delete_targets.append(derived_upload_dir)
if dry_run:
for tgt in delete_targets:
if tgt:
current_app.logger.info(f"[DRY RUN] Would delete failed task {task_id} path: {tgt}")
else:
# 实际删除
for tgt in delete_targets:
try:
if tgt and tgt.exists():
shutil.rmtree(tgt, ignore_errors=True)
current_app.logger.info(f"Deleted failed task {task_id} path: {tgt}")
else:
current_app.logger.warning(f"Path not found for failed task {task_id}: {tgt}")
except Exception as e:
current_app.logger.error(f"Failed to delete path {tgt} for failed task {task_id}: {e}")
# Hard delete from database
conn.execute(
"DELETE FROM tasks WHERE task_id = ?",
(task_id,)
)
conn.commit()
current_app.logger.info(f"Auto-deleted failed task {task_id} from database (older than {failed_task_cleanup_age} seconds)")
except Exception as e:
current_app.logger.error(f"Failed to delete failed task {task_id}: {str(e)}", exc_info=True)
# Continue with other tasks even if one fails
except Exception as e:
current_app.logger.error(f"Database error during cleanup: {str(e)}", exc_info=True)
@ -138,24 +217,99 @@ def reconcile_tasks_on_startup():
"""Reconcile task states on application startup."""
db_path = _get_db_path()
# Get cleanup ages from config (in seconds)
successful_task_cleanup_age = current_app.config.get('SUCCESSFUL_TASK_CLEANUP_AGE', 3600)
failed_task_cleanup_age = current_app.config.get('FAILED_TASK_CLEANUP_AGE', 86400)
try:
conn = sqlite3.connect(str(db_path), check_same_thread=False)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys=ON")
# Fix tasks that were downloaded but don't have delete_after_at set
# This handles cases where the app crashed after marking downloaded but before setting delete time
conn.execute("""
UPDATE tasks
SET delete_after_at = COALESCE(delete_after_at, datetime(downloaded_at, '+10 minutes'))
SET delete_after_at = COALESCE(delete_after_at, datetime(downloaded_at, '+' || ? || ' seconds'))
WHERE downloaded_at IS NOT NULL
AND deleted_at IS NULL
AND delete_after_at IS NULL
""")
""", (successful_task_cleanup_age,))
updated_count = conn.total_changes
if updated_count > 0:
current_app.logger.info(f"Startup reconciliation: Fixed delete_after_at for {updated_count} downloaded tasks")
# Clean up failed tasks that are older than configured age
failed_rows = conn.execute("""
SELECT task_id, output_dir
FROM tasks
WHERE status = 'failed'
AND deleted_at IS NULL
AND COALESCE(finished_at, started_at, created_at) <= datetime('now', '+8 hours', '-' || ? || ' seconds')
""", (failed_task_cleanup_age,)).fetchall()
if failed_rows:
current_app.logger.info(f"Startup reconciliation: Found {len(failed_rows)} failed tasks older than {failed_task_cleanup_age} seconds")
upload_base = Path(current_app.config.get('UPLOAD_FOLDER') or '')
output_base = Path(current_app.config.get('OUTPUT_FOLDER') or '')
dry_run_config = current_app.config.get('JANITOR_DRY_RUN', 'false')
if isinstance(dry_run_config, str):
dry_run = dry_run_config.lower() == 'true'
else:
dry_run = bool(dry_run_config)
for row in failed_rows:
task_id = row['task_id']
output_dir = row['output_dir']
try:
delete_targets = []
# 记录在库中的 output_dir
if output_dir:
p = Path(output_dir)
delete_targets.append(p)
# 兜底:按约定 outputs/<task_id>
derived_output_dir = output_base / task_id if output_base else None
if derived_output_dir and derived_output_dir not in delete_targets:
delete_targets.append(derived_output_dir)
# 同时删除 uploads/<task_id>
derived_upload_dir = upload_base / task_id if upload_base else None
if derived_upload_dir:
delete_targets.append(derived_upload_dir)
if dry_run:
for tgt in delete_targets:
if tgt:
current_app.logger.info(f"[DRY RUN] Would delete failed task {task_id} path: {tgt}")
else:
# 实际删除
for tgt in delete_targets:
try:
if tgt and tgt.exists():
import shutil
shutil.rmtree(tgt, ignore_errors=True)
current_app.logger.info(f"Deleted failed task {task_id} path: {tgt}")
else:
current_app.logger.warning(f"Path not found for failed task {task_id}: {tgt}")
except Exception as e:
current_app.logger.error(f"Failed to delete path {tgt} for failed task {task_id}: {e}")
# Hard delete from database
conn.execute(
"DELETE FROM tasks WHERE task_id = ?",
(task_id,)
)
current_app.logger.info(f"Startup reconciliation: Auto-deleted failed task {task_id} from database (older than {failed_task_cleanup_age} seconds)")
except Exception as e:
current_app.logger.error(f"Failed to delete failed task {task_id}: {str(e)}", exc_info=True)
# Check for tasks with output directories that no longer exist
# This helps clean up database entries for manually deleted directories
rows = conn.execute("""

Binary file not shown.