From b9828a1b1388946636a28c07c72de0703bf4e0a7 Mon Sep 17 00:00:00 2001 From: zhanghuilai Date: Mon, 9 Feb 2026 17:10:11 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84:=20=E5=88=87=E6=8D=A2?= =?UTF-8?q?=E5=AD=98=E5=82=A8=E8=87=B3SQLite=EF=BC=8C=E5=90=AF=E7=94=A8INI?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E4=B8=8EAPI=20Key=E6=A0=A1=E9=AA=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CONFIG_README.md | 165 +++++++++ FRONTEND_CONFIG_README.md | 165 +++++++++ GASFLUX_CONFIG_DOCUMENTATION.md | 92 +++-- create_api_key.py | 63 ++++ frontend_config_options.json | 272 ++++++++++++++ frontend_integration_example.js | 237 +++++++++++++ gasflux.ini | 44 +++ gasflux.ini.example | 36 ++ server_waitress.py | 32 +- src/gasflux/app.py | 498 ++++++++++++++------------ src/gasflux/auth.py | 155 ++++++++ src/gasflux/blueprints/api_keys.py | 95 +++++ src/gasflux/blueprints/download.py | 70 +++- src/gasflux/blueprints/health.py | 2 +- src/gasflux/blueprints/reports.py | 23 +- src/gasflux/blueprints/stats.py | 40 ++- src/gasflux/blueprints/task_pool.py | 53 ++- src/gasflux/blueprints/tasks.py | 19 +- src/gasflux/blueprints/upload.py | 47 +-- src/gasflux/blueprints/web.py | 43 ++- src/gasflux/cli.py | 7 +- src/gasflux/config_reader.py | 236 +++++++++++++ src/gasflux/data_processor.py | 16 +- src/gasflux/db.py | 156 ++++++++ src/gasflux/janitor.py | 209 +++++++++++ src/gasflux/processing_pipelines.py | 6 +- src/gasflux/reporting.py | 12 +- src/gasflux/run_example.py | 10 +- src/gasflux/shared.py | 529 +++++++++++++++------------- 新建文本文档.txt | 1 + 30 files changed, 2721 insertions(+), 612 deletions(-) create mode 100644 CONFIG_README.md create mode 100644 FRONTEND_CONFIG_README.md create mode 100644 create_api_key.py create mode 100644 frontend_config_options.json create mode 100644 frontend_integration_example.js create mode 100644 gasflux.ini create mode 100644 gasflux.ini.example create mode 100644 src/gasflux/auth.py create mode 100644 src/gasflux/blueprints/api_keys.py create mode 100644 src/gasflux/config_reader.py create mode 100644 src/gasflux/db.py create mode 100644 src/gasflux/janitor.py create mode 100644 新建文本文档.txt diff --git a/CONFIG_README.md b/CONFIG_README.md new file mode 100644 index 0000000..bd8513e --- /dev/null +++ b/CONFIG_README.md @@ -0,0 +1,165 @@ +# GasFlux INI Configuration Guide + +GasFlux 现在支持使用 INI 配置文件来管理路径和网络设置,提供更灵活的部署选项。 + +## 配置文件位置 + +GasFlux 会按以下顺序查找配置文件: +1. `gasflux.ini` (当前目录) +2. `config.ini` (当前目录) +3. 
`gasflux.ini` (项目根目录) + +如果找不到配置文件,将使用默认值和环境变量。 + +## 配置示例 + +复制 `gasflux.ini.example` 为 `gasflux.ini` 并根据需要修改: + +```ini +[server] +# 服务器配置 +host = 0.0.0.0 +port = 5000 +debug = false +base_url = http://localhost:5000 + +[paths] +# 目录路径配置(相对于项目根目录或绝对路径) +uploads = ./web_api_data/uploads +outputs = ./web_api_data/outputs + +[limits] +# 文件大小限制 +max_content_length = 104857600 # 100MB + +[logging] +# 日志配置 +level = INFO +file = logs/gasflux_api.log + +[security] +# 安全设置 +admin_bootstrap_key = your_secret_key_here + +[cleanup] +# 任务清理设置 +task_cleanup_interval = 3600 # 1小时 +max_task_age = 86400 # 24小时 +janitor_dry_run = false + +[performance] +# 性能调优 +threads = 8 +connection_limit = 100 +channel_timeout = 300 + +[database] +# 数据库配置 +path = + +[api_keys] +# API密钥持久化设置 +persist_backend = sqlite +``` + +## 配置说明 + +### [server] 服务器配置 +- `host`: 服务器监听地址 (默认: 0.0.0.0) +- `port`: 服务器监听端口 (默认: 5000) +- `debug`: 调试模式 (默认: false) +- `base_url`: 对外访问的基础URL (默认: http://localhost:5000) + +### [paths] 路径配置 +- `uploads`: 上传文件存储目录 (默认: ./web_api_data/uploads) +- `outputs`: 计算结果输出目录 (默认: ./web_api_data/outputs) + +路径可以是: +- 相对路径(相对于项目根目录) +- 绝对路径 +- 支持 `~` 展开用户主目录 + +### [limits] 限制配置 +- `max_content_length`: 最大文件上传大小,字节 (默认: 104857600 = 100MB) + +### [logging] 日志配置 +- `level`: 日志级别 (DEBUG, INFO, WARNING, ERROR, CRITICAL) +- `file`: 日志文件路径 (默认: logs/gasflux_api.log) + +### [security] 安全配置 +- `admin_bootstrap_key`: 管理员引导密钥,用于初始创建API密钥 + +### [cleanup] 清理配置 +- `task_cleanup_interval`: 任务清理检查间隔,秒 (默认: 3600) +- `max_task_age`: 任务最大保留时间,秒 (默认: 86400) +- `janitor_dry_run`: 清理程序干运行模式,只记录不实际删除 (默认: false) + +### [performance] 性能配置 +- `threads`: 服务器线程数 (默认: 8) +- `connection_limit`: 连接限制 (默认: 100) +- `channel_timeout`: 通道超时,秒 (默认: 300) + +### [database] 数据库配置 +- `path`: SQLite数据库文件路径,为空则使用默认位置 + +### [api_keys] API密钥配置 +- `persist_backend`: 任务持久化后端 (json/sqlite/both,默认: sqlite) + +## 环境变量兼容性 + +所有配置项都可以通过环境变量覆盖,格式为 `GASFLUX_{SECTION}_{KEY}`,例如: +- `GASFLUX_SERVER_HOST=127.0.0.1` +- `GASFLUX_PATHS_UPLOADS=/custom/uploads` + +环境变量优先级高于 INI 文件配置。 + +## 路径处理逻辑 + +1. **全局目录**: `uploads` 和 `outputs` 目录由 INI 配置决定 +2. **任务子目录**: 每个任务在这些目录下创建 `{task_id}` 子目录 +3. **YAML 配置**: 上传的 YAML 配置文件中的 `output_dir` 仍然可以覆盖全局设置 + +## 启动验证 + +启动时会显示配置信息: +``` +Configuration loaded from: /path/to/gasflux.ini +Directories initialized - Upload: /path/to/uploads, Output: /path/to/outputs +``` + +## 示例配置 + +### 开发环境 +```ini +[server] +host = 127.0.0.1 +port = 5000 +debug = true + +[paths] +uploads = ./dev_uploads +outputs = ./dev_outputs + +[logging] +level = DEBUG +``` + +### 生产环境 +```ini +[server] +host = 0.0.0.0 +port = 80 +debug = false +base_url = https://api.example.com + +[paths] +uploads = /data/gasflux/uploads +outputs = /data/gasflux/outputs + +[logging] +level = INFO +file = /var/log/gasflux/api.log + +[security] +admin_bootstrap_key = your_production_bootstrap_key +``` \ No newline at end of file diff --git a/FRONTEND_CONFIG_README.md b/FRONTEND_CONFIG_README.md new file mode 100644 index 0000000..f2c10d6 --- /dev/null +++ b/FRONTEND_CONFIG_README.md @@ -0,0 +1,165 @@ +# GasFlux前端配置下拉选项说明 + +本文档描述了如何在前端应用中使用GasFlux的配置下拉选项。 + +## 文件结构 + +- `frontend_config_options.json` - 主要配置文件,包含所有下拉选项的定义 +- `frontend_integration_example.js` - React组件集成示例 +- `FRONTEND_CONFIG_README.md` - 本说明文档 + +## 配置结构说明 + +### 主要分类 + +配置选项按以下类别组织: + +1. **处理策略 (strategies)** - 核心处理策略设置 +2. **背景校正算法 (algorithmic_baseline)** - 基线校正算法选择 +3. **半变异函数设置 (semivariogram)** - 克里金插值参数 +4. 
**普通克里金设置 (ordinary_kriging)** - 插值核心参数 + +### 选项字段说明 + +每个配置选项包含以下字段: + +```json +{ + "key": "option_key", // 配置键名 + "name": "中文名称", // 中文显示名称 + "nameEn": "English Name", // 英文显示名称 + "description": "中文描述", // 中文描述 + "type": "select|boolean", // 选项类型 + "required": true, // 是否必填 + "default": "default_value", // 默认值 + "options": [ // 选项列表 + { + "value": "option_value", // 选项值 + "label": "显示标签", // 显示标签 + "labelEn": "Label", // 英文标签 + "description": "选项描述" // 选项详细描述 + } + ] +} +``` + +## 前端集成步骤 + +### 1. 加载配置选项 + +```javascript +import configOptions from './frontend_config_options.json'; + +// 或通过API加载 +fetch('/api/config-options') + .then(res => res.json()) + .then(data => setConfigOptions(data)); +``` + +### 2. 渲染下拉组件 + +```javascript +const renderSelect = (category, option) => { + return ( + + ); +}; +``` + +### 3. 应用空间模式默认值 + +当用户选择不同的空间处理模式时,自动应用推荐的默认参数: + +```javascript +const applySpatialModeDefaults = (spatialMode) => { + const defaults = configOptions.validation.spatial_mode_defaults[spatialMode]; + if (defaults) { + // 应用半变异函数和克里金参数的默认值 + updateConfig({ + semivariogram_settings: defaults.semivariogram_settings, + ordinary_kriging_settings: defaults.ordinary_kriging_settings + }); + } +}; +``` + +## 具体选项说明 + +### 1. 空间处理模式 (strategies.spatial) + +- **curtain**: 平面模式,适用于直线或平行航线 +- **spiral**: 螺旋模式,适用于螺旋或圆形飞行路径 + +选择不同模式会自动调整相关的半变异函数和克里金参数。 + +### 2. 背景校正算法 (algorithmic_baseline.algorithm) + +- **fastchrom**: FastChrom算法(推荐),适用于大多数情况 +- **dietrich**: Dietrich算法,基于多项式拟合 +- **fabc**: FABC算法,全自动基线校正 +- **golotvin**: Golotvin算法,分段式基线校正 + +### 3. 半变异函数模型 (semivariogram.model) + +- **spherical**: 球面模型,适用于大多数地理数据 +- **gaussian**: 高斯模型,适用于平滑变化的数据 +- **exponential**: 指数模型,适用于螺旋模式数据 + +### 4. 切割地面选项 (ordinary_kriging.cut_ground) + +- **true**: 切割地面以下的负值 +- **false**: 保留所有插值结果 + +## 验证规则 + +### 必填字段 +所有标记为 `required: true` 的选项都是必填的。 + +### 空间模式默认值 +配置中包含了针对不同空间模式的推荐参数: + +- **平面模式 (curtain)**: + - 半变异函数容差: 15° + - 最大滞后距离: 80m + - 网格分辨率: 200 + +- **螺旋模式 (spiral)**: + - 半变异函数容差: 30° + - 最大滞后距离: 100m + - 网格分辨率: 500 + +## 使用建议 + +1. **界面设计**: 为每个选项提供清晰的标签和描述 +2. **默认值**: 总是显示并应用合理的默认值 +3. **联动更新**: 空间模式改变时自动更新相关参数 +4. **验证**: 在提交前验证所有必填字段 +5. **保存**: 支持配置的保存和加载功能 + +## 扩展配置 + +对于不在下拉选项中的参数(如数值范围、路径等),建议使用: + +- **数值输入框**: 带范围验证的数字输入 +- **文本输入框**: 文件路径、自定义名称等 +- **滑块组件**: 百分比、比例等数值调整 +- **复选框组**: 多选配置项 + +## 版本控制 + +- **版本**: 1.0 +- **最后更新**: 2025-02-09 +- **兼容性**: 支持GasFlux配置文档v1.0 + +--- + +*此配置系统基于GasFlux官方文档设计,如有更新请参考最新文档。* \ No newline at end of file diff --git a/GASFLUX_CONFIG_DOCUMENTATION.md b/GASFLUX_CONFIG_DOCUMENTATION.md index 2ccbb2f..ab382a6 100644 --- a/GASFLUX_CONFIG_DOCUMENTATION.md +++ b/GASFLUX_CONFIG_DOCUMENTATION.md @@ -309,57 +309,75 @@ ordinary_kriging_settings: ### 螺旋模式完整配置 ```yaml -output_dir: ./spiral_output +# GasFlux configuration file for basic usage example +output_dir: ./10m + +# Required columns and their maximum valid ranges required_cols: latitude: [-90, 90] longitude: [-180, 180] - height_ato: [0, 200] - windspeed: [0, 20] - winddir: [0, 360] - temperature: [-50, 60] - pressure: [900, 1100] + height_ato: [0, 50] # meters above takeoff + windspeed: [0, 50] # m/s + winddir: [0, 360] # degrees + temperature: [-50, 60] # degrees Celsius + pressure: [900, 1100] # hPa/mb +# Optional gas columns and their maximum ppm concentration ranges. +# Relative concentrations are used, so absolute offset can be incorrect as long as gain and linearity are correct. 
gases: - co2: [300, 500] - ch4: [1.5, 10.0] + ch4: [2, 2.5] + co2: [2, 2.5] strategies: - background: "algorithm" - sensor: "insitu" - spatial: "spiral" - interpolation: "kriging" + background: "algorithm" # Currently only algorithmic baseline correction (via pybaselines) is supported + sensor: "insitu" # Currently only in-situ sensor data is supported + spatial: "spiral" # Spatial processing mode: "curtain" and "spiral" are supported + interpolation: "kriging" # Currently only kriging interpolation is supported +# Baseline correction algorithm settings algorithmic_baseline_settings: algorithm: fastchrom - fastchrom: - half_window: 6 - threshold: "custom" - min_fwhm: ~ - interp_half_window: 3 - smooth_half_window: 3 - weights: ~ - max_iter: 100 - min_length: 2 + fastchrom: { + "half_window": 6, + "threshold": "custom", # + "min_fwhm": ~, + "interp_half_window": 3, + "smooth_half_window": 3, + "weights": ~, + "max_iter": 100, + "min_length": 2} + fabc : { + "lam": 10000, # The smoothing parameter. Larger values will create smoother baselines. Default is 1e6. + "scale": 10, # The scale at which to calculate the continuous wavelet transform. Should be approximately equal to the index-based full-width-at-half-maximum of the peaks or features in the data. Default is None, which will use half of the value from optimize_window(), which is not always a good value, but at least scales with the number of data points and gives a starting point for tuning the parameter. + "diff_order": 2} # The order of the differential matrix. Must be greater than 0. Default is 2 (second order differential matrix). Typical values are 2 or 1. + dietrich : { + "poly_order": 5, + "smooth_half_window": 5,} + golotvin : { + "half_window": 2, + "sections": 10} +# Kriging settings - aggressively optimized for Spiral mode semivariogram_settings: - model: spherical - estimator: cressie - n_lags: 20 - bin_func: even - fit_method: lm - maxlag: 100 - tolerance: 30 - azimuth: 0 - bandwidth: 20 - + model: exponential # Changed to exponential model for better circular data fitting + estimator: cressie # Robust estimator for variogram calculation + n_lags: 50 # Further increased to 50 for better variogram resolution + bin_func: even # Even binning function + fit_method: lm # Least squares fitting method + ### Aggressively increased search ranges for circular/spiral data distribution + maxlag: 5000 # Dramatically increased to 5000m for comprehensive coverage + tolerance: 180 # Increased to 180° to allow full circular search + azimuth: 0 # Horizontal direction maintained + bandwidth: 300 # Further increased to 300m for maximum search bandwidth + # fit_sigma: linear # this should allow for a spatial uncertainty but currently producing bugs ordinary_kriging_settings: - min_points: 3 - max_points: 100 - grid_resolution: 500 - min_nodes: 10 - y_min: ~ - cut_ground: True + min_points: 1 # Reduced to 1 to allow interpolation even with sparse data + max_points: 20 # Further reduced to 20 to minimize computational load + grid_resolution: 100 # Increased density to 100 for finer interpolation grid + min_nodes: 20 # Increased to 20 to ensure sufficient grid nodes + y_min: ~ # Automatically determine minimum y value + cut_ground: False ``` ## 参数调优指南 diff --git a/create_api_key.py b/create_api_key.py new file mode 100644 index 0000000..bbc1391 --- /dev/null +++ b/create_api_key.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +""" +Create initial API key for GasFlux +""" + +import sys +from pathlib import Path + +# Add src to path +src_dir = 
Path(__file__).parent / "src" +sys.path.insert(0, str(src_dir)) + +from gasflux.config_reader import config_reader + +def main(): + print("GasFlux API Key Creator") + print("=" * 30) + + # Check if bootstrap key is configured + bootstrap_key = config_reader.admin_bootstrap_key + if not bootstrap_key: + print("ERROR: No admin_bootstrap_key configured in gasflux.ini") + print("Please set [security] admin_bootstrap_key in gasflux.ini first") + return + + print(f"Bootstrap key configured: {'Yes' if bootstrap_key else 'No'}") + + # Get key details + description = input("Enter API key description (default: 'Default API Key'): ").strip() + if not description: + description = "Default API Key" + + scopes = input("Enter API key scopes (default: '*'): ").strip() + if not scopes: + scopes = "*" + + try: + from gasflux.app import app + from gasflux.auth import create_api_key + + with app.app_context(): + key_id, plain_key = create_api_key(description=description, scopes=scopes) + + print("\n✅ API Key Created Successfully!") + print("=" * 30) + print(f"Key ID: {key_id}") + print(f"Key: {plain_key}") + print(f"Description: {description}") + print(f"Scopes: {scopes}") + print("\n⚠️ IMPORTANT: Save this key securely!") + print(" This key will only be shown once.") + print("\nTo use this key in API requests:") + print(f" curl -H 'X-API-Key: {plain_key}' http://localhost:5000/upload") + print("\nOr set it as an environment variable:") + print(f" export GASFLUX_API_KEY={plain_key}") + + except Exception as e: + print(f"ERROR: Failed to create API key: {e}") + import traceback + traceback.print_exc() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/frontend_config_options.json b/frontend_config_options.json new file mode 100644 index 0000000..7fb139e --- /dev/null +++ b/frontend_config_options.json @@ -0,0 +1,272 @@ +{ + "version": "1.0", + "description": "GasFlux前端配置下拉选项定义", + "lastUpdated": "2025-02-09", + "categories": [ + { + "id": "strategies", + "name": "处理策略", + "nameEn": "Processing Strategies", + "description": "核心处理策略配置", + "options": [ + { + "key": "spatial", + "name": "空间处理模式", + "nameEn": "Spatial Processing Mode", + "description": "选择飞行路径的处理模式", + "type": "select", + "required": true, + "default": "curtain", + "options": [ + { + "value": "curtain", + "label": "平面模式 (Curtain)", + "labelEn": "Curtain Mode", + "description": "适用于直线或平行航线飞行路径", + "descriptionEn": "Suitable for straight or parallel flight paths" + }, + { + "value": "spiral", + "label": "螺旋模式 (Spiral)", + "labelEn": "Spiral Mode", + "description": "适用于螺旋或圆形飞行路径", + "descriptionEn": "Suitable for spiral or circular flight paths" + } + ] + } + ] + }, + { + "id": "algorithmic_baseline", + "name": "背景校正算法", + "nameEn": "Baseline Correction Algorithm", + "description": "选择背景校正算法", + "options": [ + { + "key": "algorithm", + "name": "算法类型", + "nameEn": "Algorithm Type", + "description": "选择使用的背景校正算法", + "type": "select", + "required": true, + "default": "fastchrom", + "options": [ + { + "value": "fastchrom", + "label": "FastChrom (推荐)", + "labelEn": "FastChrom (Recommended)", + "description": "快速色谱算法,适用于大多数情况", + "descriptionEn": "Fast chromatogram algorithm, suitable for most cases" + }, + { + "value": "dietrich", + "label": "Dietrich算法", + "labelEn": "Dietrich Algorithm", + "description": "基于多项式拟合的算法", + "descriptionEn": "Polynomial fitting based algorithm" + }, + { + "value": "fabc", + "label": "FABC算法", + "labelEn": "FABC Algorithm", + "description": "全自动基线校正算法", + "descriptionEn": "Fully Automatic Baseline 
Correction" + }, + { + "value": "golotvin", + "label": "Golotvin算法", + "labelEn": "Golotvin Algorithm", + "description": "分段式基线校正算法", + "descriptionEn": "Section-based baseline correction" + } + ] + } + ] + }, + { + "id": "semivariogram", + "name": "半变异函数设置", + "nameEn": "Semivariogram Settings", + "description": "克里金插值半变异函数参数", + "options": [ + { + "key": "model", + "name": "变异函数模型", + "nameEn": "Variogram Model", + "description": "选择变异函数的数学模型", + "type": "select", + "required": true, + "default": "spherical", + "options": [ + { + "value": "spherical", + "label": "球面模型", + "labelEn": "Spherical Model", + "description": "适用于大多数地理数据", + "descriptionEn": "Suitable for most geospatial data" + }, + { + "value": "gaussian", + "label": "高斯模型", + "labelEn": "Gaussian Model", + "description": "适用于平滑变化的数据", + "descriptionEn": "Suitable for smoothly varying data" + }, + { + "value": "exponential", + "label": "指数模型", + "labelEn": "Exponential Model", + "description": "适用于螺旋模式数据", + "descriptionEn": "Suitable for spiral mode data" + } + ] + }, + { + "key": "estimator", + "name": "估计器", + "nameEn": "Estimator", + "description": "选择变异函数的估计方法", + "type": "select", + "required": true, + "default": "cressie", + "options": [ + { + "value": "cressie", + "label": "Cressie (鲁棒)", + "labelEn": "Cressie (Robust)", + "description": "鲁棒估计器,对异常值不敏感", + "descriptionEn": "Robust estimator, insensitive to outliers" + }, + { + "value": "matheron", + "label": "Matheron (经典)", + "labelEn": "Matheron (Classic)", + "description": "经典的经验变异函数估计", + "descriptionEn": "Classic empirical variogram estimation" + }, + { + "value": "dowd", + "label": "Dowd", + "labelEn": "Dowd", + "description": "另一种变异函数估计方法", + "descriptionEn": "Alternative variogram estimation method" + } + ] + }, + { + "key": "bin_func", + "name": "分箱函数", + "nameEn": "Binning Function", + "description": "选择数据分箱的方法", + "type": "select", + "required": true, + "default": "even", + "options": [ + { + "value": "even", + "label": "均匀分箱", + "labelEn": "Even Binning", + "description": "将数据均匀分配到各个箱中", + "descriptionEn": "Distribute data evenly across bins" + }, + { + "value": "uniform", + "label": "统一分箱", + "labelEn": "Uniform Binning", + "description": "使用统一的箱宽度", + "descriptionEn": "Use uniform bin widths" + } + ] + }, + { + "key": "fit_method", + "name": "拟合方法", + "nameEn": "Fitting Method", + "description": "选择变异函数的拟合方法", + "type": "select", + "required": true, + "default": "lm", + "options": [ + { + "value": "lm", + "label": "最小二乘法", + "labelEn": "Least Squares", + "description": "使用最小二乘法拟合模型", + "descriptionEn": "Fit model using least squares method" + }, + { + "value": "manual", + "label": "手动拟合", + "labelEn": "Manual Fitting", + "description": "手动调整参数进行拟合", + "descriptionEn": "Manually adjust parameters for fitting" + } + ] + } + ] + }, + { + "id": "ordinary_kriging", + "name": "普通克里金设置", + "nameEn": "Ordinary Kriging Settings", + "description": "克里金插值核心参数", + "options": [ + { + "key": "cut_ground", + "name": "切割地面", + "nameEn": "Cut Ground", + "description": "是否切割地面以下的值", + "type": "boolean", + "required": true, + "default": true, + "options": [ + { + "value": true, + "label": "是", + "labelEn": "Yes", + "description": "切割地面以下的负值", + "descriptionEn": "Cut negative values below ground" + }, + { + "value": false, + "label": "否", + "labelEn": "No", + "description": "保留所有插值结果", + "descriptionEn": "Keep all interpolation results" + } + ] + } + ] + } + ], + "validation": { + "spatial_mode_defaults": { + "curtain": { + "semivariogram_settings": { + "model": "spherical", 
+ "tolerance": 15, + "maxlag": 80, + "bandwidth": 25 + }, + "ordinary_kriging_settings": { + "grid_resolution": 200, + "min_points": 5, + "max_points": 80 + } + }, + "spiral": { + "semivariogram_settings": { + "model": "exponential", + "tolerance": 30, + "maxlag": 100, + "bandwidth": 20 + }, + "ordinary_kriging_settings": { + "grid_resolution": 500, + "min_points": 3, + "max_points": 100 + } + } + } + } +} \ No newline at end of file diff --git a/frontend_integration_example.js b/frontend_integration_example.js new file mode 100644 index 0000000..03da92d --- /dev/null +++ b/frontend_integration_example.js @@ -0,0 +1,237 @@ +// GasFlux前端配置下拉选项集成示例 +// 使用方法:将 frontend_config_options.json 加载到前端应用中 + +// 示例:React组件集成 +import React, { useState, useEffect } from 'react'; +import configOptions from './frontend_config_options.json'; + +const GasFluxConfigForm = () => { + const [config, setConfig] = useState({}); + const [configOptionsData, setConfigOptionsData] = useState(null); + + useEffect(() => { + // 加载配置选项 + setConfigOptionsData(configOptions); + }, []); + + const handleSelectChange = (categoryId, optionKey, value) => { + setConfig(prev => ({ + ...prev, + [categoryId]: { + ...prev[categoryId], + [optionKey]: value + } + })); + + // 应用空间模式默认值 + if (categoryId === 'strategies' && optionKey === 'spatial') { + applySpatialModeDefaults(value); + } + }; + + const applySpatialModeDefaults = (spatialMode) => { + const defaults = configOptionsData?.validation?.spatial_mode_defaults?.[spatialMode]; + if (defaults) { + setConfig(prev => ({ + ...prev, + semivariogram_settings: { + ...prev.semivariogram_settings, + ...defaults.semivariogram_settings + }, + ordinary_kriging_settings: { + ...prev.ordinary_kriging_settings, + ...defaults.ordinary_kriging_settings + } + })); + } + }; + + const renderSelect = (category, option) => { + const currentValue = config[category.id]?.[option.key] || option.default; + + return ( +
+      <div className="config-option" key={option.key}>
+        <label>
+          {option.name} ({option.nameEn})
+          {option.required && <span className="required">*</span>}
+        </label>
+        <select
+          value={currentValue}
+          onChange={(e) => handleSelectChange(category.id, option.key, e.target.value)}
+        >
+          {option.options.map(opt => (
+            <option key={opt.value} value={opt.value}>
+              {opt.label} ({opt.labelEn})
+            </option>
+          ))}
+        </select>
+        <div className="option-description">
+          {option.description} ({option.descriptionEn})
+        </div>
+        {option.options.find(opt => opt.value === currentValue)?.description && (
+          <div className="value-description">
+            {option.options.find(opt => opt.value === currentValue).description}
+          </div>
+        )}
+      </div>
+    );
+  };
+
+  const renderBoolean = (category, option) => {
+    const currentValue = config[category.id]?.[option.key] ?? option.default;
+
+    return (
+      <div className="config-option" key={option.key}>
+        <label>
+          {option.name} ({option.nameEn})
+          {option.required && <span className="required">*</span>}
+        </label>
+        <div className="boolean-options">
+          {option.options.map(opt => (
+            <label key={String(opt.value)} className="radio-label">
+              <input
+                type="radio"
+                name={`${category.id}-${option.key}`}
+                checked={currentValue === opt.value}
+                onChange={() => handleSelectChange(category.id, option.key, opt.value)}
+              />
+              {opt.label} ({opt.labelEn})
+            </label>
+          ))}
+        </div>
+      </div>
+    );
+  };
+
+  if (!configOptionsData) {
+    return <div>加载配置选项中...</div>;
+  }
+
+  return (
+    <div className="gasflux-config-form">
+      <h1>GasFlux 配置设置</h1>
+
+      {configOptionsData.categories.map(category => (
+        <div className="config-category" key={category.id}>
+          <h2>{category.name} ({category.nameEn})</h2>
+          <p>{category.description}</p>
+
+          {category.options.map(option => {
+            if (option.type === 'select') {
+              return renderSelect(category, option);
+            } else if (option.type === 'boolean') {
+              return renderBoolean(category, option);
+            }
+            return null;
+          })}
+        </div>
+      ))}
+
+      <div className="config-preview">
+        <h3>当前配置预览</h3>
+        <pre>{JSON.stringify(config, null, 2)}</pre>
+      </div>
+
+      <button
+        className="save-config-btn"
+        onClick={() => console.log('GasFlux config:', config)}
+      >
+        保存配置
+      </button>
+    </div>
+ ); +}; + +export default GasFluxConfigForm; + +// CSS样式示例 +const styles = ` +.gasflux-config-form { + max-width: 800px; + margin: 0 auto; + padding: 20px; +} + +.config-category { + margin-bottom: 30px; + padding: 20px; + border: 1px solid #ddd; + border-radius: 8px; +} + +.config-option { + margin-bottom: 20px; +} + +.config-option label { + display: block; + font-weight: bold; + margin-bottom: 8px; +} + +.config-option select { + width: 100%; + padding: 8px; + border: 1px solid #ccc; + border-radius: 4px; +} + +.boolean-options { + display: flex; + gap: 20px; +} + +.radio-label { + display: flex; + flex-direction: column; + cursor: pointer; +} + +.option-description, .value-description { + margin-top: 5px; + font-size: 0.9em; + color: #666; +} + +.required { + color: red; +} + +.config-preview { + margin-top: 30px; + padding: 20px; + background: #f5f5f5; + border-radius: 8px; +} + +.config-preview pre { + white-space: pre-wrap; + word-wrap: break-word; +} + +.save-config-btn { + background: #007bff; + color: white; + padding: 12px 24px; + border: none; + border-radius: 6px; + cursor: pointer; + font-size: 16px; +} + +.save-config-btn:hover { + background: #0056b3; +} +`; + +// 使用说明: +// 1. 将 frontend_config_options.json 放在项目中 +// 2. 在React组件中导入并使用 +// 3. 根据需要调整样式和布局 +// 4. 添加表单验证逻辑 +// 5. 实现配置保存和加载功能 \ No newline at end of file diff --git a/gasflux.ini b/gasflux.ini new file mode 100644 index 0000000..754b2b3 --- /dev/null +++ b/gasflux.ini @@ -0,0 +1,44 @@ +[server] +# Server configuration +host = 0.0.0.0 +port = 5000 +debug = false +base_url = http://localhost:5000 + +[paths] +# Directory paths (relative to project root or absolute) +uploads = ./web_api_data/uploads +outputs = ./web_api_data/outputs + +[limits] +# File size and performance limits +max_content_length = 104857600 # 100MB in bytes + +[logging] +# Logging configuration +level = INFO +file = logs/gasflux_api.log + +[security] +# API key and security settings +admin_bootstrap_key = bootstrap_key_2024 + +[cleanup] +# Task cleanup settings +task_cleanup_interval = 3600 # 1 hour in seconds +max_task_age = 86400 # 24 hours in seconds +janitor_dry_run = false + +[performance] +# Performance tuning +threads = 8 +connection_limit = 100 +channel_timeout = 300 + +[database] +# Database configuration +path = + +[api_keys] +# API key persistence settings +persist_backend = sqlite \ No newline at end of file diff --git a/gasflux.ini.example b/gasflux.ini.example new file mode 100644 index 0000000..87bc281 --- /dev/null +++ b/gasflux.ini.example @@ -0,0 +1,36 @@ +[server] +# Server configuration +host = 0.0.0.0 +port = 5000 +debug = false +base_url = http://localhost:5000 + +[paths] +# Directory paths (relative to project root or absolute) +uploads = ./web_api_data/uploads +outputs = ./web_api_data/outputs + +[limits] +# File size and performance limits +max_content_length = 104857600 # 100MB in bytes + +[logging] +# Logging configuration +level = INFO +file = logs/gasflux_api.log + +[security] +# API key and security settings +admin_bootstrap_key = + +[cleanup] +# Task cleanup settings +task_cleanup_interval = 3600 # 1 hour in seconds +max_task_age = 86400 # 24 hours in seconds +janitor_dry_run = false + +[performance] +# Performance tuning +threads = 8 +connection_limit = 100 +channel_timeout = 300 \ No newline at end of file diff --git a/server_waitress.py b/server_waitress.py index 58d4527..e4836d5 100644 --- a/server_waitress.py +++ b/server_waitress.py @@ -64,17 +64,21 @@ def main(): from waitress import serve print("✓ Waitress 
WSGI server imported") - # Server configuration from environment variables + # Server configuration from INI file (with environment variable fallbacks) host = Config.HOST port = Config.PORT threads = Config.THREADS connection_limit = Config.CONNECTION_LIMIT channel_timeout = Config.CHANNEL_TIMEOUT + base_url = Config.BASE_URL print(f"Starting Waitress server on {host}:{port}") print(f"- Threads: {threads}") print(f"- Connection limit: {connection_limit}") print(f"- Channel timeout: {channel_timeout}s") + print(f"- Base URL: {base_url}") + print(f"- Upload folder: {Config.UPLOAD_FOLDER}") + print(f"- Output folder: {Config.OUTPUT_FOLDER}") print("Press Ctrl+C to stop the server") print("=" * 50) print("Available API endpoints:") @@ -93,7 +97,31 @@ def main(): print(" GET /config - Configuration") print(" GET / - Web interface") print("=" * 50) - print(f"Configuration: {Config.to_dict()}") + + # Display configuration source info + try: + from src.gasflux.config_reader import config_reader + print("Configuration loaded from INI file:") + print(f" - Config file: {config_reader._get_config_file_path() or 'gasflux.ini (default)'}") + print(f" - Uploads path: {config_reader.uploads_path}") + print(f" - Outputs path: {config_reader.outputs_path}") + + # Check API keys + from src.gasflux.app import app + with app.app_context(): + from src.gasflux.db import get_db + db = get_db() + key_count = db.execute("SELECT COUNT(*) FROM api_keys WHERE revoked = 0").fetchone()[0] + print(f" - API keys: {key_count} active") + + if key_count == 0: + print("\n⚠️ No active API keys found!") + print(" Run 'python create_api_key.py' to create your first API key") + print(" Or use bootstrap key in API requests: -H 'X-Admin-Bootstrap-Key: bootstrap_key_2024'") + except Exception as e: + print(f"Configuration info error: {e}") + + print(f"\nFull configuration: {Config.to_dict()}") # Start the server with valid Waitress parameters serve( diff --git a/src/gasflux/app.py b/src/gasflux/app.py index a3c1ab8..14981a2 100644 --- a/src/gasflux/app.py +++ b/src/gasflux/app.py @@ -15,45 +15,64 @@ import yaml # Shared utilities imported from shared.py try: # Try relative import (when run as part of package) - from .shared import task_status, TASK_STATUS_PENDING, TASK_STATUS_PROCESSING, TASK_STATUS_COMPLETED, TASK_STATUS_FAILED + from .shared import TASK_STATUS_PENDING, TASK_STATUS_PROCESSING, TASK_STATUS_COMPLETED, TASK_STATUS_FAILED, update_task_status as shared_update_task_status except ImportError: # Fallback to absolute import (when run directly) - from shared import task_status, TASK_STATUS_PENDING, TASK_STATUS_PROCESSING, TASK_STATUS_COMPLETED, TASK_STATUS_FAILED + from shared import TASK_STATUS_PENDING, TASK_STATUS_PROCESSING, TASK_STATUS_COMPLETED, TASK_STATUS_FAILED, update_task_status as shared_update_task_status + +# Load configuration from INI file +try: + from .config_reader import config_reader +except ImportError: + from config_reader import config_reader # Blueprints will be imported after app initialization to avoid circular imports # Environment-based configuration management class Config: - """Configuration management using environment variables with defaults.""" + """Configuration management using INI file with environment variable fallbacks.""" - # Server configuration - HOST = os.getenv('GASFLUX_HOST', '0.0.0.0') - PORT = int(os.getenv('GASFLUX_PORT', '5000')) - DEBUG = os.getenv('GASFLUX_DEBUG', 'false').lower() in ('true', '1', 'yes', 'on') + # Server configuration from config_reader + HOST = 
config_reader.host + PORT = config_reader.port + DEBUG = config_reader.debug + BASE_URL = config_reader.base_url # Directory configuration BASE_DIR = None # Will be set dynamically - UPLOAD_FOLDER_NAME = os.getenv('GASFLUX_UPLOAD_FOLDER', 'web_api_data/uploads') - OUTPUT_FOLDER_NAME = os.getenv('GASFLUX_OUTPUT_FOLDER', 'web_api_data/outputs') + UPLOAD_FOLDER_NAME = str(config_reader.uploads_path) + OUTPUT_FOLDER_NAME = str(config_reader.outputs_path) # File size limits (in bytes) - MAX_CONTENT_LENGTH = int(os.getenv('GASFLUX_MAX_CONTENT_LENGTH', str(100 * 1024 * 1024))) # 100MB + MAX_CONTENT_LENGTH = config_reader.max_content_length # Logging configuration - LOG_LEVEL = os.getenv('GASFLUX_LOG_LEVEL', 'INFO').upper() - LOG_FILE = os.getenv('GASFLUX_LOG_FILE', 'logs/gasflux_api.log') + LOG_LEVEL = config_reader.log_level.upper() + LOG_FILE = config_reader.log_file - # CORS configuration + # CORS configuration (keeping environment fallback for now) CORS_ORIGINS = os.getenv('GASFLUX_CORS_ORIGINS', '*').split(',') # Task management - TASK_CLEANUP_INTERVAL = int(os.getenv('GASFLUX_TASK_CLEANUP_INTERVAL', '3600')) # 1 hour in seconds - MAX_TASK_AGE = int(os.getenv('GASFLUX_MAX_TASK_AGE', str(24 * 3600))) # 24 hours in seconds + TASK_CLEANUP_INTERVAL = config_reader.task_cleanup_interval + MAX_TASK_AGE = config_reader.max_task_age # Performance tuning - THREADS = int(os.getenv('GASFLUX_THREADS', '8')) # Waitress threads - CONNECTION_LIMIT = int(os.getenv('GASFLUX_CONNECTION_LIMIT', '100')) - CHANNEL_TIMEOUT = int(os.getenv('GASFLUX_CHANNEL_TIMEOUT', '300')) # 5 minutes + THREADS = config_reader.threads + CONNECTION_LIMIT = config_reader.connection_limit + CHANNEL_TIMEOUT = config_reader.channel_timeout + + # Database configuration + DB_PATH = config_reader.db_path if config_reader.db_path else None + + # Persistence backend + TASK_PERSIST_BACKEND = config_reader.persist_backend + + # Janitor configuration + JANITOR_DRY_RUN = config_reader.janitor_dry_run + + # Admin bootstrap key + ADMIN_BOOTSTRAP_KEY = config_reader.admin_bootstrap_key @classmethod def init_base_dir(cls): @@ -74,40 +93,35 @@ class Config: @classmethod def init_directories(cls, output_dir=None): - """Initialize upload and output directories.""" - if output_dir: - # Use config-based output directory - output_base = Path(output_dir) - if not output_base.is_absolute(): - output_base = cls.BASE_DIR / output_base - else: - # Use default relative paths - output_base = cls.BASE_DIR + """Initialize upload and output directories from configuration.""" + # Use paths from config_reader + uploads_path = config_reader.uploads_path + outputs_path = config_reader.outputs_path - # Set upload and output directories relative to output_base - cls.UPLOAD_FOLDER = output_base / "uploads" - cls.OUTPUT_FOLDER = output_base / "outputs" + # Resolve relative paths to absolute if needed + if not uploads_path.is_absolute(): + uploads_path = cls.BASE_DIR / uploads_path + if not outputs_path.is_absolute(): + outputs_path = cls.BASE_DIR / outputs_path + + # Set the resolved paths + cls.UPLOAD_FOLDER = uploads_path + cls.OUTPUT_FOLDER = outputs_path # Create directories cls.UPLOAD_FOLDER.mkdir(parents=True, exist_ok=True) cls.OUTPUT_FOLDER.mkdir(parents=True, exist_ok=True) logger.info(f"Directories initialized - Upload: {cls.UPLOAD_FOLDER}, Output: {cls.OUTPUT_FOLDER}") + # For backward compatibility, also set the old-style paths + if output_dir: + logger.warning("output_dir parameter is deprecated, use gasflux.ini [paths] section instead") + @classmethod def 
update_directories_from_config(cls, config_path=None): - """Update directories based on config file.""" - if config_path and Path(config_path).exists(): - try: - with open(config_path, 'r', encoding='utf-8') as f: - config = yaml.safe_load(f) - output_dir = config.get('output_dir') - if output_dir: - cls.init_directories(output_dir) - logger.info(f"Directories updated from config: {config_path}") - except Exception as e: - logger.warning(f"Failed to update directories from config {config_path}: {e}") - else: - logger.info("Using default directory configuration") + """Update directories based on config file. (DEPRECATED: Use gasflux.ini instead)""" + logger.warning("update_directories_from_config is deprecated. Output directories are now configured via gasflux.ini [paths] section.") + # No longer reads output_dir from YAML config - directories are set from INI config in init_directories() @classmethod def get_log_level(cls): @@ -175,33 +189,9 @@ def log_performance(func): # Task status management # Task status constants and storage moved to shared.py -def update_task_status(task_id, status, message=None, results=None, error=None): - """Update task status in the global dictionary.""" - timestamp = time.time() - old_status = task_status.get(task_id, {}).get("status", "unknown") - - task_status[task_id] = { - "status": status, - "message": message, - "results": results, - "error": error, - "updated_at": timestamp - } - - # Log detailed status change with context - log_msg = f"Task {task_id} status changed: {old_status} -> {status}" - if message: - log_msg += f" | Message: {message}" - if results: - log_msg += f" | Results count: {len(results) if isinstance(results, list) else 'N/A'}" - if error: - log_msg += f" | Error: {error}" - - log_level = logging.ERROR if status == TASK_STATUS_FAILED else logging.INFO - logger.log(log_level, log_msg) - - # Update task statistics - stats_collector.record_task_status_change(old_status, status) +def update_task_status(task_id, status, message=None, results=None, error=None, output_dir=None): + """Update task status using the shared implementation (writes to SQLite).""" + return shared_update_task_status(task_id, status, message=message, results=results, error=error, output_dir=output_dir) # Statistics and Monitoring @@ -383,180 +373,215 @@ stats_collector = APIStatsCollector() def process_data_async(task_id, data_path, config_path, job_output_dir): """Background task to process data asynchronously.""" - logger.info(f"Job {task_id}: Background processing started for task {task_id}") - start_time = time.time() - - try: - update_task_status(task_id, TASK_STATUS_PROCESSING, "Starting data processing...") - - # 1. Load and override config FIRST - logger.info(f"Job {task_id}: Loading configuration from {config_path}") - config_start = time.time() + # 确保后台线程里有 Flask 应用上下文 + with app.app_context(): + logger.info(f"Job {task_id}: Background processing started for task {task_id}") + start_time = time.time() try: - with open(config_path, 'r') as f: - config = yaml.safe_load(f) - logger.info(f"Job {task_id}: Configuration loaded successfully with {len(config)} keys") - except Exception as e: - logger.error(f"Job {task_id}: Failed to load config from {config_path}: {str(e)}") - raise + update_task_status(task_id, TASK_STATUS_PROCESSING, "开始处理数据...") - # Update directories based on config output_dir - Config.update_directories_from_config(config_path) + # 1. 
Load and override config FIRST + logger.info(f"Job {task_id}: Loading configuration from {config_path}") + config_start = time.time() - # Sync app.config with updated directories - app.config['UPLOAD_FOLDER'] = Config.UPLOAD_FOLDER - app.config['OUTPUT_FOLDER'] = Config.OUTPUT_FOLDER + try: + with open(config_path, 'r') as f: + config = yaml.safe_load(f) + logger.info(f"Job {task_id}: Configuration loaded successfully with {len(config)} keys") + except Exception as e: + logger.error(f"Job {task_id}: Failed to load config from {config_path}: {str(e)}") + raise - # Update task status file path to new output directory - from .shared import set_task_status_file_path, load_task_status_from_file - set_task_status_file_path(Config.OUTPUT_FOLDER / "task_status.json") - # 立即从新路径加载现有状态,避免后续保存清空文件 - load_task_status_from_file() + # Update directories based on config output_dir + Config.update_directories_from_config(config_path) - # Update job directories to be under the correct config-based paths - from pathlib import Path - job_upload_dir = Path(Config.UPLOAD_FOLDER) / task_id - job_output_dir = Path(Config.OUTPUT_FOLDER) / task_id - job_upload_dir.mkdir(parents=True, exist_ok=True) - job_output_dir.mkdir(parents=True, exist_ok=True) + # Sync app.config with updated directories + app.config['UPLOAD_FOLDER'] = Config.UPLOAD_FOLDER + app.config['OUTPUT_FOLDER'] = Config.OUTPUT_FOLDER - # Move uploaded files to the correct config-based directories - try: - import shutil + # Task status persistence now uses SQLite only + # JSON persistence has been disabled + # from .shared import set_task_status_file_path, load_task_status_from_file + # set_task_status_file_path(Config.OUTPUT_FOLDER / "task_status.json") + # load_task_status_from_file() - # Move data file to correct uploads directory - if data_path.parent != job_upload_dir: - new_data_path = job_upload_dir / data_path.name - if data_path != new_data_path: - shutil.move(str(data_path), str(new_data_path)) - data_path = new_data_path - logger.info(f"Job {task_id}: Moved data file to {data_path}") + # Update job directories to be under the correct config-based paths + from pathlib import Path + job_upload_dir = Path(Config.UPLOAD_FOLDER) / task_id + job_output_dir = Path(Config.OUTPUT_FOLDER) / task_id + job_upload_dir.mkdir(parents=True, exist_ok=True) + job_output_dir.mkdir(parents=True, exist_ok=True) - # Move config file to correct uploads directory (if it's a custom config) - if config_path.parent != job_upload_dir and config_path.parent != Config.BASE_DIR: - new_config_path = job_upload_dir / config_path.name - if config_path != new_config_path: - shutil.move(str(config_path), str(new_config_path)) - config_path = new_config_path - logger.info(f"Job {task_id}: Moved config file to {config_path}") + # Trigger an update to save output_dir to database + update_task_status(task_id, TASK_STATUS_PROCESSING, "目录已就绪", output_dir=str(job_output_dir)) - except Exception as e: - logger.warning(f"Job {task_id}: Failed to move uploaded files to configured directories: {str(e)}") + # Move uploaded files to the correct config-based directories + try: + import shutil - logger.debug(f"Job {task_id}: Keeping original output directory: {config.get('output_dir', 'not set')}") - logger.debug(f"Job {task_id}: Updated directories - Upload: {Config.UPLOAD_FOLDER}, Output: {Config.OUTPUT_FOLDER}, Job output: {job_output_dir}") + # Move data file to correct uploads directory + if data_path.parent != job_upload_dir: + new_data_path = job_upload_dir / data_path.name + if 
data_path != new_data_path: + shutil.move(str(data_path), str(new_data_path)) + data_path = new_data_path + logger.info(f"Job {task_id}: Moved data file to {data_path}") - config_duration = time.time() - config_start - logger.info(f"Job {task_id}: Configuration processing completed in {config_duration:.3f}s") + # Move config file to correct uploads directory (if it's a custom config) + if config_path.parent != job_upload_dir and config_path.parent != Config.BASE_DIR: + new_config_path = job_upload_dir / config_path.name + if config_path != new_config_path: + shutil.move(str(config_path), str(new_config_path)) + config_path = new_config_path + logger.info(f"Job {task_id}: Moved config file to {config_path}") - update_task_status(task_id, TASK_STATUS_PROCESSING, "Configuration loaded, starting preprocessing...") + except Exception as e: + logger.warning(f"Job {task_id}: Failed to move uploaded files to configured directories: {str(e)}") - # 2. Data Preprocessing (files are already in correct directories) - logger.info(f"Job {task_id}: Starting preprocessing phase...") - preprocess_start = time.time() + logger.debug(f"Job {task_id}: Using INI configured output directory: {Config.OUTPUT_FOLDER}") + logger.debug(f"Job {task_id}: Updated directories - Upload: {Config.UPLOAD_FOLDER}, Output: {Config.OUTPUT_FOLDER}, Job output: {job_output_dir}") - processed_csv = data_path.parent / f"{data_path.stem}.processed.csv" - logger.debug(f"Job {task_id}: Input file: {data_path}, Output file: {processed_csv}") + config_duration = time.time() - config_start + logger.info(f"Job {task_id}: Configuration processing completed in {config_duration:.3f}s") - process_file(str(data_path), str(processed_csv), str(config_path)) + update_task_status(task_id, TASK_STATUS_PROCESSING, "配置已加载,开始预处理...") - preprocess_duration = time.time() - preprocess_start - logger.info(f"Job {task_id}: Preprocessing completed in {preprocess_duration:.3f}s") + # 2. Data Preprocessing (files are already in correct directories) + logger.info(f"Job {task_id}: Starting preprocessing phase...") + preprocess_start = time.time() - update_task_status(task_id, TASK_STATUS_PROCESSING, "Preprocessing completed, starting GasFlux analysis...") + processed_csv = data_path.parent / f"{data_path.stem}.processed.csv" + logger.debug(f"Job {task_id}: Input file: {data_path}, Output file: {processed_csv}") - # Write modified config to a temp file - final_config_path = data_path.parent / "final_config.yaml" - try: - with open(final_config_path, 'w') as f: - yaml.safe_dump(config, f) - logger.info(f"Job {task_id}: Final config written to {final_config_path}") - except Exception as e: - logger.error(f"Job {task_id}: Failed to write final config: {str(e)}") - raise + process_file(str(data_path), str(processed_csv), str(config_path)) - config_duration = time.time() - config_start - logger.info(f"Job {task_id}: Configuration processing completed in {config_duration:.3f}s") + preprocess_duration = time.time() - preprocess_start + logger.info(f"Job {task_id}: Preprocessing completed in {preprocess_duration:.3f}s") - update_task_status(task_id, TASK_STATUS_PROCESSING, "Configuration loaded, starting GasFlux analysis...") + update_task_status(task_id, TASK_STATUS_PROCESSING, "预处理完成,开始GasFlux分析...") - # 3. 
GasFlux Processing - logger.info(f"Job {task_id}: Starting GasFlux analysis...") - analysis_start = time.time() + # Write modified config to a temp file + final_config_path = data_path.parent / "final_config.yaml" + try: + with open(final_config_path, 'w') as f: + yaml.safe_dump(config, f) + logger.info(f"Job {task_id}: Final config written to {final_config_path}") + except Exception as e: + logger.error(f"Job {task_id}: Failed to write final config: {str(e)}") + raise - process_main(processed_csv, final_config_path, task_id) + config_duration = time.time() - config_start + logger.info(f"Job {task_id}: Configuration processing completed in {config_duration:.3f}s") - analysis_duration = time.time() - analysis_start - logger.info(f"Job {task_id}: GasFlux analysis completed in {analysis_duration:.3f}s") + update_task_status(task_id, TASK_STATUS_PROCESSING, "配置已加载,开始GasFlux分析...") - update_task_status(task_id, TASK_STATUS_PROCESSING, "GasFlux analysis completed, generating reports...") + # 3. GasFlux Processing + logger.info(f"Job {task_id}: Starting GasFlux analysis...") + analysis_start = time.time() - # Collect results and generate full URLs - logger.info(f"Job {task_id}: Collecting generated files from {job_output_dir}") - results_start = time.time() - results = [] + processor = process_main(processed_csv, final_config_path, job_output_dir, task_id) # 获取返回值 - try: - for f in job_output_dir.rglob("*"): - if f.is_file(): - rel_path = f.relative_to(app.config['OUTPUT_FOLDER']).as_posix() - file_size = f.stat().st_size - results.append({ - "name": f.name, - "rel_path": rel_path, - "download_url": f"/download/{rel_path}", # Relative URL that client can use - "size": file_size + analysis_duration = time.time() - analysis_start + logger.info(f"Job {task_id}: GasFlux analysis completed in {analysis_duration:.3f}s") + + # 提取krig_params数据(只保存关键数值) + krig_params_data = [] + if hasattr(processor, 'output_vars') and 'krig_parameters' in processor.output_vars: + for gas, params in processor.output_vars['krig_parameters'].items(): + # 只保存数值类型的数据,跳过数组 + clean_params = {} + for key, value in params.items(): + if isinstance(value, (int, float)): + clean_params[key] = value + elif hasattr(value, 'item') and hasattr(value, 'size'): + # numpy数组:只处理单元素数组 + if value.size == 1: + clean_params[key] = value.item() + # 多元素数组跳过,不保存 + elif hasattr(value, 'item'): + # 其他numpy对象尝试转换 + try: + clean_params[key] = value.item() + except ValueError: + # 转换失败则跳过 + continue + + krig_params_data.append({ + 'gas': gas, + 'krig_params': clean_params }) - logger.debug(f"Job {task_id}: Found output file: {f.name} ({file_size} bytes)") - results_duration = time.time() - results_start - logger.info(f"Job {task_id}: Results collection completed in {results_duration:.3f}s - {len(results)} files generated") + update_task_status(task_id, TASK_STATUS_PROCESSING, "GasFlux分析完成,正在生成报告...") - total_size = sum(r.get('size', 0) for r in results) - logger.info(f"Job {task_id}: Total output size: {total_size} bytes across {len(results)} files") + # Collect results and generate full URLs + logger.info(f"Job {task_id}: Collecting generated files from {job_output_dir}") + results_start = time.time() + results = [] + + # 先添加krig_params数据 + results.extend(krig_params_data) + + try: + for f in job_output_dir.rglob("*"): + if f.is_file(): + rel_path = f.relative_to(app.config['OUTPUT_FOLDER']).as_posix() + file_size = f.stat().st_size + results.append({ + "name": f.name, + "rel_path": rel_path, + "download_url": f"/download/{rel_path}", # Relative URL 
that client can use + "size": file_size + }) + logger.debug(f"Job {task_id}: Found output file: {f.name} ({file_size} bytes)") + + results_duration = time.time() - results_start + logger.info(f"Job {task_id}: Results collection completed in {results_duration:.3f}s - {len(results)} files generated") + + total_size = sum(r.get('size', 0) for r in results) + logger.info(f"Job {task_id}: Total output size: {total_size} bytes across {len(results)} files") + + except Exception as e: + logger.error(f"Job {task_id}: Failed to collect results: {str(e)}") + raise + + total_duration = time.time() - start_time + logger.info(f"Job {task_id}: Processing complete. Total duration: {total_duration:.3f}s, {len(results)} files generated.") + + # Record task completion time for statistics + stats_collector.record_task_completion_time(total_duration) + + update_task_status(task_id, TASK_STATUS_COMPLETED, "处理成功完成", results=results) except Exception as e: - logger.error(f"Job {task_id}: Failed to collect results: {str(e)}") - raise + total_duration = time.time() - start_time + logger.error(f"Job {task_id}: Processing failed after {total_duration:.3f}s - Error: {str(e)}", exc_info=True) - total_duration = time.time() - start_time - logger.info(f"Job {task_id}: Processing complete. Total duration: {total_duration:.3f}s, {len(results)} files generated.") + # Record failed task processing time for statistics + stats_collector.record_task_completion_time(total_duration) + logger.error(f"Job {task_id}: Failed task details - Data: {data_path}, Config: {config_path}, Output: {job_output_dir}") - # Record task completion time for statistics - stats_collector.record_task_completion_time(total_duration) + # Try to capture any partial results + partial_results = [] + try: + for f in job_output_dir.rglob("*"): + if f.is_file(): + rel_path = f.relative_to(app.config['OUTPUT_FOLDER']).as_posix() + partial_results.append({ + "name": f.name, + "rel_path": rel_path, + "download_url": f"/download/{rel_path}", # Relative URL that client can use + "size": f.stat().st_size, + "note": "partial_result" + }) + except Exception as collect_error: + logger.warning(f"Job {task_id}: Failed to collect partial results: {str(collect_error)}") - update_task_status(task_id, TASK_STATUS_COMPLETED, "Processing completed successfully", results=results) + error_msg = f"处理失败: {str(e)}" + if partial_results: + error_msg += f" (部分结果可用: {len(partial_results)} 个文件)" - except Exception as e: - total_duration = time.time() - start_time - logger.error(f"Job {task_id}: Processing failed after {total_duration:.3f}s - Error: {str(e)}", exc_info=True) - - # Record failed task processing time for statistics - stats_collector.record_task_completion_time(total_duration) - logger.error(f"Job {task_id}: Failed task details - Data: {data_path}, Config: {config_path}, Output: {job_output_dir}") - - # Try to capture any partial results - partial_results = [] - try: - for f in job_output_dir.rglob("*"): - if f.is_file(): - rel_path = f.relative_to(app.config['OUTPUT_FOLDER']).as_posix() - partial_results.append({ - "name": f.name, - "rel_path": rel_path, - "download_url": f"/download/{rel_path}", # Relative URL that client can use - "size": f.stat().st_size, - "note": "partial_result" - }) - except Exception as collect_error: - logger.warning(f"Job {task_id}: Failed to collect partial results: {str(collect_error)}") - - error_msg = f"Processing failed: {str(e)}" - if partial_results: - error_msg += f" (partial results available: {len(partial_results)} files)" - - 
update_task_status(task_id, TASK_STATUS_FAILED, error=error_msg, results=partial_results if partial_results else None) + update_task_status(task_id, TASK_STATUS_FAILED, error=error_msg, results=partial_results if partial_results else None) # Import GasFlux modules logger.info("Importing GasFlux modules...") @@ -632,10 +657,20 @@ Config.init_base_dir() # ALLOWED_DATA_EXTENSIONS and ALLOWED_CONFIG_EXTENSIONS moved to shared.py app.config['MAX_CONTENT_LENGTH'] = Config.MAX_CONTENT_LENGTH -# Don't set UPLOAD_FOLDER and OUTPUT_FOLDER here - they will be set dynamically per request -# Set defaults to avoid KeyError if any handler reads before config is applied -app.config.setdefault('UPLOAD_FOLDER', None) -app.config.setdefault('OUTPUT_FOLDER', None) +app.config['BASE_URL'] = Config.BASE_URL + +# Set upload and output folders from config +app.config['UPLOAD_FOLDER'] = Config.UPLOAD_FOLDER +app.config['OUTPUT_FOLDER'] = Config.OUTPUT_FOLDER + +# Database and persistence configuration +if Config.DB_PATH: + app.config['DB_PATH'] = Config.DB_PATH +if Config.TASK_PERSIST_BACKEND: + app.config['TASK_PERSIST_BACKEND'] = Config.TASK_PERSIST_BACKEND +app.config['JANITOR_DRY_RUN'] = str(Config.JANITOR_DRY_RUN).lower() +if Config.ADMIN_BOOTSTRAP_KEY: + app.config['ADMIN_BOOTSTRAP_KEY'] = Config.ADMIN_BOOTSTRAP_KEY # Log current configuration logger.info(f"Upload folder: {Config.UPLOAD_FOLDER}") @@ -673,6 +708,10 @@ def setup_directories(): # allowed_file moved to shared.py +# Initialize database and start background services +from .db import init_app as init_db +init_db(app) + # Import blueprints after app initialization to avoid circular imports from .blueprints.health import health_bp from .blueprints.upload import upload_bp @@ -683,6 +722,7 @@ from .blueprints.config import config_bp from .blueprints.reports import reports_bp from .blueprints.download import download_bp from .blueprints.web import web_bp +from .blueprints.api_keys import api_keys_bp # Register blueprints app.register_blueprint(health_bp) @@ -694,30 +734,18 @@ app.register_blueprint(config_bp) app.register_blueprint(reports_bp) app.register_blueprint(download_bp) app.register_blueprint(web_bp) +app.register_blueprint(api_keys_bp) -# Load persisted task status after app initialization +# Task status persistence now uses SQLite only +# JSON persistence has been disabled - functions removed from shared.py + +# Initialize janitor for background cleanup try: - from .shared import ( - load_task_status_from_file, - save_task_status_to_file, - set_task_status_file_path, - ) - - # 只有在 OUTPUT_FOLDER 有效时才启用持久化 - if hasattr(Config, 'OUTPUT_FOLDER') and Config.OUTPUT_FOLDER: - task_status_path = Config.OUTPUT_FOLDER / "task_status.json" - set_task_status_file_path(task_status_path) - logger.info(f"Task status persistence path set to: {task_status_path}") - with app.app_context(): - load_task_status_from_file() - - import atexit - def _save_on_exit(): - with app.app_context(): - save_task_status_to_file() - atexit.register(_save_on_exit) - else: - logger.info("Task status persistence will be configured after config is loaded") + from .janitor import start_janitor, reconcile_tasks_on_startup + with app.app_context(): + reconcile_tasks_on_startup() + # No longer need to load task status into memory + start_janitor(app) except Exception as e: print(f"⚠ Failed to setup task persistence: {e}") @@ -725,4 +753,4 @@ except Exception as e: if __name__ == '__main__': - app.run(host='0.0.0.0', port=5000, debug=False) + app.run(host=Config.HOST, port=Config.PORT, 
debug=Config.DEBUG) diff --git a/src/gasflux/auth.py b/src/gasflux/auth.py new file mode 100644 index 0000000..1da19ab --- /dev/null +++ b/src/gasflux/auth.py @@ -0,0 +1,155 @@ +""" +Authentication Module +Handles API key authentication and management. +""" + +import hmac +import secrets +import hashlib +import os +from functools import wraps +from flask import request, abort, current_app, g +from .db import get_db + + +def hash_api_key(key: str, salt: str = None) -> tuple[str, str]: + """Hash API key with salt for secure storage.""" + if salt is None: + salt = secrets.token_hex(16) + + # Use PBKDF2 for key derivation + key_hash = hashlib.pbkdf2_hmac( + 'sha256', + key.encode('utf-8'), + salt.encode('utf-8'), + 100000 # High iteration count for security + ).hex() + + return key_hash, salt + + +def verify_api_key(provided_key: str, stored_hash: str, salt: str) -> bool: + """Verify provided API key against stored hash.""" + expected_hash, _ = hash_api_key(provided_key, salt) + return hmac.compare_digest(expected_hash, stored_hash) + + +def require_api_key(f): + """Decorator to require valid API key authentication.""" + @wraps(f) + def decorated_function(*args, **kwargs): + # Check for bootstrap admin key (for initial key creation) + bootstrap_key = os.environ.get('ADMIN_BOOTSTRAP_KEY') or current_app.config.get('ADMIN_BOOTSTRAP_KEY') + if bootstrap_key: + provided_bootstrap = request.headers.get('X-Admin-Bootstrap-Key') + if provided_bootstrap and hmac.compare_digest(provided_bootstrap, bootstrap_key): + return f(*args, **kwargs) + + # Check for regular API key + provided_key = request.headers.get('X-API-Key') + if not provided_key: + current_app.logger.warning(f"API key missing from request: {request.method} {request.path}") + abort(401, "API key required") + + # Query database for key + db = get_db() + row = db.execute( + "SELECT key_hash, salt, revoked, scopes FROM api_keys WHERE key_id = ?", + (provided_key,) + ).fetchone() + + if not row: + current_app.logger.warning(f"Unknown API key used: {request.method} {request.path}") + abort(401, "Invalid API key") + + if row['revoked']: + current_app.logger.warning(f"Revoked API key used: {request.method} {request.path}") + abort(401, "API key revoked") + + # Verify key + if not verify_api_key(provided_key, row['key_hash'], row['salt']): + current_app.logger.warning(f"Invalid API key hash: {request.method} {request.path}") + abort(401, "Invalid API key") + + # Update last used timestamp + db.execute( + "UPDATE api_keys SET last_used_at = datetime('now', '+8 hours') WHERE key_id = ?", + (provided_key,) + ) + db.commit() + + # Store key info in request context + g.api_key_id = provided_key + g.api_key_scopes = row['scopes'] or '*' + + return f(*args, **kwargs) + + return decorated_function + + +def create_api_key(description: str = "", scopes: str = "*") -> tuple[str, str]: + """Create a new API key.""" + key = secrets.token_urlsafe(32) # Generate secure random key + key_hash, salt = hash_api_key(key) + + db = get_db() + db.execute( + "INSERT INTO api_keys (key_id, key_hash, salt, scopes, description) VALUES (?, ?, ?, ?, ?)", + (key, key_hash, salt, scopes, description) + ) + db.commit() + + current_app.logger.info(f"Created new API key: {key[:8]}... 
(description: {description})") + return key, key # Return both hashed and plain versions (plain only once) + + +def revoke_api_key(key_id: str) -> bool: + """Revoke an API key.""" + db = get_db() + result = db.execute( + "UPDATE api_keys SET revoked = 1 WHERE key_id = ?", + (key_id,) + ) + db.commit() + + if result.rowcount > 0: + current_app.logger.info(f"Revoked API key: {key_id[:8]}...") + return True + return False + + +def list_api_keys() -> list[dict]: + """List all API keys (without sensitive data).""" + db = get_db() + rows = db.execute( + "SELECT key_id, scopes, description, created_at, last_used_at, revoked FROM api_keys ORDER BY created_at DESC" + ).fetchall() + + return [{ + 'key_id': row['key_id'], + 'scopes': row['scopes'], + 'description': row['description'], + 'created_at': row['created_at'], + 'last_used_at': row['last_used_at'], + 'revoked': bool(row['revoked']) + } for row in rows] + + +def get_api_key_info(key_id: str) -> dict | None: + """Get information about a specific API key.""" + db = get_db() + row = db.execute( + "SELECT key_id, scopes, description, created_at, last_used_at, revoked FROM api_keys WHERE key_id = ?", + (key_id,) + ).fetchone() + + if row: + return { + 'key_id': row['key_id'], + 'scopes': row['scopes'], + 'description': row['description'], + 'created_at': row['created_at'], + 'last_used_at': row['last_used_at'], + 'revoked': bool(row['revoked']) + } + return None \ No newline at end of file diff --git a/src/gasflux/blueprints/api_keys.py b/src/gasflux/blueprints/api_keys.py new file mode 100644 index 0000000..1932c63 --- /dev/null +++ b/src/gasflux/blueprints/api_keys.py @@ -0,0 +1,95 @@ +""" +API Keys Blueprint +Handles API key management endpoints. +""" + +from flask import Blueprint, request +from ..auth import create_api_key, revoke_api_key, list_api_keys, get_api_key_info +from ..shared import _format_response, log_performance, logger +from ..auth import require_api_key + +# Create blueprint +api_keys_bp = Blueprint('api_keys', __name__, url_prefix='/api-keys') + + +@api_keys_bp.route('', methods=['POST']) +@log_performance +def create_key(): + """Create a new API key.""" + logger.info("API key creation request") + + try: + data = request.get_json() + if not data: + return _format_response(400, "Request body must be JSON") + + description = data.get('description', '') + scopes = data.get('scopes', '*') + + key_id, plain_key = create_api_key(description=description, scopes=scopes) + + return _format_response(201, "API key created successfully", { + "key_id": key_id, + "key": plain_key, # Only returned once for security + "description": description, + "scopes": scopes + }) + + except Exception as e: + logger.error(f"Error creating API key: {str(e)}", exc_info=True) + return _format_response(500, "Internal server error") + + +@api_keys_bp.route('', methods=['GET']) +@log_performance +def list_keys(): + """List all API keys.""" + logger.debug("API keys list request") + + try: + keys = list_api_keys() + return _format_response(200, "API keys retrieved successfully", { + "keys": keys, + "total": len(keys) + }) + + except Exception as e: + logger.error(f"Error listing API keys: {str(e)}", exc_info=True) + return _format_response(500, "Internal server error") + + +@api_keys_bp.route('/', methods=['GET']) +@log_performance +def get_key(key_id): + """Get information about a specific API key.""" + logger.debug(f"API key info request for: {key_id}") + + try: + key_info = get_api_key_info(key_id) + if not key_info: + return _format_response(404, "API key 
not found") + + return _format_response(200, "API key information retrieved successfully", key_info) + + except Exception as e: + logger.error(f"Error retrieving API key info: {str(e)}", exc_info=True) + return _format_response(500, "Internal server error") + + +@api_keys_bp.route('/', methods=['DELETE']) +@log_performance +def revoke_key(key_id): + """Revoke an API key.""" + logger.info(f"API key revocation request for: {key_id}") + + try: + if revoke_api_key(key_id): + return _format_response(200, "API key revoked successfully", { + "key_id": key_id + }) + else: + return _format_response(404, "API key not found") + + except Exception as e: + logger.error(f"Error revoking API key: {str(e)}", exc_info=True) + return _format_response(500, "Internal server error") \ No newline at end of file diff --git a/src/gasflux/blueprints/download.py b/src/gasflux/blueprints/download.py index 47dc3f3..80ad0f2 100644 --- a/src/gasflux/blueprints/download.py +++ b/src/gasflux/blueprints/download.py @@ -7,6 +7,39 @@ from pathlib import Path from flask import Blueprint, send_file, current_app from ..shared import _format_response, log_performance, logger +from ..auth import require_api_key + + +def _mark_task_downloaded(task_id): + """Mark task as downloaded and schedule deletion in database.""" + import sqlite3 + from pathlib import Path + + # Use independent database connection (not from flask.g which may be closed) + from ..db import get_db_path as get_config_db_path + db_path = get_config_db_path(current_app) + + try: + conn = sqlite3.connect(str(db_path), check_same_thread=False) + conn.execute("PRAGMA foreign_keys=ON") + conn.execute("PRAGMA busy_timeout=3000") + + # Update downloaded timestamp and set deletion time (10 minutes later) + conn.execute(""" + UPDATE tasks + SET downloaded_at = datetime('now', '+8 hours'), + delete_after_at = datetime('now', '+8 hours', '+10 minutes') + WHERE task_id = ? + """, (task_id,)) + + conn.commit() + logger.info(f"Task {task_id} marked as downloaded, scheduled for deletion in 10 minutes") + + except Exception as e: + logger.error(f"Failed to mark task {task_id} as downloaded: {str(e)}", exc_info=True) + finally: + if 'conn' in locals(): + conn.close() # Create blueprint download_bp = Blueprint('download', __name__, url_prefix='/download') @@ -20,6 +53,32 @@ def download_file(filename): logger.info(f"Download request for file: {filename} from IP {request.remote_addr}") + # Check API key from header or query parameter + provided_key = request.headers.get('X-API-Key') or request.args.get('api_key') + if not provided_key: + logger.warning(f"API key missing from request: POST /download/{filename}") + return _format_response(401, "API key required") + + # Validate API key + from ..auth import verify_api_key, get_db + db = get_db() + row = db.execute( + "SELECT key_hash, salt, revoked FROM api_keys WHERE key_id = ?", + (provided_key,) + ).fetchone() + + if not row: + logger.warning(f"Unknown API key used: POST /download/{filename}") + return _format_response(401, "Invalid API key") + + if row['revoked']: + logger.warning(f"Revoked API key used: POST /download/{filename}") + return _format_response(401, "API key revoked") + + if not verify_api_key(provided_key, row['key_hash'], row['salt']): + logger.warning(f"Invalid API key hash: POST /download/{filename}") + return _format_response(401, "Invalid API key") + try: # 支持两种路径格式: # 1. 
绝对路径(以 / 开头,如 /full/path/to/file) @@ -27,6 +86,7 @@ def download_file(filename): if filename.startswith('/'): # 绝对路径 - 直接使用 file_path = Path(filename) + task_id = None # Can't determine task_id from absolute path else: # 相对路径 - 相对于 OUTPUT_FOLDER output_folder = Path(current_app.config.get('OUTPUT_FOLDER') or '') @@ -61,7 +121,15 @@ def download_file(filename): file_size = file_path.stat().st_size logger.info(f"Serving file: {filename} ({file_size} bytes)") - return send_file(file_path) + # Mark download immediately before sending file + if task_id: + try: + _mark_task_downloaded(task_id) + except Exception as e: + logger.error(f"Failed to mark download for task {task_id}: {str(e)}") + + response = send_file(file_path) + return response except Exception as e: logger.error(f"Error serving file {filename}: {str(e)}", exc_info=True) diff --git a/src/gasflux/blueprints/health.py b/src/gasflux/blueprints/health.py index d0213aa..0696ae1 100644 --- a/src/gasflux/blueprints/health.py +++ b/src/gasflux/blueprints/health.py @@ -9,7 +9,7 @@ import logging from flask import Blueprint, request from ..app import Config, stats_collector -from ..shared import task_status, TASK_STATUS_PENDING, TASK_STATUS_PROCESSING +from ..shared import TASK_STATUS_PENDING, TASK_STATUS_PROCESSING from ..shared import _format_response, log_performance, logger # Create blueprint diff --git a/src/gasflux/blueprints/reports.py b/src/gasflux/blueprints/reports.py index e80201e..b3b8c7e 100644 --- a/src/gasflux/blueprints/reports.py +++ b/src/gasflux/blueprints/reports.py @@ -8,7 +8,7 @@ from pathlib import Path from flask import Blueprint, request, current_app -from ..shared import _get_file_type, _format_response, log_performance, logger, task_status +from ..shared import _get_file_type, _format_response, log_performance, logger from ..app import Config # Create blueprint @@ -26,16 +26,16 @@ def list_reports(): try: page = int(request.args.get('page', 1)) if page < 1: - return _format_response(400, "Invalid parameter: page must be >= 1") + return _format_response(400, "无效参数: 页码必须大于等于1") except (ValueError, TypeError): - return _format_response(400, "Invalid parameter: page must be a valid integer") + return _format_response(400, "无效参数: 页码必须是有效的整数") try: per_page = int(request.args.get('per_page', 20)) if per_page < 1 or per_page > 100: - return _format_response(400, "Invalid parameter: per_page must be between 1 and 100") + return _format_response(400, "无效参数: 每页数量必须在1-100之间") except (ValueError, TypeError): - return _format_response(400, "Invalid parameter: per_page must be a valid integer") + return _format_response(400, "无效参数: 每页数量必须是有效的整数") sort_by = request.args.get('sort_by', 'created_at') sort_order = request.args.get('sort_order', 'desc') @@ -44,14 +44,14 @@ def list_reports(): # Validate sort parameters valid_sort_fields = ['created_at', 'task_id', 'file_size', 'processing_time'] if sort_by not in valid_sort_fields: - return _format_response(400, f"Invalid parameter: sort_by must be one of {valid_sort_fields}") + return _format_response(400, f"无效参数: 排序字段必须是以下之一: {', '.join(valid_sort_fields)}") if sort_order not in ['asc', 'desc']: - return _format_response(400, "Invalid parameter: sort_order must be 'asc' or 'desc'") + return _format_response(400, "无效参数: 排序顺序必须是 'asc' 或 'desc'") # Validate status filter valid_statuses = ['completed', 'failed', None] if status_filter is not None and status_filter not in ['completed', 'failed']: - return _format_response(400, "Invalid parameter: status must be 'completed', 'failed', or 
not specified") + return _format_response(400, "无效参数: 状态必须是 'completed'、'failed' 或不指定") # 兼容缺省:优先 app.config,其次 Config.OUTPUT_FOLDER output_root = current_app.config.get('OUTPUT_FOLDER') or getattr(Config, 'OUTPUT_FOLDER', None) @@ -72,9 +72,10 @@ def list_reports(): continue task_id = task_dir.name - # Get task information from global task_status - task_info = task_status.get(task_id, {}) - task_status_value = task_info.get('status') + # Get task information from database + from ..shared import get_task_status + task_info = get_task_status(task_id) + task_status_value = task_info.get('status') if task_info.get('status') != 'not_found' else None # Log task status for debugging logger.debug(f"Task {task_id}: status from memory={task_status_value}, info={task_info}") diff --git a/src/gasflux/blueprints/stats.py b/src/gasflux/blueprints/stats.py index fc0d1a2..020fba3 100644 --- a/src/gasflux/blueprints/stats.py +++ b/src/gasflux/blueprints/stats.py @@ -1,5 +1,5 @@ """ -Statistics Blueprint +Statistics Blueprintoutput_dir Provides API statistics and monitoring endpoints. """ @@ -7,7 +7,7 @@ import time from flask import Blueprint, current_app -from ..shared import _format_response, log_performance, logger,stats_collector, task_status +from ..shared import _format_response, log_performance, logger, stats_collector # Create blueprint stats_bp = Blueprint('stats', __name__, url_prefix='/stats') @@ -46,17 +46,33 @@ def get_stats(): logger.warning(f"Failed to collect system metrics: {e}") stats_data['system'] = {'error': str(e)} - # Add recent task information + # Add recent task information from database recent_tasks = [] - current_time = time.time() - for task_id, task_info in list(task_status.items())[-20:]: # Last 20 tasks - age = current_time - task_info.get('updated_at', 0) - recent_tasks.append({ - 'task_id': task_id, - 'status': task_info.get('status'), - 'age_seconds': round(age, 1), - 'message': task_info.get('message', '')[:100] # Truncate long messages - }) + try: + from ..db import get_db + db = get_db() + current_time = time.time() + + rows = db.execute(""" + SELECT task_id, status, message, updated_at + FROM tasks + WHERE deleted_at IS NULL + ORDER BY updated_at DESC + LIMIT 20 + """).fetchall() + + for row in rows: + updated_at = row[3] if row[3] else 0 + age = current_time - updated_at + recent_tasks.append({ + 'task_id': row[0], # task_id + 'status': row[1], # status + 'age_seconds': round(age, 1), + 'message': (row[2] or '')[:100] # message, truncate long messages + }) + except Exception as e: + logger.warning(f"Failed to get recent tasks from database: {e}") + recent_tasks = [] stats_data['recent_tasks'] = recent_tasks diff --git a/src/gasflux/blueprints/task_pool.py b/src/gasflux/blueprints/task_pool.py index 7c39e4b..69c6de9 100644 --- a/src/gasflux/blueprints/task_pool.py +++ b/src/gasflux/blueprints/task_pool.py @@ -12,17 +12,39 @@ from ..shared import ( _format_response, log_performance, logger, - task_status, TASK_STATUS_PENDING, TASK_STATUS_PROCESSING, TASK_STATUS_COMPLETED, TASK_STATUS_FAILED ) +from ..auth import require_api_key # Create blueprint task_pool_bp = Blueprint('task_pool', __name__, url_prefix='/tasks') +def _localize_message(msg): + """Translate known English status messages to Chinese for consistent frontend display.""" + if not msg: + return msg + + replacements = { + "Task queued for processing": "任务已加入处理队列", + "Starting data processing...": "开始处理数据...", + "Configuration loaded, starting preprocessing...": "配置已加载,开始预处理...", + "Preprocessing completed, 
starting GasFlux analysis...": "预处理完成,开始GasFlux分析...", + "Configuration loaded, starting GasFlux analysis...": "配置已加载,开始GasFlux分析...", + "GasFlux analysis completed, generating reports...": "GasFlux分析完成,正在生成报告...", + "Processing completed successfully": "处理成功完成", + "Processing failed:": "处理失败:", + } + + for eng, zh in replacements.items(): + if msg.startswith(eng): + return msg.replace(eng, zh, 1) + return msg + + def _build_simple_downloads_from_results(results: list[dict]) -> dict: """ Build direct download shortcuts for common files, based on task results. @@ -71,21 +93,43 @@ def _lean_task_summary(task_summary: dict) -> dict: lean = { 'task_id': task_id, 'status': status, - 'message': task_summary.get('message'), + 'message': _localize_message(task_summary.get('message')), 'updated_at': task_summary.get('updated_at'), } if status == TASK_STATUS_COMPLETED and task_id: - full_task_info = task_status.get(task_id, {}) + from ..shared import get_task_status + full_task_info = get_task_status(task_id) results = full_task_info.get('results', []) or [] + + # Extract volume from results + volume = None + for result in results: + if isinstance(result, dict) and 'krig_params' in result: + krig_params = result.get('krig_params', {}) + if isinstance(krig_params, dict) and 'volume' in krig_params: + volume = krig_params['volume'] + break + + # Add volume to response if found + if volume is not None: + lean['volume'] = float(volume) # Ensure it's a number + downloads = _build_simple_downloads_from_results(results) if downloads: lean['downloads'] = downloads + elif status == TASK_STATUS_FAILED and task_id: + from ..shared import get_task_status + full_task_info = get_task_status(task_id) + error = full_task_info.get('error') + if error: + lean['error'] = error return lean @task_pool_bp.route('', methods=['GET']) +@require_api_key @log_performance def list_tasks(): """Get paginated list of tasks with optional filtering.""" @@ -143,6 +187,7 @@ def list_tasks(): @task_pool_bp.route('/stats', methods=['GET']) +@require_api_key @log_performance def get_pool_stats(): """Get task pool statistics.""" @@ -162,6 +207,7 @@ def get_pool_stats(): @task_pool_bp.route('/active', methods=['GET']) +@require_api_key @log_performance def get_active_tasks(): """Get list of currently active (processing) tasks.""" @@ -195,6 +241,7 @@ def get_active_tasks(): @task_pool_bp.route('/queue', methods=['GET']) +@require_api_key @log_performance def get_queued_tasks(): """Get list of queued (pending) tasks.""" diff --git a/src/gasflux/blueprints/tasks.py b/src/gasflux/blueprints/tasks.py index 946a6be..379c649 100644 --- a/src/gasflux/blueprints/tasks.py +++ b/src/gasflux/blueprints/tasks.py @@ -12,28 +12,28 @@ from ..shared import ( _format_response, log_performance, logger, - task_status, _build_simple_downloads_from_results, TASK_STATUS_COMPLETED, TASK_STATUS_FAILED, TASK_STATUS_PROCESSING, TASK_STATUS_PENDING, ) +from ..auth import require_api_key # Create blueprint tasks_bp = Blueprint('tasks', __name__, url_prefix='/task') @tasks_bp.route('/', methods=['GET']) +@require_api_key @log_performance def get_task_status_endpoint(task_id): """Get the status of a processing task.""" logger.debug(f"Status request for task {task_id}") try: - # Note: cleanup_old_tasks() is disabled for individual task queries - # to preserve historical task data for task pool management - # cleanup_old_tasks() + # Note: cleanup_old_tasks() is enabled to prevent memory leaks from old tasks + cleanup_old_tasks() task_info = get_task_status(task_id) if 
task_info.get("status") == "not_found": @@ -72,6 +72,7 @@ def get_task_status_endpoint(task_id): @tasks_bp.route('/', methods=['PUT']) +@require_api_key @log_performance def update_task(task_id): """Update task status and information.""" @@ -125,10 +126,6 @@ def update_task(task_id): # Only update message update_task_status(task_id, current_status, message) - # Update priority if provided - if 'priority' in updates: - task_status[task_id]['priority'] = updates['priority'] - # Get updated task info updated_task = get_task_status(task_id) @@ -152,6 +149,7 @@ def update_task(task_id): @tasks_bp.route('/', methods=['DELETE']) +@require_api_key @log_performance def delete_task(task_id): """Delete a task and its associated files.""" @@ -197,10 +195,7 @@ def delete_task(task_id): logger.error(f"Error deleting task folder {task_folder}: {str(e)}") return _format_response(500, f"删除任务文件失败: {str(e)}") - # Remove from task status tracking - if task_id in task_status: - del task_status[task_id] - logger.info(f"Removed task {task_id} from status tracking") + # Task status is now managed by database only data = { "task_id": task_id, diff --git a/src/gasflux/blueprints/upload.py b/src/gasflux/blueprints/upload.py index 5749c8b..1057c38 100644 --- a/src/gasflux/blueprints/upload.py +++ b/src/gasflux/blueprints/upload.py @@ -13,13 +13,15 @@ from io import BytesIO from ..app import process_data_async -from ..shared import _format_response, log_performance, logger, ALLOWED_DATA_EXTENSIONS, ALLOWED_CONFIG_EXTENSIONS, allowed_file,update_task_status, TASK_STATUS_PENDING, TASK_STATUS_FAILED +from ..shared import _format_response, log_performance, logger, ALLOWED_DATA_EXTENSIONS, ALLOWED_CONFIG_EXTENSIONS, allowed_file, update_task_status, TASK_STATUS_PENDING, TASK_STATUS_FAILED +from ..auth import require_api_key # Create blueprint upload_bp = Blueprint('upload', __name__, url_prefix='/upload') @upload_bp.route('', methods=['POST']) +@require_api_key @log_performance def upload_file(): logger.info("Received upload request") @@ -49,8 +51,8 @@ def upload_file(): return _format_response(400, "无效的数据文件类型。只允许 .xlsx 和 .xls 格式。") # Generate unique job ID - job_id = str(uuid.uuid4()) - logger.info(f"Generated job ID: {job_id}") + task_id = str(uuid.uuid4()) + logger.info(f"Generated job ID: {task_id}") # 1) Parse config content (parse in memory without saving first) if config_file and config_file.filename != '': @@ -69,13 +71,14 @@ def upload_file(): with open(default_config_path, 'r', encoding='utf-8') as f: active_config = yaml.safe_load(f) - # 2) Create job directories based on config['output_dir'] - output_base = Path(active_config['output_dir']).expanduser() - job_upload_dir = output_base / "uploads" / job_id - job_output_dir = output_base / "outputs" / job_id + # 2) Create job directories based on INI configuration + upload_base = Path(current_app.config['UPLOAD_FOLDER']) + output_base = Path(current_app.config['OUTPUT_FOLDER']) + job_upload_dir = upload_base / task_id + job_output_dir = output_base / task_id job_upload_dir.mkdir(parents=True, exist_ok=True) job_output_dir.mkdir(parents=True, exist_ok=True) - logger.info(f"Job {job_id}: Created directories - Upload: {job_upload_dir}, Output: {job_output_dir}") + logger.info(f"Job {task_id}: Created directories - Upload: {job_upload_dir}, Output: {job_output_dir}") # 3) Save data file to job_upload_dir data_filename = secure_filename(data_file.filename) @@ -83,9 +86,9 @@ def upload_file(): try: data_file.seek(0) data_file.save(str(data_path)) - logger.info(f"Job 
{job_id}: Data file saved successfully - Path: {data_path}") + logger.info(f"Job {task_id}: Data file saved successfully - Path: {data_path}") except Exception as e: - logger.error(f"Job {job_id}: Failed to save data file {data_filename}: {str(e)}") + logger.error(f"Job {task_id}: Failed to save data file {data_filename}: {str(e)}") return _format_response(500, "保存数据文件失败") # 4) Save config file to job_upload_dir @@ -96,9 +99,9 @@ def upload_file(): config_file.seek(0) config_file.save(str(config_path)) active_config_path = config_path - logger.info(f"Job {job_id}: Custom config saved successfully - Path: {config_path}") + logger.info(f"Job {task_id}: Custom config saved successfully - Path: {config_path}") except Exception as e: - logger.error(f"Job {job_id}: Failed to save config file {config_filename}: {str(e)}") + logger.error(f"Job {task_id}: Failed to save config file {config_filename}: {str(e)}") return _format_response(500, "保存配置文件失败") else: # Copy default config for record keeping @@ -106,29 +109,29 @@ def upload_file(): with open(config_path, 'w', encoding='utf-8') as f: yaml.safe_dump(active_config, f, allow_unicode=True) active_config_path = config_path - logger.info(f"Job {job_id}: Default config saved for record - Path: {config_path}") + logger.info(f"Job {task_id}: Default config saved for record - Path: {config_path}") # Initialize task status - update_task_status(job_id, TASK_STATUS_PENDING, "Task queued for processing") - logger.info(f"Job {job_id}: Task status initialized as PENDING") + update_task_status(task_id, TASK_STATUS_PENDING, "任务已加入处理队列", output_dir=str(job_output_dir)) + logger.info(f"Job {task_id}: Task status initialized as PENDING") # Start background processing try: thread = threading.Thread( target=process_data_async, - args=(job_id, data_path, active_config_path, job_output_dir) + args=(task_id, data_path, active_config_path, job_output_dir) ) thread.daemon = True thread.start() - logger.info(f"Job {job_id}: Background processing thread started successfully") + logger.info(f"Job {task_id}: Background processing thread started successfully") except Exception as e: - logger.error(f"Job {job_id}: Failed to start background processing thread: {str(e)}") - update_task_status(job_id, TASK_STATUS_FAILED, error=str(e)) + logger.error(f"Job {task_id}: Failed to start background processing thread: {str(e)}") + update_task_status(task_id, TASK_STATUS_FAILED, error=str(e)) return _format_response(500, "启动处理失败") - logger.info(f"Job {job_id}: Upload process completed successfully, returning job ID to client") + logger.info(f"Job {task_id}: Upload process completed successfully, returning job ID to client") return _format_response(202, "任务已接受并加入处理队列", { "status": "accepted", - "job_id": job_id, - "task_status_url": f"/task/{job_id}" + "task_id": task_id, + "task_status_url": f"/task/{task_id}" }) \ No newline at end of file diff --git a/src/gasflux/blueprints/web.py b/src/gasflux/blueprints/web.py index 18dcc6d..f792b16 100644 --- a/src/gasflux/blueprints/web.py +++ b/src/gasflux/blueprints/web.py @@ -78,6 +78,11 @@ def index():

新建处理任务

+
+
+
+ 首次使用请先运行 python create_api_key.py 创建密钥
+
@@ -104,10 +109,10 @@ def index(): {% for report in reports %}
- {{ report.split('/')[-1] }}
+ {{ report.split('/')[-1] }} 任务 ID: {{ report.split('/')[0] }}
- 下载
+ 下载
{% else %}

暂无已生成的报告。

@@ -117,12 +122,14 @@ def index():

API 调用指南 (开发者)

+

⚠️ 重要提醒: 所有 API 接口都需要 API Key 认证。请先运行 python create_api_key.py 创建密钥。

+

认证方式: 在请求头中添加 X-API-Key: <your_key>

健康检查: GET /health

上传分析: POST /upload

查询任务状态: GET /task/<task_id>

参数: file (Excel), config (YAML, 可选)

-

示例: curl -X POST -F "file=@data.xlsx" http://localhost:5000/upload

-

状态查询: curl http://localhost:5000/task/your-task-id

+

示例: curl -X POST -H "X-API-Key: your_key" -F "file=@data.xlsx" http://localhost:5000/upload

+

状态查询: curl -H "X-API-Key: your_key" http://localhost:5000/task/your-task-id

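The API guide above only shows curl one-liners for the key-authenticated endpoints added in this patch. For reference, a minimal Python client for the same upload-and-poll flow might look like the sketch below. It assumes the third-party `requests` package, the default `http://localhost:5000` base URL, and that the JSON envelope produced by `_format_response` exposes the payload under a `data` key; none of these are pinned down by the patch itself, so adjust the field access to match your deployment.

```python
# Hypothetical client sketch for the X-API-Key flow described in the web guide above.
# Assumptions (not part of this patch): `requests` is installed, the server listens on
# localhost:5000, and the `_format_response` JSON envelope nests the payload under "data".
import time

import requests

BASE_URL = "http://localhost:5000"
HEADERS = {"X-API-Key": "your_key"}  # key created via create_api_key.py


def upload_and_wait(data_path: str, poll_seconds: float = 5.0) -> dict:
    """Upload an Excel file to /upload and poll /task/<task_id> until it finishes."""
    with open(data_path, "rb") as fh:
        resp = requests.post(f"{BASE_URL}/upload", headers=HEADERS, files={"file": fh})
    resp.raise_for_status()
    body = resp.json()
    # Envelope layout is assumed; fall back to a flat body if "data" is absent.
    payload = body.get("data", body)
    task_id = payload["task_id"]

    # Poll the task endpoint until it reports a terminal status.
    while True:
        status_resp = requests.get(f"{BASE_URL}/task/{task_id}", headers=HEADERS)
        status_resp.raise_for_status()
        status_body = status_resp.json()
        status_payload = status_body.get("data", status_body)
        if status_payload.get("status") in ("completed", "failed"):
            return status_payload
        time.sleep(poll_seconds)


if __name__ == "__main__":
    print(upload_and_wait("data.xlsx"))
```

Polling `/task/<task_id>` mirrors the `task_status_url` returned by the upload endpoint; an optional YAML configuration can be attached as a second `config` form field, as noted in the parameter list above.
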
diff --git a/src/gasflux/cli.py b/src/gasflux/cli.py index f82fc7c..bf4f1e0 100644 --- a/src/gasflux/cli.py +++ b/src/gasflux/cli.py @@ -27,7 +27,8 @@ def process_command(data_path: str, config_path: str, test: bool): if test: data_file = Path(__file__).parent / "testdata" / "testdata.csv" config_file = Path(__file__).parent / "testdata" / "testconfig.yaml" - process_main(data_file, config_file) + output_dir = Path(__file__).parent / "testdata" / "output" + process_main(data_file, config_file, output_dir) return dpath_obj = Path(data_path) @@ -65,7 +66,9 @@ def process_command(data_path: str, config_path: str, test: bool): config_file = Path(config_path) for data_file in data_files: - process_main(data_file, config_file) + # 为每个数据文件创建对应的输出目录 + output_dir = data_file.parent / f"{data_file.stem}_output" + process_main(data_file, config_file, output_dir) def generate_config_command(config_destination: str, recursive: bool = False, template_path: str | None = None): diff --git a/src/gasflux/config_reader.py b/src/gasflux/config_reader.py new file mode 100644 index 0000000..086352a --- /dev/null +++ b/src/gasflux/config_reader.py @@ -0,0 +1,236 @@ +""" +Configuration Reader Module +Reads configuration from INI files with fallback to environment variables. +""" + +import os +import configparser +from pathlib import Path +from typing import Dict, Any, Optional + + +class ConfigReader: + """Reads configuration from INI file with environment variable fallbacks.""" + + def __init__(self, config_file: Optional[str] = None): + self.config = configparser.ConfigParser() + + # Set default values + self._set_defaults() + + # Try to read config file + if config_file: + self.load_config_file(config_file) + else: + # Try default config files + self._try_load_default_configs() + + def _set_defaults(self): + """Set default configuration values.""" + # Server defaults + self.config.add_section('server') + self.config.set('server', 'host', os.getenv('GASFLUX_HOST', '0.0.0.0')) + self.config.set('server', 'port', os.getenv('GASFLUX_PORT', '5000')) + self.config.set('server', 'debug', os.getenv('GASFLUX_DEBUG', 'false')) + self.config.set('server', 'base_url', os.getenv('GASFLUX_BASE_URL', 'http://localhost:5000')) + + # Paths defaults + self.config.add_section('paths') + self.config.set('paths', 'uploads', os.getenv('GASFLUX_UPLOAD_FOLDER', './web_api_data/uploads')) + self.config.set('paths', 'outputs', os.getenv('GASFLUX_OUTPUT_FOLDER', './web_api_data/outputs')) + + # Limits defaults + self.config.add_section('limits') + self.config.set('limits', 'max_content_length', os.getenv('GASFLUX_MAX_CONTENT_LENGTH', '104857600')) # 100MB + + # Logging defaults + self.config.add_section('logging') + self.config.set('logging', 'level', os.getenv('GASFLUX_LOG_LEVEL', 'INFO')) + self.config.set('logging', 'file', os.getenv('GASFLUX_LOG_FILE', 'logs/gasflux_api.log')) + + # Security defaults + self.config.add_section('security') + self.config.set('security', 'admin_bootstrap_key', os.getenv('ADMIN_BOOTSTRAP_KEY', '')) + + # Cleanup defaults + self.config.add_section('cleanup') + self.config.set('cleanup', 'task_cleanup_interval', os.getenv('GASFLUX_TASK_CLEANUP_INTERVAL', '3600')) + self.config.set('cleanup', 'max_task_age', os.getenv('GASFLUX_MAX_TASK_AGE', '86400')) + self.config.set('cleanup', 'janitor_dry_run', os.getenv('JANITOR_DRY_RUN', 'false')) + + # Performance defaults + self.config.add_section('performance') + self.config.set('performance', 'threads', os.getenv('GASFLUX_THREADS', '8')) + 
self.config.set('performance', 'connection_limit', os.getenv('GASFLUX_CONNECTION_LIMIT', '100')) + self.config.set('performance', 'channel_timeout', os.getenv('GASFLUX_CHANNEL_TIMEOUT', '300')) + + # Database defaults + self.config.add_section('database') + self.config.set('database', 'path', os.getenv('DB_PATH', '')) + + # API Keys defaults + self.config.add_section('api_keys') + self.config.set('api_keys', 'persist_backend', os.getenv('TASK_PERSIST_BACKEND', 'sqlite')) + + def load_config_file(self, config_file: str): + """Load configuration from INI file.""" + config_path = Path(config_file) + if config_path.exists(): + try: + with open(config_path, 'r', encoding='utf-8') as f: + self.config.read_file(f) + print(f"Configuration loaded from: {config_path.absolute()}") + except Exception as e: + print(f"Warning: Failed to load config file {config_path}: {e}") + else: + print(f"Warning: Config file not found: {config_path}") + + def _try_load_default_configs(self): + """Try to load configuration from default locations.""" + # Try gasflux.ini in current directory + current_dir = Path.cwd() + config_files = [ + current_dir / 'gasflux.ini', + current_dir / 'config.ini', + Path(__file__).parent.parent.parent / 'gasflux.ini', # Project root + ] + + for config_file in config_files: + if config_file.exists(): + self.load_config_file(str(config_file)) + break + + def get(self, section: str, key: str, fallback: str = '') -> str: + """Get configuration value.""" + try: + return self.config.get(section, key) + except (configparser.NoSectionError, configparser.NoOptionError): + return fallback + + def getint(self, section: str, key: str, fallback: int = 0) -> int: + """Get configuration value as integer.""" + try: + return self.config.getint(section, key) + except (configparser.NoSectionError, configparser.NoOptionError, ValueError): + try: + return int(fallback) + except ValueError: + return 0 + + def getboolean(self, section: str, key: str, fallback: bool = False) -> bool: + """Get configuration value as boolean.""" + try: + return self.config.getboolean(section, key) + except (configparser.NoSectionError, configparser.NoOptionError, ValueError): + return fallback + + def get_path(self, section: str, key: str, fallback: str = '', base_dir: Optional[Path] = None) -> Path: + """Get configuration value as Path, resolving relative to base_dir if provided.""" + path_str = self.get(section, key, fallback) + path = Path(path_str) + + if not path.is_absolute() and base_dir: + path = base_dir / path + + return path.expanduser().resolve() + + def to_dict(self) -> Dict[str, Dict[str, Any]]: + """Convert configuration to dictionary.""" + result = {} + for section in self.config.sections(): + result[section] = {} + for key, value in self.config.items(section): + result[section][key] = value + return result + + # Convenience methods for common config values + @property + def host(self) -> str: + return self.get('server', 'host', '0.0.0.0') + + @property + def port(self) -> int: + return self.getint('server', 'port', 5000) + + @property + def debug(self) -> bool: + return self.getboolean('server', 'debug', False) + + @property + def base_url(self) -> str: + return self.get('server', 'base_url', 'http://localhost:5000') + + @property + def uploads_path(self) -> Path: + return self.get_path('paths', 'uploads', './web_api_data/uploads') + + @property + def outputs_path(self) -> Path: + return self.get_path('paths', 'outputs', './web_api_data/outputs') + + @property + def max_content_length(self) -> int: + return 
self.getint('limits', 'max_content_length', 104857600) + + @property + def log_level(self) -> str: + return self.get('logging', 'level', 'INFO') + + @property + def log_file(self) -> str: + return self.get('logging', 'file', 'logs/gasflux_api.log') + + @property + def admin_bootstrap_key(self) -> str: + return self.get('security', 'admin_bootstrap_key', '') + + @property + def task_cleanup_interval(self) -> int: + return self.getint('cleanup', 'task_cleanup_interval', 3600) + + @property + def max_task_age(self) -> int: + return self.getint('cleanup', 'max_task_age', 86400) + + @property + def janitor_dry_run(self) -> bool: + return self.getboolean('cleanup', 'janitor_dry_run', False) + + @property + def threads(self) -> int: + return self.getint('performance', 'threads', 8) + + @property + def connection_limit(self) -> int: + return self.getint('performance', 'connection_limit', 100) + + @property + def channel_timeout(self) -> int: + return self.getint('performance', 'channel_timeout', 300) + + @property + def db_path(self) -> str: + return self.get('database', 'path', '') + + @property + def persist_backend(self) -> str: + return self.get('api_keys', 'persist_backend', 'sqlite') + + def _get_config_file_path(self) -> Optional[str]: + """Get the path of the loaded config file.""" + # Try to find which config file was actually loaded + current_dir = Path.cwd() + config_files = [ + current_dir / 'gasflux.ini', + current_dir / 'config.ini', + Path(__file__).parent.parent.parent / 'gasflux.ini', + ] + + for config_file in config_files: + if config_file.exists(): + return str(config_file) + return None + + +# Global config instance +config_reader = ConfigReader() \ No newline at end of file diff --git a/src/gasflux/data_processor.py b/src/gasflux/data_processor.py index 528b52f..4b46387 100644 --- a/src/gasflux/data_processor.py +++ b/src/gasflux/data_processor.py @@ -251,7 +251,7 @@ def convert_coordinates(df): return df -def calculate_pressure(df, max_samples=None, height_tolerance=10.0, height_bin_size=2.0): +def calculate_pressure(df, max_samples=None, height_tolerance=10.0, height_bin_size=10.0): """ 计算气压数据(高度分档优化版) @@ -546,23 +546,25 @@ def calculate_pressure(df, max_samples=None, height_tolerance=10.0, height_bin_s def adjust_altitude(df): """ - 调整融合高程(减去最小值) + 创建调整后的高度字段(减去最小值) Args: df: 输入DataFrame Returns: - pd.DataFrame: 调整后的DataFrame + pd.DataFrame: 添加调整后高度字段的DataFrame """ - print("调整融合高程...") + print("创建调整后的高度字段...") if 'fAltitudeFused' in df.columns: min_altitude = df['fAltitudeFused'].min() print(".2f") + # 创建新的调整后高度字段,而不是修改原始字段 + df['height_ato'] = df['fAltitudeFused'] - min_altitude + original_alt = df['fAltitudeFused'].head(3).tolist() - df['fAltitudeFused'] = df['fAltitudeFused'] - min_altitude - adjusted_alt = df['fAltitudeFused'].head(3).tolist() + adjusted_alt = df['height_ato'].head(3).tolist() print("高度调整示例:") for orig, adj in zip(original_alt, adjusted_alt): @@ -644,7 +646,7 @@ def rename_columns(df): 'timestamp': 'timestamp', # 时间戳(已创建) 'stGPSPositionX': 'longitude', # 经度 → longitude 'stGPSPositionY': 'latitude', # 纬度 → latitude - 'fAltitudeFused': 'height_ato', # 融合高程 → height_ato + # 'fAltitudeFused': 'height_ato', # 高度字段已在adjust_altitude中处理 'fFixedWindDirection': 'winddir', # 修正风向 → winddir 'fFixedWindSpeed': 'windspeed', # 修正风速 → windspeed 'fWindTemperature': 'temperature', # 风温 → temperature diff --git a/src/gasflux/db.py b/src/gasflux/db.py new file mode 100644 index 0000000..ccf581a --- /dev/null +++ b/src/gasflux/db.py @@ -0,0 +1,156 @@ +""" +Database Module 
+Handles SQLite database initialization and connection management. +""" + +import sqlite3 +import os +from flask import g, current_app +from pathlib import Path + +# Set timezone to Beijing (UTC+8) for SQLite +os.environ['TZ'] = 'Asia/Shanghai' +try: + import time + time.tzset() # Update timezone if possible +except AttributeError: + # time.tzset() not available on Windows + # Try alternative approach for Windows + try: + # On Windows, we can try to set the timezone via different methods + import locale + locale.setlocale(locale.LC_TIME, 'Chinese_China.936') + except: + pass + + +def init_db(app): + """Initialize database with tables and pragmas.""" + db_path = get_db_path(app) + + # Ensure directory exists + db_path.parent.mkdir(parents=True, exist_ok=True) + + # Create connection for initialization + conn = sqlite3.connect(str(db_path), check_same_thread=False) + conn.row_factory = sqlite3.Row + + # Set SQLite pragmas for performance and reliability + conn.execute("PRAGMA journal_mode=WAL") # Write-Ahead Logging for concurrent reads/writes + conn.execute("PRAGMA synchronous=NORMAL") # Balance between performance and safety + conn.execute("PRAGMA foreign_keys=ON") # Enable foreign key constraints + conn.execute("PRAGMA busy_timeout=3000") # Wait up to 3 seconds if DB is locked + + # Create tables + conn.executescript(""" + -- Tasks table for storing task metadata and lifecycle + CREATE TABLE IF NOT EXISTS tasks ( + task_id TEXT PRIMARY KEY, + status TEXT NOT NULL DEFAULT 'pending', + message TEXT, + error TEXT, + created_at TIMESTAMP DEFAULT (datetime('now', '+8 hours')), + started_at TIMESTAMP, + finished_at TIMESTAMP, + downloaded_at TIMESTAMP, + delete_after_at TIMESTAMP, + deleted_at TIMESTAMP, + output_dir TEXT, + CHECK (status IN ('pending', 'processing', 'completed', 'failed')) + ); + + -- Note: results column will be added dynamically below + + -- API Keys table for authentication + CREATE TABLE IF NOT EXISTS api_keys ( + key_id TEXT PRIMARY KEY, + key_hash TEXT NOT NULL, + salt TEXT NOT NULL, + revoked INTEGER DEFAULT 0, + scopes TEXT DEFAULT '*', + created_at TIMESTAMP DEFAULT (datetime('now', '+8 hours')), + last_used_at TIMESTAMP, + description TEXT + ); + + -- Optional: Download tokens for one-time downloads + CREATE TABLE IF NOT EXISTS download_tokens ( + token TEXT PRIMARY KEY, + task_id TEXT NOT NULL, + expires_at TIMESTAMP NOT NULL, + created_at TIMESTAMP DEFAULT (datetime('now', '+8 hours')), + FOREIGN KEY(task_id) REFERENCES tasks(task_id) ON DELETE CASCADE + ); + + -- Index for efficient cleanup queries + CREATE INDEX IF NOT EXISTS idx_tasks_delete_after + ON tasks(delete_after_at) + WHERE delete_after_at IS NOT NULL AND deleted_at IS NULL; + + -- Index for status filtering + CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status); + + -- Index for download token expiration + CREATE INDEX IF NOT EXISTS idx_download_tokens_expires + ON download_tokens(expires_at); + """) + + # Check if results column exists and add it if not (for database migration) + try: + # Check if results column exists + cursor = conn.execute("PRAGMA table_info(tasks)") + columns = [row[1] for row in cursor.fetchall()] + + if 'results' not in columns: + conn.execute("ALTER TABLE tasks ADD COLUMN results TEXT") + app.logger.info("Added results column to tasks table") + except Exception as e: + app.logger.warning(f"Could not add results column (might already exist): {e}") + + conn.commit() + conn.close() + + app.logger.info(f"Database initialized at: {db_path}") + + +def get_db_path(app): + 
"""Get database file path from app config or default location.""" + db_path_config = app.config.get('DB_PATH') + if db_path_config: + return Path(db_path_config).expanduser() + + # Default to output folder if available + output_folder = app.config.get('OUTPUT_FOLDER') + if output_folder: + return Path(output_folder) / "gasflux.db" + + # Fallback to instance folder + instance_path = Path(app.instance_path) if hasattr(app, 'instance_path') else Path('.') + return instance_path / "gasflux.db" + + +def get_db(): + """Get database connection for current request.""" + if 'db' not in g: + db_path = get_db_path(current_app) + g.db = sqlite3.connect(str(db_path), check_same_thread=False) + g.db.row_factory = sqlite3.Row + + # Set pragmas on each connection (safe to do multiple times) + g.db.execute("PRAGMA foreign_keys=ON") + g.db.execute("PRAGMA busy_timeout=3000") + + return g.db + + +def close_db(e=None): + """Close database connection at end of request.""" + db = g.pop('db', None) + if db is not None: + db.close() + + +def init_app(app): + """Register database functions with Flask app.""" + init_db(app) + app.teardown_appcontext(close_db) \ No newline at end of file diff --git a/src/gasflux/janitor.py b/src/gasflux/janitor.py new file mode 100644 index 0000000..972150b --- /dev/null +++ b/src/gasflux/janitor.py @@ -0,0 +1,209 @@ +""" +Janitor Module +Handles background cleanup of expired tasks and their output directories. +""" + +import threading +import time +import shutil +from pathlib import Path +import sqlite3 +from flask import current_app + + +def start_janitor(app): + """Start the background janitor thread for cleaning up expired tasks.""" + def worker(): + with app.app_context(): + while True: + try: + cleanup_expired_tasks() + except Exception as e: + app.logger.error(f"Janitor cleanup error: {str(e)}", exc_info=True) + finally: + # Sleep for 30 seconds before next cleanup cycle + time.sleep(30) + + # Create daemon thread so it doesn't prevent app shutdown + thread = threading.Thread(target=worker, daemon=True, name="janitor") + thread.start() + app.logger.info("Janitor thread started for background cleanup") + + +def cleanup_expired_tasks(): + """Clean up tasks that have exceeded their deletion time.""" + db_path = _get_db_path() + dry_run_config = current_app.config.get('JANITOR_DRY_RUN', 'false') + if isinstance(dry_run_config, str): + dry_run = dry_run_config.lower() == 'true' + else: + dry_run = bool(dry_run_config) + + try: + conn = sqlite3.connect(str(db_path), check_same_thread=False) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA foreign_keys=ON") + + # Log current time for debugging (using same timezone as setting) + current_time = conn.execute("SELECT datetime('now', '+8 hours')").fetchone()[0] + current_app.logger.info(f"Janitor cleanup check at: {current_time}") + + # Debug: Check for tasks with delete_after_at set + debug_rows = conn.execute(""" + SELECT task_id, delete_after_at, downloaded_at + FROM tasks + WHERE delete_after_at IS NOT NULL + AND deleted_at IS NULL + """).fetchall() + if debug_rows: + current_app.logger.info(f"Found {len(debug_rows)} tasks with delete_after_at set:") + for row in debug_rows: + current_app.logger.info(f" Task {row['task_id']}: delete_at={row['delete_after_at']}, downloaded_at={row['downloaded_at']}") + + # Find tasks that need to be deleted + rows = conn.execute(""" + SELECT task_id, output_dir + FROM tasks + WHERE delete_after_at IS NOT NULL + AND deleted_at IS NULL + AND delete_after_at <= datetime('now', '+8 hours') + 
""").fetchall() + + if not rows: + return # No tasks to clean up + + current_app.logger.info(f"Janitor found {len(rows)} expired tasks to clean up") + + upload_base = Path(current_app.config.get('UPLOAD_FOLDER') or '') + output_base = Path(current_app.config.get('OUTPUT_FOLDER') or '') + + for row in rows: + task_id = row['task_id'] + output_dir = row['output_dir'] + + try: + delete_targets = [] + + # 记录在库中的 output_dir + if output_dir: + p = Path(output_dir) + delete_targets.append(p) + + # 兜底:按约定 outputs/ + derived_output_dir = output_base / task_id if output_base else None + if derived_output_dir and derived_output_dir not in delete_targets: + delete_targets.append(derived_output_dir) + + # 同时删除 uploads/ + derived_upload_dir = upload_base / task_id if upload_base else None + if derived_upload_dir: + delete_targets.append(derived_upload_dir) + + if dry_run: + for tgt in delete_targets: + if tgt: + current_app.logger.info(f"[DRY RUN] Would delete task {task_id} path: {tgt}") + else: + # 实际删除 + for tgt in delete_targets: + try: + if tgt and tgt.exists(): + shutil.rmtree(tgt, ignore_errors=True) + current_app.logger.info(f"Deleted path for task {task_id}: {tgt}") + else: + current_app.logger.warning(f"Path not found for task {task_id}: {tgt}") + except Exception as e: + current_app.logger.error(f"Failed to delete path {tgt} for task {task_id}: {e}") + + # Hard delete from database + conn.execute( + "DELETE FROM tasks WHERE task_id = ?", + (task_id,) + ) + conn.commit() + current_app.logger.info(f"Hard deleted task {task_id} from database") + + except Exception as e: + current_app.logger.error(f"Failed to delete task {task_id}: {str(e)}", exc_info=True) + # Continue with other tasks even if one fails + + except Exception as e: + current_app.logger.error(f"Database error during cleanup: {str(e)}", exc_info=True) + finally: + if 'conn' in locals(): + conn.close() + + +def reconcile_tasks_on_startup(): + """Reconcile task states on application startup.""" + db_path = _get_db_path() + + try: + conn = sqlite3.connect(str(db_path), check_same_thread=False) + conn.execute("PRAGMA foreign_keys=ON") + + # Fix tasks that were downloaded but don't have delete_after_at set + # This handles cases where the app crashed after marking downloaded but before setting delete time + conn.execute(""" + UPDATE tasks + SET delete_after_at = COALESCE(delete_after_at, datetime(downloaded_at, '+10 minutes')) + WHERE downloaded_at IS NOT NULL + AND deleted_at IS NULL + AND delete_after_at IS NULL + """) + updated_count = conn.total_changes + + if updated_count > 0: + current_app.logger.info(f"Startup reconciliation: Fixed delete_after_at for {updated_count} downloaded tasks") + + # Check for tasks with output directories that no longer exist + # This helps clean up database entries for manually deleted directories + rows = conn.execute(""" + SELECT task_id, output_dir + FROM tasks + WHERE output_dir IS NOT NULL + AND deleted_at IS NULL + """).fetchall() + + orphaned_count = 0 + for row in rows: + task_id_from_db = row[0] # task_id是第一个字段 + output_dir_from_db = row[1] # output_dir是第二个字段 + + if not Path(output_dir_from_db).exists(): + conn.execute( + "DELETE FROM tasks WHERE task_id = ?", + (task_id_from_db,) + ) + orphaned_count += 1 + + if orphaned_count > 0: + current_app.logger.info(f"Startup reconciliation: Marked {orphaned_count} tasks as deleted (directories not found)") + + # Backfill output_dir for rows with NULL, using OUTPUT_FOLDER/ + output_base = Path(current_app.config.get('OUTPUT_FOLDER') or '') + if 
output_base: + rows2 = conn.execute(""" + SELECT task_id FROM tasks + WHERE (output_dir IS NULL OR output_dir = '') + AND deleted_at IS NULL + """).fetchall() + for r in rows2: + task_id_from_db = r[0] # task_id是第一个字段 + guess = output_base / task_id_from_db + conn.execute("UPDATE tasks SET output_dir = ? WHERE task_id = ?", (str(guess), task_id_from_db)) + current_app.logger.info(f"Backfilled output_dir for task {task_id_from_db}: {guess}") + + conn.commit() + + except Exception as e: + current_app.logger.error(f"Startup reconciliation error: {str(e)}", exc_info=True) + finally: + if 'conn' in locals(): + conn.close() + + +def _get_db_path(): + """Get database file path from app config.""" + from .db import get_db_path as get_config_db_path + return get_config_db_path(current_app) \ No newline at end of file diff --git a/src/gasflux/processing_pipelines.py b/src/gasflux/processing_pipelines.py index 1c2795b..45baf83 100644 --- a/src/gasflux/processing_pipelines.py +++ b/src/gasflux/processing_pipelines.py @@ -309,7 +309,7 @@ class DataProcessor: ].std() -def process_main(data_file: Path, config_file: Path, task_id: str | None = None) -> None: +def process_main(data_file: Path, config_file: Path, output_dir: Path, task_id: str | None = None) -> DataProcessor: """Main function to run the pipeline.""" config = load_config(config_file) # 优先使用 task_id,否则退回文件 stem @@ -319,5 +319,7 @@ def process_main(data_file: Path, config_file: Path, task_id: str | None = None) processor = DataProcessor(config, df) processor.strategy_selection() processor.process() - reporting.generate_reports(name, processor, config) + reporting.generate_reports(name, processor, config, output_dir) logger.info("Processing complete") + + return processor # 返回processor对象 diff --git a/src/gasflux/reporting.py b/src/gasflux/reporting.py index 15cea2c..59c36f6 100644 --- a/src/gasflux/reporting.py +++ b/src/gasflux/reporting.py @@ -5,7 +5,7 @@ from pathlib import Path import plotly.graph_objects as go from jinja2 import Template from plotly.io import to_html -from datetime import datetime +from datetime import datetime, timedelta import yaml @@ -61,7 +61,7 @@ def mass_balance_report( ) -def generate_reports(name: str, processor, config: dict): +def generate_reports(name: str, processor, config: dict, output_dir: Path): """ Generates reports, configuration files, and processed output variables for gasflux processing runs. @@ -69,11 +69,11 @@ def generate_reports(name: str, processor, config: dict): name (str): The name identifier for the current processing run (task_id). processor (object): The processing object containing report data and output variables. config (dict): Configuration dictionary used for processing. + output_dir (Path): Output directory path (already includes task_id) from INI configuration. 
""" - output_dir = Path(config["output_dir"]).expanduser() - processing_time = datetime.now() - # Save directly to outputs/{task_id} directory - output_path = output_dir / "outputs" / name + processing_time = datetime.now() + timedelta(hours=8) # Beijing time (UTC+8) + # Save directly to the output directory (already includes task_id) + output_path = output_dir output_path.mkdir(parents=True, exist_ok=True) # Save reports diff --git a/src/gasflux/run_example.py b/src/gasflux/run_example.py index 8c41de4..af0d8f0 100644 --- a/src/gasflux/run_example.py +++ b/src/gasflux/run_example.py @@ -119,8 +119,16 @@ def main(): print(f"使用配置文件: {config_file}") print() + # 确定输出目录 + try: + from .config_reader import config_reader + output_dir = config_reader.outputs_path / processed_csv.stem + except ImportError: + # 如果无法导入配置,使用默认路径 + output_dir = Path("./output") / processed_csv.stem + # 运行GasFlux处理流程 - process_main(processed_csv, config_file) + process_main(processed_csv, config_file, output_dir) print("\n✅ GasFlux通量分析完成!") # 显示输出信息 diff --git a/src/gasflux/shared.py b/src/gasflux/shared.py index 8073d65..f8b9386 100644 --- a/src/gasflux/shared.py +++ b/src/gasflux/shared.py @@ -16,18 +16,9 @@ TASK_STATUS_PROCESSING = "processing" TASK_STATUS_COMPLETED = "completed" TASK_STATUS_FAILED = "failed" -# Global task status storage -task_status = {} +# Task status is now stored in database only (removed global memory storage) -# Task status persistence file override (set by app on startup) -_TASK_STATUS_FILE_PATH: Path | None = None -_TASK_STATUS_FILE_LOCK = threading.Lock() - - -def set_task_status_file_path(path: str | Path): - """Set the task status persistence file path (used across threads without Flask app context).""" - global _TASK_STATUS_FILE_PATH - _TASK_STATUS_FILE_PATH = Path(path) +# File-based persistence removed - using database as single source of truth def _build_simple_downloads_from_results(results: list[dict]) -> dict: @@ -221,8 +212,7 @@ class StatisticsCollector: 'requests_total': self.stats['requests']['total'], 'requests_per_second': round(requests_per_second, 2), 'error_rate_percent': round(error_rate, 2), - 'active_tasks': len([t for t in task_status.values() - if t.get('status') in [TASK_STATUS_PENDING, TASK_STATUS_PROCESSING]]) + 'active_tasks': self._get_active_tasks_count() }, 'requests': { 'by_method': self.stats['requests']['by_method'], @@ -262,6 +252,20 @@ class StatisticsCollector: return " ".join(parts) + def _get_active_tasks_count(self): + """Get count of active tasks from database.""" + try: + from .db import get_db + db = get_db() + row = db.execute(""" + SELECT COUNT(*) as count FROM tasks + WHERE deleted_at IS NULL AND status IN (?, ?) 
+ """, (TASK_STATUS_PENDING, TASK_STATUS_PROCESSING)).fetchone() + return row[0] if row else 0 + except Exception as e: + logger.warning(f"Failed to get active tasks count from database: {str(e)}") + return 0 + # Create global statistics collector instance stats_collector = StatisticsCollector() @@ -305,19 +309,18 @@ def log_response_info(response): logger.info(f"RESPONSE: {request.method} {endpoint} - Status: {response.status_code} - Duration: {duration:.3f}s" if duration else f"RESPONSE: {request.method} {endpoint} - Status: {response.status_code}") -def update_task_status(task_id, status, message=None, results=None, error=None): - """Update task status in the global dictionary.""" - timestamp = time.time() - old_status = task_status.get(task_id, {}).get("status", "unknown") +def update_task_status(task_id, status, message=None, results=None, error=None, output_dir=None): + """Update task status in database only (single source of truth).""" + from flask import current_app - task_status[task_id] = { - "status": status, - "message": message, - "results": results, - "error": error, - "updated_at": timestamp, - "created_at": task_status.get(task_id, {}).get("created_at", timestamp) - } + # Get old status from database for statistics + old_status = "unknown" + try: + current_status = get_task_status(task_id) + if current_status and current_status.get("status") != "not_found": + old_status = current_status.get("status", "unknown") + except Exception: + pass # Ignore errors when getting old status # Record status change in statistics stats_collector.record_task_status_change(old_status, status) @@ -334,43 +337,151 @@ def update_task_status(task_id, status, message=None, results=None, error=None): log_level = logging.ERROR if status == TASK_STATUS_FAILED else logging.INFO logger.log(log_level, log_msg) - # Save task status to file for persistence - logger.debug(f"Saving task {task_id} status '{status}' to persistent storage") - save_task_status_to_file() + # Check persistence backend configuration + persist_backend = current_app.config.get('TASK_PERSIST_BACKEND', 'sqlite').lower() + + # Save to database if configured + if persist_backend in ['sqlite', 'both']: + try: + _update_task_status_db(task_id, status, message, results, error, output_dir) + except Exception as e: + logger.error(f"Failed to save task {task_id} to database: {str(e)}", exc_info=True) + + # File-based persistence removed - database is now the single source of truth + + +def _update_task_status_db(task_id, status, message=None, results=None, error=None, output_dir=None): + """Update task status in database.""" + from .db import get_db + import json + + db = get_db() + + # Serialize results to JSON if provided + results_json = json.dumps(results) if results is not None else None + + # Get existing output_dir from database, or use passed parameter + existing = db.execute("SELECT output_dir FROM tasks WHERE task_id = ?", (task_id,)).fetchone() + output_dir_db = existing[0] if existing and existing[0] else output_dir + + # Check if task exists + existing = db.execute("SELECT task_id FROM tasks WHERE task_id = ?", (task_id,)).fetchone() + + if existing: + # Update existing task + db.execute(""" + UPDATE tasks SET + status = ?, + message = ?, + error = ?, + results = ?, + -- 若库中为空则写入 output_dir;已有值则保留 + output_dir = CASE WHEN (output_dir IS NULL OR output_dir = '') AND ? IS NOT NULL THEN ? ELSE output_dir END, + finished_at = CASE WHEN ? 
IN ('completed', 'failed') THEN datetime('now', '+8 hours') ELSE finished_at END, + started_at = CASE WHEN ? = 'processing' AND started_at IS NULL THEN datetime('now', '+8 hours') ELSE started_at END + WHERE task_id = ? + """, (status, message, error, results_json, output_dir_db, output_dir_db, status, status, task_id)) + else: + # Insert new task + # output_dir_db is already set above from database query + + db.execute(""" + INSERT INTO tasks (task_id, status, message, error, results, started_at, finished_at, output_dir) + VALUES (?, ?, ?, ?, ?, CASE WHEN ? = 'processing' THEN datetime('now', '+8 hours') ELSE NULL END, + CASE WHEN ? IN ('completed', 'failed') THEN datetime('now', '+8 hours') ELSE NULL END, ?) + """, (task_id, status, message, error, results_json, status, status, output_dir_db)) + + db.commit() def get_task_status(task_id): - """Get task status from global dictionary.""" - if task_id in task_status: - return task_status[task_id] + """Get task status from database (single source of truth).""" + from flask import current_app + + # Check persistence backend configuration + persist_backend = current_app.config.get('TASK_PERSIST_BACKEND', 'sqlite').lower() + + # Only use database as single source of truth + if persist_backend in ['sqlite', 'both']: + try: + db_status = _get_task_status_from_db(task_id) + return db_status + except Exception as e: + logger.error(f"Failed to get task {task_id} from database: {str(e)}", exc_info=True) + return {"status": "not_found"} -def cleanup_old_tasks(): - """Clean up old completed tasks to prevent memory leak.""" - current_time = time.time() - max_age = 24 * 3600 # 24 hours - to_remove = [] +def _get_task_status_from_db(task_id): + """Get task status from database.""" + from .db import get_db + import json - for task_id, task_info in task_status.items(): - task_age = current_time - task_info.get("updated_at", 0) - if task_age > max_age: - to_remove.append(task_id) - logger.info(f"Task {task_id} scheduled for cleanup (age: {task_age:.1f}s, status: {task_info.get('status')})") + db = get_db() + row = db.execute(""" + SELECT task_id, status, message, error, created_at, started_at, finished_at, + downloaded_at, delete_after_at, deleted_at, output_dir, results + FROM tasks WHERE task_id = ? 

     return {"status": "not_found"}


-def cleanup_old_tasks():
-    """Clean up old completed tasks to prevent memory leak."""
-    current_time = time.time()
-    max_age = 24 * 3600  # 24 hours
-    to_remove = []
+def _get_task_status_from_db(task_id):
+    """Get task status from the database."""
+    from .db import get_db
+    import json

-    for task_id, task_info in task_status.items():
-        task_age = current_time - task_info.get("updated_at", 0)
-        if task_age > max_age:
-            to_remove.append(task_id)
-            logger.info(f"Task {task_id} scheduled for cleanup (age: {task_age:.1f}s, status: {task_info.get('status')})")
+    db = get_db()
+    row = db.execute("""
+        SELECT task_id, status, message, error, created_at, started_at, finished_at,
+               downloaded_at, delete_after_at, deleted_at, output_dir, results
+        FROM tasks WHERE task_id = ?
+    """, (task_id,)).fetchone()

-    initial_count = len(task_status)
-    for task_id in to_remove:
-        del task_status[task_id]
+    if not row:
+        return {"status": "not_found"}

-    if to_remove:
-        logger.info(f"Cleanup completed: removed {len(to_remove)} tasks, {len(task_status)} tasks remaining")
+    # Convert timestamps to float for compatibility
+    def ts_to_float(ts):
+        if ts:
+            try:
+                import datetime
+                if isinstance(ts, str):
+                    dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
+                else:
+                    dt = ts
+                return dt.timestamp()
+            except (ValueError, TypeError):
+                return time.time()  # fallback
+        return None
+
+    status = {
+        "status": row['status'],
+        "message": row['message'],
+        "error": row['error'],
+        "created_at": ts_to_float(row['created_at']),
+        "updated_at": ts_to_float(row['finished_at'] or row['started_at'] or row['created_at']),
+    }
+
+    # Add database-specific fields
+    if row['output_dir']:
+        status['output_dir'] = row['output_dir']
+    if row['downloaded_at']:
+        status['downloaded_at'] = ts_to_float(row['downloaded_at'])
+    if row['delete_after_at']:
+        status['delete_after_at'] = ts_to_float(row['delete_after_at'])
+    if row['deleted_at']:
+        status['deleted_at'] = ts_to_float(row['deleted_at'])
+
+    # Load results from the database
+    if row['results']:
+        try:
+            status['results'] = json.loads(row['results'])
+        except (json.JSONDecodeError, TypeError):
+            status['results'] = []
     else:
-        logger.debug(f"Cleanup check: no old tasks to remove ({len(task_status)} active tasks)")
+        status['results'] = []
+
+    return status
+
+
+def cleanup_old_tasks():
+    """Clean up old tasks (no longer needed with database-only storage)."""
+    # The database is now the single source of truth, so in-memory
+    # cleanup is no longer required.
+    logger.debug("cleanup_old_tasks: skipped (using database-only storage)")
+    return


 def get_task_list(status_filter=None, page=1, page_size=20, sort_by='updated_at', sort_order='desc', cleanup=True):
     """
-    Get paginated list of tasks with optional filtering and sorting.
+    Get paginated list of tasks with optional filtering and sorting from the database.

     Args:
         status_filter (str or list): Filter by task status. Can be single status or list of statuses.
@@ -391,65 +502,80 @@ def get_task_list(status_filter=None, page=1, page_size=20, sort_by='updated_at'
             'has_prev': has previous page
         }
     """
-    if cleanup:
-        cleanup_old_tasks()  # Clean up old tasks before returning list
+    from .db import get_db

-    # Filter tasks
-    filtered_tasks = []
-    for task_id, task_info in task_status.items():
-        if status_filter:
-            if isinstance(status_filter, str):
-                if task_info.get('status') != status_filter:
-                    continue
-            elif isinstance(status_filter, list):
-                if task_info.get('status') not in status_filter:
-                    continue
-        filtered_tasks.append((task_id, task_info))
+    # Build WHERE clause for filtering
+    where_conditions = ["deleted_at IS NULL"]
+    params = []
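+    # Soft-deleted tasks (rows with deleted_at set) never appear in listings.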
-    # Sort tasks
-    def safe_numeric_sort(value):
-        """Safely convert value to numeric for sorting."""
-        try:
-            if isinstance(value, (int, float)):
-                return value
-            elif isinstance(value, str):
-                # Try to convert string timestamp to float
-                return float(value)
-            else:
-                return 0
-        except (ValueError, TypeError):
-            return 0
+    if status_filter:
+        if isinstance(status_filter, str):
+            where_conditions.append("status = ?")
+            params.append(status_filter)
+        elif isinstance(status_filter, list):
+            placeholders = ",".join("?" * len(status_filter))
+            where_conditions.append(f"status IN ({placeholders})")
+            params.extend(status_filter)

-    reverse_order = sort_order.lower() == 'desc'
-    if sort_by == 'created_at':
-        filtered_tasks.sort(key=lambda x: safe_numeric_sort(x[1].get('created_at', 0)), reverse=reverse_order)
-    elif sort_by == 'updated_at':
-        filtered_tasks.sort(key=lambda x: safe_numeric_sort(x[1].get('updated_at', 0)), reverse=reverse_order)
-    elif sort_by == 'status':
-        filtered_tasks.sort(key=lambda x: x[1].get('status', ''), reverse=reverse_order)
-    else:
-        # Default sort by updated_at desc
-        filtered_tasks.sort(key=lambda x: safe_numeric_sort(x[1].get('updated_at', 0)), reverse=True)
+    where_clause = " AND ".join(where_conditions)

-    # Paginate
-    total_tasks = len(filtered_tasks)
+    # Build ORDER BY clause from a whitelist so only known column expressions reach the SQL
+    sort_column = {
+        'created_at': 'created_at',
+        'updated_at': 'COALESCE(finished_at, started_at, created_at)',
+        'status': 'status'
+    }.get(sort_by, 'COALESCE(finished_at, started_at, created_at)')
+
+    order_dir = 'DESC' if str(sort_order).lower() == 'desc' else 'ASC'
+    order_clause = f"{sort_column} {order_dir}"
+
+    db = get_db()
+
+    # Get total count
+    count_row = db.execute(f"SELECT COUNT(*) FROM tasks WHERE {where_clause}", params).fetchone()
+    total_tasks = count_row[0] if count_row else 0
+
+    # Calculate pagination
     total_pages = (total_tasks + page_size - 1) // page_size
-    start_idx = (page - 1) * page_size
-    end_idx = start_idx + page_size
+    offset = (page - 1) * page_size

-    paginated_tasks = filtered_tasks[start_idx:end_idx]
+    # Get paginated results
+    rows = db.execute(f"""
+        SELECT task_id, status, message, error, created_at, started_at, finished_at, output_dir, results
+        FROM tasks
+        WHERE {where_clause}
+        ORDER BY {order_clause}
+        LIMIT ? OFFSET ?
+    """, params + [page_size, offset]).fetchall()

     # Format task summaries
     task_summaries = []
-    for task_id, task_info in paginated_tasks:
+    import json
+    for row in rows:
+        # Convert timestamps
+        created_at = _timestamp_to_unix(row[4]) if row[4] else None
+        started_at = _timestamp_to_unix(row[5]) if row[5] else None
+        finished_at = _timestamp_to_unix(row[6]) if row[6] else None
+        updated_at = finished_at or started_at or created_at
+
+        # Parse results only to flag whether the task produced any
+        has_results = False
+        if row[8]:  # results column
+            try:
+                has_results = bool(json.loads(row[8]))
+            except (json.JSONDecodeError, TypeError):
+                has_results = False
+
         summary = {
-            'task_id': task_id,
-            'status': task_info.get('status'),
-            'message': task_info.get('message'),
-            'created_at': task_info.get('created_at'),
-            'updated_at': task_info.get('updated_at'),
-            'has_results': bool(task_info.get('results')),
-            'has_error': bool(task_info.get('error'))
+            'task_id': row[0],   # task_id
+            'status': row[1],    # status
+            'message': row[2],   # message
+            'created_at': created_at,
+            'updated_at': updated_at,
+            'has_results': has_results,
+            'has_error': bool(row[3])  # error column
         }
         task_summaries.append(summary)

@@ -466,55 +592,54 @@ def get_task_list(status_filter=None, page=1, page_size=20, sort_by='updated_at'

 def get_task_pool_stats():
     """
-    Get task pool statistics.
+    Get task pool statistics from the database.

     Returns:
         dict: Task pool statistics including counts by status.
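+            Also includes task_success_rate, the percentage of tasks that completed.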
""" - # Note: cleanup_old_tasks() is disabled for task pool stats - # to preserve historical task data for management purposes - # cleanup_old_tasks() # Clean up old tasks before calculating stats + from .db import get_db + + db = get_db() + + # Query status counts from database + status_counts = {} + total_tasks = 0 + + rows = db.execute(""" + SELECT status, COUNT(*) as count + FROM tasks + WHERE deleted_at IS NULL + GROUP BY status + """).fetchall() + + for row in rows: + status = row[0] + count = row[1] + status_counts[status] = count + total_tasks += count + + # Calculate derived statistics + active_tasks = status_counts.get(TASK_STATUS_PROCESSING, 0) + queued_tasks = status_counts.get(TASK_STATUS_PENDING, 0) + completed_tasks = status_counts.get(TASK_STATUS_COMPLETED, 0) + failed_tasks = status_counts.get(TASK_STATUS_FAILED, 0) + + task_success_rate = (completed_tasks / max(total_tasks, 1)) * 100 stats = { - 'total_tasks': len(task_status), - 'status_counts': {}, - 'active_tasks': 0, - 'queued_tasks': 0, - 'completed_tasks': 0, - 'failed_tasks': 0 + 'total_tasks': total_tasks, + 'status_counts': status_counts, + 'active_tasks': active_tasks, + 'queued_tasks': queued_tasks, + 'completed_tasks': completed_tasks, + 'failed_tasks': failed_tasks, + 'task_success_rate': task_success_rate } - for task_info in task_status.values(): - status = task_info.get('status') - if status: - stats['status_counts'][status] = stats['status_counts'].get(status, 0) + 1 - - if status == TASK_STATUS_PROCESSING: - stats['active_tasks'] += 1 - elif status == TASK_STATUS_PENDING: - stats['queued_tasks'] += 1 - elif status == TASK_STATUS_COMPLETED: - stats['completed_tasks'] += 1 - elif status == TASK_STATUS_FAILED: - stats['failed_tasks'] += 1 - return stats -def _get_status_file(): - """Get the task status file path, preferring OUTPUT_FOLDER for reliability.""" - if _TASK_STATUS_FILE_PATH is not None: - return _TASK_STATUS_FILE_PATH - try: - # Prefer OUTPUT_FOLDER which is more reliable and writable - from flask import current_app - output_dir = current_app.config.get('OUTPUT_FOLDER') - if output_dir: - return Path(output_dir) / "task_status.json" - except Exception: - pass - # Fall back to module directory (for development) - return Path(__file__).parent / "task_status.json" +# _get_status_file function removed - file-based persistence no longer used def _to_json_safe(obj): @@ -549,124 +674,24 @@ def _to_json_safe(obj): return None -def save_task_status_to_file(): - """Save current task status to JSON file for persistence.""" +# save_task_status_to_file function removed - using database as single source of truth + + +# load_task_status_from_file function removed - using database as single source of truth + + +# load_task_status_from_db function removed - using database as single source of truth + + +def _timestamp_to_unix(timestamp_str): + """Convert SQLite timestamp string to unix timestamp.""" try: - import json - import os - from pathlib import Path - - status_file = _get_status_file() - logger.debug(f"Attempting to save {len(task_status)} task statuses to {status_file}") - - # Guard: 避免用空内存覆盖已有文件 - if not task_status and status_file.exists(): - logger.info("Skipping task status save: empty in-memory status and file already exists") - return - - # Ensure only one thread writes the status file at a time (prevents Windows replace/lock issues) - with _TASK_STATUS_FILE_LOCK: - - status_to_save = {} - for task_id, task_info in task_status.items(): - try: - clean_info = {} - for k, v in task_info.items(): - if k 
     try:
-        import json
-        import os
-        from pathlib import Path
-
-        status_file = _get_status_file()
-        logger.debug(f"Attempting to save {len(task_status)} task statuses to {status_file}")
-
-        # Guard: 避免用空内存覆盖已有文件
-        if not task_status and status_file.exists():
-            logger.info("Skipping task status save: empty in-memory status and file already exists")
-            return
-
-        # Ensure only one thread writes the status file at a time (prevents Windows replace/lock issues)
-        with _TASK_STATUS_FILE_LOCK:
-
-            status_to_save = {}
-            for task_id, task_info in task_status.items():
-                try:
-                    clean_info = {}
-                    for k, v in task_info.items():
-                        if k == 'results' and isinstance(v, list):
-                            # Keep only metadata for results, remove potentially large data fields
-                            cleaned_results = []
-                            for item in v:
-                                if isinstance(item, dict):
-                                    cleaned_item = {kk: vv for kk, vv in item.items() if kk not in ['data', 'arrays']}
-
-                                    # Normalize result metadata for persistence
-                                    name = cleaned_item.get('name') or ''
-                                    rel_path = cleaned_item.get('rel_path') or ''
-
-                                    # Normalize size to int
-                                    size = cleaned_item.get('size', 0)
-                                    try:
-                                        size = int(size)
-                                    except Exception:
-                                        size = 0
-                                    cleaned_item['size'] = size
-
-                                    # Infer/normalize type if missing or unknown
-                                    t = cleaned_item.get('type')
-                                    if (not t) or (t == 'unknown'):
-                                        t = _get_file_type(name or rel_path)
-                                    cleaned_item['type'] = t
-
-                                    # Convert numpy types / other objects to JSON-safe
-                                    for kk, vv in list(cleaned_item.items()):
-                                        cleaned_item[kk] = _to_json_safe(vv)
-
-                                    cleaned_results.append(cleaned_item)
-
-                            clean_info['results'] = cleaned_results
-
-                            # Slim persistence: store only direct downloads for frontend
-                            clean_info['downloads'] = _build_simple_downloads_from_results(cleaned_results)
-
-                            # Ensure each result has a download_url (optional but handy)
-                            for r in clean_info['results']:
-                                if isinstance(r, dict) and r.get('rel_path') and not r.get('download_url'):
-                                    r['download_url'] = f"/download/{r['rel_path']}"
-                        else:
-                            clean_info[k] = _to_json_safe(v)
-
-                    status_to_save[task_id] = clean_info
-                    logger.debug(f"Processed task {task_id} with status {clean_info.get('status')}")
-
-                except Exception as task_e:
-                    logger.warning(f"Failed to process task {task_id}: {task_e}")
-                    # Skip this task but continue with others
-                    continue
-
-            # Ensure the directory exists
-            status_file.parent.mkdir(parents=True, exist_ok=True)
-
-            # Write to a unique temporary file first, then rename for atomicity with fsync
-            temp_file = status_file.with_suffix(f'.{os.getpid()}.tmp')
-            with open(temp_file, 'w', encoding='utf-8') as f:
-                json.dump(status_to_save, f, indent=2, ensure_ascii=False)
-                f.flush()
-                os.fsync(f.fileno())  # Force write to disk
-
-            # Atomic rename
-            temp_file.replace(status_file)
-
-            logger.info(f"Successfully saved {len(status_to_save)} task statuses to {status_file}")
-
-    except Exception as e:
-        logger.error(f"Failed to save task status to file: {e}", exc_info=True)
-
-
-def load_task_status_from_file():
-    """Load task status from JSON file on startup."""
-    try:
-        import json
-
-        status_file = _get_status_file()
-
-        if not status_file.exists():
-            logger.info("No task status file found, starting with empty task pool")
-            return
-
-        # Ensure no writer thread is updating the file while we read it
-        with _TASK_STATUS_FILE_LOCK:
-            with open(status_file, 'r', encoding='utf-8') as f:
-                loaded_status = json.load(f)
-
-        # Restore task status
-        global task_status
-        task_status.clear()
-        task_status.update(loaded_status)
-
-        logger.info(f"Loaded {len(loaded_status)} task statuses from {status_file}")
-
-    except Exception as e:
-        logger.warning(f"Failed to load task status from file: {e}")
+        import datetime
+        # Parse SQLite timestamp format (YYYY-MM-DD HH:MM:SS)
+        dt = datetime.datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
+        return dt.timestamp()
+    except (ValueError, TypeError):
+        return time.time()


 def allowed_file(filename, allowed_extensions):
diff --git a/新建文本文档.txt b/新建文本文档.txt
new file mode 100644
index 0000000..3ce0720
--- /dev/null
+++ b/新建文本文档.txt
@@ -0,0 +1 @@
+curl -X POST "http://localhost:5000/api-keys" ^ -H "Content-Type: application/json" ^ -H "X-Admin-Bootstrap-Key: bootstrap_key_2024" ^ -d "{\"description\":\"web\",\"scopes\":\"*\"}"
\ No newline at end of file