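GasFlux Web API Documentation
Overview
GasFlux Web API is a Flask-based RESTful API for uploading data files, running gas flux analysis, and downloading the results. Uploads are processed asynchronously, so the API can handle large datasets, and task status can be monitored in real time.
Quick Start
Basics
- Base URL: http://localhost:5000
- Authentication: none required
- Data format: JSON
- File size limit: 100MB
- Supported file types:
  - Data files: .xlsx, .xls
  - Configuration files: .yaml, .yml
Complete workflow example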
import time

import requests

BASE_URL = 'http://localhost:5000'

def process_gasflux_data():
    # 1. Check API health (the body is JSON for both 200 and 503)
    health = requests.get(f'{BASE_URL}/health').json()
    print(f"API Status: {health['data']['status']}")

    # 2. Upload the data file; the task ID is returned as data.job_id
    with open('data.xlsx', 'rb') as f:
        upload_response = requests.post(f'{BASE_URL}/upload', files={'file': f})
    upload_response.raise_for_status()
    task_id = upload_response.json()['data']['job_id']
    print(f"Task created: {task_id}")

    # 3. Poll the task status
    while True:
        # Responses wrap their payload in a {code, message, data} envelope
        status = requests.get(f'{BASE_URL}/task/{task_id}').json()['data']
        print(f"Status: {status['status']} - {status['message']}")
        if status['status'] == 'completed':
            # 4. Download the result files
            for result_file in status['results']:
                download_url = f"{BASE_URL}{result_file['download_url']}"
                download_response = requests.get(download_url)
                with open(result_file['name'], 'wb') as f:
                    f.write(download_response.content)
                print(f"Downloaded: {result_file['name']}")
            break
        elif status['status'] == 'failed':
            print(f"Task failed: {status.get('error', 'Unknown error')}")
            break
        time.sleep(3)  # check every 3 seconds
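API Endpoints
🔍 Monitoring and Health Checks
1. Get health status
Endpoint: GET /health
Description: Returns the API's health status, system information, and performance metrics. The health check evaluates several key indicators; when any of them is outside its normal range, the service status is reported as degraded.
Health check logic:
- Storage check: verifies that the upload and output folders are writable
- Load check: raises an issue when more than 20 tasks are active
- Error rate check: raises an issue when the HTTP error rate exceeds 10%
- Overall assessment: any failed check affects the overall health status
Sample response - healthy (200):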
{
"code": 200,
"message": "健康检查完成",
"data": {
"status": "healthy",
"version": "1.0.0",
"timestamp": 1705257600.123,
"uptime": "2h 30m 15s",
"storage": {
"uploads_writable": true,
"outputs_writable": true
},
"tasks": {
"active_count": 2,
"total_tracked": 15,
"total_processed": 13,
"success_rate_percent": 92.31
},
"performance": {
"requests_per_second": 0.08,
"avg_response_time_ms": 234.56,
"error_rate_percent": 1.5
}
}
}
Sample response - degraded (503):
{
"code": 503,
"message": "服务不可用",
"data": {
"status": "degraded",
"version": "1.0.0",
"timestamp": 1705257600.123,
"uptime": "1h 30m 45s",
"storage": {
"uploads_writable": true,
"outputs_writable": true
},
"tasks": {
"active_count": 0,
"total_tracked": 5,
"total_processed": 3,
"success_rate_percent": 60.0
},
"performance": {
"requests_per_second": 0.12,
"avg_response_time_ms": 145.67,
"error_rate_percent": 50.0
},
"issues": [
"错误率过高 (50.0%)"
]
}
}
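Status codes:
- 200: API is healthy (status: "healthy")
- 503: API is unhealthy (status: "degraded"), service unavailable
Health status values:
- healthy: all checks pass and the service is running normally
- degraded: one or more checks failed; the service may still run but needs attention. Possible causes:
  - Error rate > 10%: the HTTP request error rate is too high
  - Active tasks > 20: the system load is too high
  - Storage not writable: a file system permission problem
Fields:
- storage.uploads_writable: whether the upload folder is writable
- storage.outputs_writable: whether the output folder is writable
- tasks.active_count: number of currently active tasks
- performance.error_rate_percent: HTTP request error rate, as a percentage
- issues: list of specific problems, present when the status is degraded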
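The health check logic described in this section can be sketched as one small function. This is an illustration only, not the server's actual code: the name `evaluate_health`, its parameters, and the issue strings are hypothetical. Only the thresholds (20 active tasks, 10% error rate) and the healthy/degraded statuses come from this document.

```python
def evaluate_health(uploads_writable, outputs_writable, active_tasks, error_rate_percent):
    """Return (status, issues) using the thresholds documented for /health."""
    issues = []
    if not (uploads_writable and outputs_writable):
        issues.append("storage not writable")
    if active_tasks > 20:
        issues.append(f"too many active tasks ({active_tasks})")
    if error_rate_percent > 10:
        issues.append(f"error rate too high ({error_rate_percent}%)")
    # Any failed check downgrades the overall status.
    status = "degraded" if issues else "healthy"
    return status, issues


print(evaluate_health(True, True, 2, 1.5))
print(evaluate_health(True, True, 0, 50.0))
```

A monitoring client could apply the same thresholds to the `/stats` output to raise its own alerts before the service reports itself as degraded.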
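2. Get system statistics
Endpoint: GET /stats
Description: Returns detailed API statistics and monitoring data, including request counts, task status, performance metrics, and system resource usage.
Sample response (200):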
{
"code": 200,
"message": "统计信息获取成功",
"data": {
"summary": {
"uptime_seconds": 3600.5,
"uptime_formatted": "1h 0m 0s",
"requests_total": 150,
"requests_per_second": 0.04,
"error_rate_percent": 2.0,
"active_tasks": 1
},
"requests": {
"by_method": {
"GET": 120,
"POST": 30
},
"by_status": {
"200": 145,
"400": 3,
"500": 2
},
"top_endpoints": {
"/task/abc123": 45,
"/health": 30,
"/": 25
}
},
"tasks": {
"total_created": 25,
"total_completed": 20,
"total_failed": 2,
"success_rate_percent": 90.91,
"by_status": {
"pending": 1,
"processing": 1,
"completed": 20,
"failed": 2
}
},
"performance": {
"avg_response_time_ms": 245.67,
"max_response_time_ms": 1250.34,
"min_response_time_ms": 12.45
},
"system": {
"memory_usage_percent": 45.2,
"memory_used_gb": 7.3,
"memory_total_gb": 16.0,
"disk_usage_percent": 23.1,
"disk_used_gb": 46.8,
"disk_total_gb": 203.2
},
"recent_tasks": [
{
"task_id": "abc123-def456",
"status": "completed",
"age_seconds": 45.2,
"message": "处理完成成功"
}
]
}
}
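Fields:
- summary.uptime_seconds: API uptime in seconds
- summary.uptime_formatted: formatted uptime (for example "1h 0m 0s")
- summary.requests_total: total number of requests
- summary.requests_per_second: average requests per second
- summary.error_rate_percent: request error rate, as a percentage
- summary.active_tasks: number of active tasks (pending or processing)
- requests.by_method: request counts grouped by HTTP method
- requests.by_status: request counts grouped by HTTP status code
- requests.top_endpoints: the 10 most requested endpoints
- tasks.total_created: total number of tasks created
- tasks.total_completed: number of completed tasks
- tasks.total_failed: number of failed tasks
- tasks.success_rate_percent: task success rate, as a percentage
- tasks.by_status: task counts grouped by status
- performance.avg_response_time_ms: average response time (ms)
- performance.max_response_time_ms: maximum response time (ms)
- performance.min_response_time_ms: minimum response time (ms)
- system.memory_usage_percent: memory usage, as a percentage
- system.memory_used_gb: memory used (GB)
- system.memory_total_gb: total memory (GB)
- system.disk_usage_percent: disk usage, as a percentage (for the disk that holds the output directory)
- system.disk_used_gb: disk space used (GB)
- system.disk_total_gb: total disk space (GB)
- recent_tasks[]: status information for the 20 most recent tasks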
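The sample values suggest that `tasks.success_rate_percent` is the share of finished tasks that completed: 20 completed and 2 failed give 90.91. The sketch below reproduces that arithmetic. The formula is inferred from the sample response and is not confirmed by the source; the function name and the zero-task fallback are assumptions.

```python
def success_rate_percent(completed, failed):
    """Completed tasks as a percentage of finished tasks, rounded to 2 places."""
    finished = completed + failed
    if finished == 0:
        return 0.0  # assumption: report 0 when no task has finished yet
    return round(completed / finished * 100, 2)


print(success_rate_percent(20, 2))
```

Under this reading, pending and processing tasks do not count against the success rate.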
3. Reset statistics
Endpoint: POST /stats/reset
Description: Resets all API statistics (administrative function).
Sample response (200):
{
"code": 200,
"message": "统计信息重置成功",
"data": {
"timestamp": 1705257600.123
}
}
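4. Get configuration
Endpoint: GET /config
Description: Returns the current application configuration and the supported environment variables.
Sample response (200):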
{
"code": 200,
"message": "配置信息获取成功",
"data": {
"configuration": {
"host": "0.0.0.0",
"port": 5000,
"debug": false,
"base_dir": "/app",
"upload_folder": "/app/web_api_data/uploads",
"output_folder": "/app/web_api_data/outputs",
"max_content_length": 104857600,
"log_level": "INFO",
"log_file": "logs/gasflux_api.log",
"cors_origins": ["*"],
"task_cleanup_interval": 3600,
"max_task_age": 86400,
"threads": 8,
"connection_limit": 100,
"channel_timeout": 300
},
"environment_variables": {
"supported": [
"GASFLUX_HOST", "GASFLUX_PORT", "GASFLUX_DEBUG",
"GASFLUX_UPLOAD_FOLDER", "GASFLUX_OUTPUT_FOLDER",
"GASFLUX_MAX_CONTENT_LENGTH", "GASFLUX_LOG_LEVEL",
"GASFLUX_LOG_FILE", "GASFLUX_CORS_ORIGINS",
"GASFLUX_TASK_CLEANUP_INTERVAL", "GASFLUX_MAX_TASK_AGE",
"GASFLUX_THREADS", "GASFLUX_CONNECTION_LIMIT",
"GASFLUX_CHANNEL_TIMEOUT"
],
"current_values": {
"GASFLUX_HOST": "0.0.0.0",
"GASFLUX_PORT": "5000",
"GASFLUX_DEBUG": "false"
}
}
}
}
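📤 File Upload and Management
5. Upload and process a file
Endpoint: POST /upload
Description: Uploads a data file and starts an asynchronous processing task.
Request parameters (multipart/form-data):
- file (required): data file (.xlsx or .xls)
- config (optional): configuration file (.yaml or .yml)
Request example (cURL):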
curl -X POST \
-F "file=@data.xlsx" \
-F "config=@config.yaml" \
http://localhost:5000/upload
Request example (Python):
import requests

# config is optional: drop the 'config' entry to upload only the data file
with open('data.xlsx', 'rb') as data_file, open('config.yaml', 'rb') as config_file:
    files = {'file': data_file, 'config': config_file}
    response = requests.post('http://localhost:5000/upload', files=files)
result = response.json()
print(f"Task ID: {result['data']['job_id']}")
Success response (202):
{
"code": 202,
"message": "任务已接受并加入处理队列",
"data": {
"status": "accepted",
"job_id": "abc123-def456-ghi789",
"task_status_url": "/task/abc123-def456-ghi789"
}
}
Error response examples:
- Unsupported file type (400):
{
"code": 400,
"message": "无效的数据文件类型。只允许 .xlsx 和 .xls 格式。",
"data": {}
}
- File too large (413):
{
"code": 413,
"message": "文件过大。最大尺寸为 100MB。",
"data": {}
}
- Invalid configuration file type (400):
{
"code": 400,
"message": "无效的配置文件类型。只允许 .yaml 和 .yml 格式。",
"data": {}
}
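The 400 and 413 errors above can often be avoided by checking a file on the client before uploading it. The following is a minimal sketch under stated assumptions: `precheck_upload` and its constants are hypothetical names, and treating extensions case-insensitively is a guess about the server's behavior. The allowed extensions and the 100MB limit are taken from this document.

```python
from pathlib import Path

DATA_EXTENSIONS = {".xlsx", ".xls"}
CONFIG_EXTENSIONS = {".yaml", ".yml"}
MAX_BYTES = 100 * 1024 * 1024  # 104857600, the documented max_content_length


def precheck_upload(filename, size_bytes, kind="data"):
    """Return a list of problems; an empty list means the file looks acceptable."""
    allowed = DATA_EXTENSIONS if kind == "data" else CONFIG_EXTENSIONS
    problems = []
    # Assumption: extensions are compared case-insensitively.
    if Path(filename).suffix.lower() not in allowed:
        problems.append(f"unsupported extension for {kind} file: {filename}")
    if size_bytes > MAX_BYTES:
        problems.append(f"file too large: {size_bytes} bytes")
    return problems


print(precheck_upload("data.xlsx", 15 * 1024 * 1024))
print(precheck_upload("data.csv", 1024))
```

The server remains the authority: a file that passes this check can still be rejected, so the upload response must be checked as well.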
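📊 Task Management and Monitoring
6. Query task status
Endpoint: GET /task/{task_id}
Description: Returns the current status and progress of an asynchronous processing task.
Path parameters:
- task_id: task ID (UUID format)
Sample response - processing (200):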
{
"code": 200,
"message": "任务查询成功",
"data": {
"task_id": "abc123-def456-ghi789",
"status": "processing",
"message": "GasFlux 分析完成,正在生成报告...",
"updated_at": 1705257600.123,
"progress": {
"stage": "report_generation",
"completed_steps": 4,
"total_steps": 5,
"estimated_time_remaining": 45
}
}
}
Sample response - completed (200):
{
"code": 200,
"message": "任务查询成功",
"data": {
"task_id": "abc123-def456-ghi789",
"status": "completed",
"message": "处理完成成功",
"updated_at": 1705257600.123,
"processing_time_seconds": 125.67,
"results": [
{
"name": "08_34_01_5m.processed_ch4_report.html",
"rel_path": "abc123-def456-ghi789/08_34_01_5m.processed_ch4_report.html",
"download_url": "/download/abc123-def456-ghi789/08_34_01_5m.processed_ch4_report.html",
"size": 245760,
"type": "report"
},
{
"name": "08_34_01_5m.processed_data.csv",
"rel_path": "abc123-def456-ghi789/08_34_01_5m.processed_data.csv",
"download_url": "/download/abc123-def456-ghi789/08_34_01_5m.processed_data.csv",
"size": 153600,
"type": "data"
},
{
"name": "08_34_01_5m.processed_config.yaml",
"rel_path": "abc123-def456-ghi789/08_34_01_5m.processed_config.yaml",
"download_url": "/download/abc123-def456-ghi789/08_34_01_5m.processed_config.yaml",
"size": 2048,
"type": "config"
},
{
"name": "08_34_01_5m.processed_output_vars.json",
"rel_path": "abc123-def456-ghi789/08_34_01_5m.processed_output_vars.json",
"download_url": "/download/abc123-def456-ghi789/08_34_01_5m.processed_output_vars.json",
"size": 4096,
"type": "metadata"
}
]
}
}
Sample response - failed (200):
{
"code": 200,
"message": "任务查询成功",
"data": {
"task_id": "abc123-def456-ghi789",
"status": "failed",
"message": "处理失败",
"updated_at": 1705257600.123,
"processing_time_seconds": 23.45,
"error": "处理失败: Invalid data format in column 'temperature'",
"error_details": {
"stage": "data_validation",
"error_code": "INVALID_DATA_FORMAT",
"traceback": "..."
}
}
}
Sample response - task not found (404):
{
"code": 404,
"message": "任务未找到",
"data": {}
}
Task statuses:
- pending: the task is queued and waiting to be processed
- processing: the task is being processed (includes progress information)
- completed: processing finished (includes the list of result files)
- failed: processing failed (includes error information)
7. Update task status
Endpoint: PUT /task/{task_id}
Description: Updates a task's status, message, or priority.
Path parameters:
- task_id: task ID (UUID format)
Request parameters (JSON):
- status (optional): new task status
  - pending: requeue the task for processing
  - processing: mark as processing
  - completed: mark as completed
  - failed: mark as failed
- message (optional): status message or error description
- priority (optional): task priority (normal/high/low)
Request example (cURL):
# Mark a task as completed
curl -X PUT http://localhost:5000/task/abc123-def456-ghi789 \
  -H "Content-Type: application/json" \
  -d '{"status": "completed", "message": "手动标记为完成"}'
# Update the task message
curl -X PUT http://localhost:5000/task/abc123-def456-ghi789 \
  -H "Content-Type: application/json" \
  -d '{"message": "更新的状态消息"}'
# Set high priority
curl -X PUT http://localhost:5000/task/abc123-def456-ghi789 \
  -H "Content-Type: application/json" \
  -d '{"priority": "high"}'
Request example (Python):
import requests

# Mark a task as failed
response = requests.put(
    'http://localhost:5000/task/abc123-def456-ghi789',
    json={
        'status': 'failed',
        'message': 'Processing failed due to invalid input data'
    }
)
# Update the task priority
response = requests.put(
    'http://localhost:5000/task/abc123-def456-ghi789',
    json={'priority': 'high'}
)
Success response (200):
{
"code": 200,
"message": "任务更新成功",
"data": {
"task_id": "abc123-def456-ghi789",
"status": "updated",
"task_info": {
"status": "completed",
"message": "手动标记为完成",
"updated_at": 1705257600.123,
"priority": "normal"
}
}
}
Error response examples:
- Task not found (404):
{
"code": 404,
"message": "任务未找到",
"data": {}
}
- Invalid status (400):
{
"code": 400,
"message": "无效状态。必须是以下之一: pending, processing, completed, failed",
"data": {}
}
- Invalid request (400):
{
"code": 400,
"message": "请求体必须是 JSON 格式",
"data": {}
}
8. Delete a task
Endpoint: DELETE /task/{task_id}
Description: Deletes a task together with all of its files and data.
Path parameters:
- task_id: task ID (UUID format)
Request example (cURL):
# Delete a specific task
curl -X DELETE http://localhost:5000/task/abc123-def456-ghi789
Request example (Python):
import requests

# Delete a task
response = requests.delete('http://localhost:5000/task/abc123-def456-ghi789')
if response.status_code == 200:
    result = response.json()['data']
    print(f"Task {result['task_id']} deleted")
    print(f"Folders deleted: {result['details']['folders_deleted']}")
    print(f"Size freed: {result['details']['total_size_deleted']} bytes")
else:
    print(f"Failed to delete task: {response.json()}")
Success response (200):
{
"code": 200,
"message": "任务及相关文件删除成功",
"data": {
"task_id": "abc123-def456-ghi789",
"status": "deleted",
"details": {
"folders_deleted": 1,
"total_size_deleted": 307200,
"task_status": "completed"
}
}
}
Error response examples:
- Task not found (404):
{
"code": 404,
"message": "任务未找到",
"data": {}
}
- Task is being processed (409):
{
"code": 409,
"message": "无法删除当前正在处理或等待处理的任务",
"data": {
"task_status": "processing"
}
}
- File deletion failed (500):
{
"code": 500,
"message": "删除任务文件失败: Permission denied",
"data": {}
}
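Notes:
- Only completed or failed tasks can be deleted
- Tasks that are pending or processing cannot be deleted
- Deletion removes both the task record and all related files
- Deletion is irreversible; use it with care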
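The deletion rules can be summarized as a mapping from task status to the response code a client should expect. This is a sketch of the documented behavior, not server code; `expected_delete_code` is a hypothetical helper, and `None` stands for an unknown task ID.

```python
def expected_delete_code(task_status):
    """Map a task's current status to the documented DELETE response code."""
    if task_status is None:
        return 404  # task not found
    if task_status in ("pending", "processing"):
        return 409  # queued or running tasks cannot be deleted
    return 200  # completed or failed tasks are deleted


for status in ("completed", "failed", "pending", "processing", None):
    print(status, expected_delete_code(status))
```

A client can use the same rule to decide whether to wait for a task to finish before attempting the delete. A 500 response is still possible when the files cannot be removed.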
📋 Report Management and Queries
9. List generated reports (paginated)
Endpoint: GET /reports
Description: Returns a paginated list of all generated processing reports, with sorting and filtering.
Query parameters:
- page (optional): page number (default: 1)
- per_page (optional): reports per page (default: 20, maximum: 100)
- sort_by (optional): sort field (default: created_at)
  - created_at: sort by creation time
  - task_id: sort by task ID
  - file_size: sort by total file size
  - processing_time: sort by processing time
- sort_order (optional): sort order (default: desc)
  - asc: ascending
  - desc: descending
- status (optional): filter by task status
  - completed: only completed tasks
  - failed: only failed tasks
  - not specified: all tasks
Request example (cURL):
# First page, 20 reports per page, newest first
curl "http://localhost:5000/reports?page=1&per_page=20&sort_by=created_at&sort_order=desc"
# Second page, completed tasks only
curl "http://localhost:5000/reports?page=2&status=completed"
# Sort by processing time, ascending
curl "http://localhost:5000/reports?sort_by=processing_time&sort_order=asc"
Request example (Python):
import requests

# Basic query
response = requests.get('http://localhost:5000/reports')
reports = response.json()['data']['reports']

# Paginated query
params = {
    'page': 1,
    'per_page': 10,
    'sort_by': 'created_at',
    'sort_order': 'desc',
    'status': 'completed'
}
response = requests.get('http://localhost:5000/reports', params=params)
data = response.json()['data']  # unwrap the {code, message, data} envelope
print(f"Total reports: {data['pagination']['total_reports']}")
print(f"Page: {data['pagination']['page']}/{data['pagination']['total_pages']}")
for report in data['reports']:
    print(f"Task: {report['task_id']}")
    print(f"Status: {report['status']}")
    print(f"Created at: {report['created_at']}")
    print(f"File count: {report['file_count']}")
    if report['main_report']:
        print(f"Main report: {report['main_report']['download_url']}")
Success response (200):
{
"code": 200,
"message": "报告列表获取成功",
"data": {
"reports": [
{
"task_id": "abc123-def456-ghi789",
"report_name": "08_34_01_5m",
"status": "completed",
"created_at": 1705257600.123,
"file_count": 4,
"total_size": 307200,
"processing_time_seconds": 125.67,
"main_report": {
"name": "08_34_01_5m.processed_ch4_report.html",
"size": 245760,
"type": "report",
"download_url": "/download/abc123-def456-ghi789/08_34_01_5m.processed/2026-01-14_10-33-29-961698_processing_run/08_34_01_5m.processed_ch4_report.html"
},
"all_files": [
{
"name": "08_34_01_5m.processed_ch4_report.html",
"size": 245760,
"type": "report",
"download_url": "/download/abc123-def456-ghi789/08_34_01_5m.processed/2026-01-14_10-33-29-961698_processing_run/08_34_01_5m.processed_ch4_report.html"
},
{
"name": "08_34_01_5m.processed_data.csv",
"size": 153600,
"type": "data",
"download_url": "/download/abc123-def456-ghi789/08_34_01_5m.processed/2026-01-14_10-33-29-961698_processing_run/08_34_01_5m.processed_data.csv"
},
{
"name": "08_34_01_5m.processed_config.yaml",
"size": 2048,
"type": "config",
"download_url": "/download/abc123-def456-ghi789/08_34_01_5m.processed/2026-01-14_10-33-29-961698_processing_run/08_34_01_5m.processed_config.yaml"
},
{
"name": "08_34_01_5m.processed_output_vars.json",
"size": 4096,
"type": "metadata",
"download_url": "/download/abc123-def456-ghi789/08_34_01_5m.processed/2026-01-14_10-33-29-961698_processing_run/08_34_01_5m.processed_output_vars.json"
}
],
"run_directory": "abc123-def456-ghi789/08_34_01_5m.processed/2026-01-14_10-33-29-961698_processing_run"
}
],
"pagination": {
"page": 1,
"per_page": 20,
"total_reports": 45,
"total_pages": 3,
"has_next": true,
"has_prev": false
},
"filters": {
"sort_by": "created_at",
"sort_order": "desc",
"status": null
}
}
}
Error response examples:
- Invalid parameter (400):
{
"code": 400,
"message": "Invalid parameter: per_page must be between 1 and 100",
"data": {}
}
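The `pagination` object in the success response follows from the total report count and the page size. The sketch below recomputes it and reproduces the sample values (45 reports at 20 per page give 3 pages). The function name `paginate` is hypothetical, and the server may compute these fields differently.

```python
import math


def paginate(total_reports, page=1, per_page=20):
    """Build the pagination block for a given total and page size."""
    if not 1 <= per_page <= 100:
        raise ValueError("per_page must be between 1 and 100")
    total_pages = math.ceil(total_reports / per_page)
    return {
        "page": page,
        "per_page": per_page,
        "total_reports": total_reports,
        "total_pages": total_pages,
        "has_next": page < total_pages,
        "has_prev": page > 1,
    }


print(paginate(45))
```

A client that walks through all pages can stop as soon as `has_next` is false instead of computing the page count itself.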
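📁 File Download
10. Download processing results
Endpoint: GET /download/{filename}
Description: Downloads a processed result file.
Path parameters:
- filename: relative path of the file (including the task ID)
Request example (cURL):
# Download the HTML report
curl -O http://localhost:5000/download/abc123-def456-ghi789/report.html
# Download the CSV data
curl -O http://localhost:5000/download/abc123-def456-ghi789/data.csv
Request example (Python):
import requests

response = requests.get('http://localhost:5000/download/abc123-def456-ghi789/report.html')
response.raise_for_status()  # do not save a 403/404 error body as the report
with open('report.html', 'wb') as f:
    f.write(response.content)
Status codes:
- 200: file downloaded successfully
- 400: the path is not a file
- 403: access denied (path traversal protection)
- 404: file not found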
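🌐 Web Interface
11. Web management interface
Endpoint: GET /
Description: Serves a user-friendly web interface for file upload, task monitoring, and result download.
Response: an HTML page containing:
- A file upload form
- A task status monitoring panel
- Result file download links
- System status information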
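🔄 Processing Flow in Detail
Complete processing flow
1. File upload
   - The client validates the file type and size
   - The data file and the optional configuration file are uploaded
   - The server runs security checks and stores the files
2. Task queue management
   - The server assigns a unique UUID to the upload task
   - The task enters the processing queue and is scheduled according to system load
3. Asynchronous data processing
   - Data preprocessing: format conversion, data validation, unit conversion
   - Configuration merge: default configuration + user configuration
   - GasFlux core analysis:
     - Background correction algorithm
     - Gas flux calculation
     - Spatial interpolation (kriging)
     - Statistical analysis and visualization
   - Result generation: HTML report, CSV data, configuration file backup
4. Real-time status monitoring
   - The client polls the status by task ID
   - Progress tracking and an estimated time remaining are supported
5. Result retrieval and cleanup
   - Download links are provided once processing completes
   - Expired tasks and files are cleaned up periodically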
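The configuration merge step combines the default configuration with the user's configuration file. The source does not say how conflicts are resolved, so the sketch below makes an assumption: user values override defaults, and nested mappings are merged key by key. `merge_config` is a hypothetical name, and the overriding `ch4` range is an illustrative value.

```python
def merge_config(defaults, user):
    """Return a new dict: defaults overlaid with user values, merging nested dicts."""
    merged = dict(defaults)
    for key, value in user.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = value
    return merged


defaults = {"gases": {"ch4": [1.5, 500], "co2": [300, 5000]}}
user = {"gases": {"ch4": [1.5, 100]}}  # illustrative override
print(merge_config(defaults, user))
```

With a key-by-key merge, a user file only needs to list the settings that differ from the defaults. If the server replaces whole sections instead, a partial `gases` block would drop the other gases, so it is worth checking the `config` result file of a finished task.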
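Processing time estimates
- Small files (< 10MB): 1-3 minutes
- Medium files (10-50MB): 3-10 minutes
- Large files (> 50MB): 10-30 minutes
- Actual times depend on computational complexity, data quality, and system load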
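⚠️ Error Handling
HTTP status codes
| Status code | Meaning | Suggested handling |
|---|---|---|
| 200 | Request succeeded | Process the response data normally |
| 202 | Request accepted (asynchronous processing) | Record the task ID and start polling its status |
| 400 | Invalid request parameters | Check the request format and parameters |
| 403 | Access denied | Check the file path and permissions |
| 404 | Resource not found | Verify the task ID or file path |
| 409 | Conflict (task is pending or processing) | Wait for the task to finish before deleting it |
| 413 | File too large | Compress the file or contact an administrator |
| 500 | Internal server error | Check the system status and retry the request |
| 503 | Service degraded (returned by /health) | Inspect the issues list in the health response |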
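Because the documented responses wrap their payload in a `{code, message, data}` envelope, a client can handle all of these status codes in one place. The helper below is a sketch: `GasFluxAPIError` and `unwrap` are hypothetical names, and it assumes the `code` field always mirrors the HTTP status.

```python
class GasFluxAPIError(Exception):
    """Raised when a response envelope carries a non-2xx code."""

    def __init__(self, code, message):
        super().__init__(f"{code}: {message}")
        self.code = code
        self.message = message


def unwrap(body):
    """Return body['data'] for 2xx codes, otherwise raise GasFluxAPIError."""
    code = body.get("code", 0)
    if 200 <= code < 300:
        return body.get("data", {})
    raise GasFluxAPIError(code, body.get("message", "unknown error"))


accepted = {"code": 202, "message": "accepted", "data": {"job_id": "abc123"}}
print(unwrap(accepted)["job_id"])
```

Callers can then branch on `error.code`, for example retrying on 500 and giving up immediately on 400 or 413.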
Common error responses
File upload errors:
{
"code": 400,
"message": "无效的数据文件类型。只允许 .xlsx 和 .xls 格式。",
"data": {}
}
{
"code": 413,
"message": "文件过大。最大尺寸为 100MB。",
"data": {}
}
Task query errors:
{
"code": 404,
"message": "任务未找到",
"data": {
"task_id": "invalid-task-id"
}
}
File download errors:
{
"code": 404,
"message": "文件未找到",
"data": {
"filename": "task-id/file.html"
}
}
📊 Performance Monitoring and Logging
Logging
All API requests are logged to the gasflux_api.log file, in the following format:
2026-01-14 10:30:15,123 - INFO - [task:abc123] POST /upload - 202 - 2.34s
2026-01-14 10:30:17,456 - INFO - [task:abc123] Processing started: data.xlsx (15.2MB)
2026-01-14 10:32:22,789 - INFO - [task:abc123] Processing completed: 4 files generated
2026-01-14 10:32:25,012 - INFO - [task:abc123] GET /download/abc123/report.html - 200 - 0.45s
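Log lines in this format are easy to parse for ad hoc analysis. The sketch below assumes every line looks like the four samples above (timestamp, level, `[task:...]` tag, then free text); the real log may contain other shapes, which this parser skips by returning `None`. The names `parse_log_line`, `LINE_RE`, and `REQUEST_RE` are hypothetical.

```python
import re

LINE_RE = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - "
    r"(?P<level>[A-Z]+) - \[task:(?P<task_id>[^\]]+)\] (?P<text>.*)$"
)
# Request lines carry "METHOD /path - status - seconds" in the text part.
REQUEST_RE = re.compile(
    r"^(?P<method>[A-Z]+) (?P<path>\S+) - (?P<status>\d{3}) - (?P<seconds>\d+(?:\.\d+)?)s$"
)


def parse_log_line(line):
    """Parse one log line into a dict, or return None if it does not match."""
    match = LINE_RE.match(line.strip())
    if match is None:
        return None
    entry = match.groupdict()
    request = REQUEST_RE.match(entry["text"])
    if request:
        entry.update(request.groupdict())
        entry["status"] = int(entry["status"])
        entry["seconds"] = float(entry["seconds"])
    return entry


sample = "2026-01-14 10:30:15,123 - INFO - [task:abc123] POST /upload - 202 - 2.34s"
print(parse_log_line(sample))
```

Entries without a `method` key are processing messages rather than HTTP requests, so they can be filtered separately.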
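Performance metrics
Available through the /stats endpoint:
- Request response time statistics
- Task success rate
- System resource usage
- Error rate and most requested endpoints
💻 Programming Examples
Complete Python example
Basic usage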
import os
import time
from pathlib import Path

import requests


class GasFluxClient:
    def __init__(self, base_url="http://localhost:5000"):
        self.base_url = base_url.rstrip('/')

    def check_health(self):
        """Return the health payload (the envelope's data field)."""
        response = requests.get(f"{self.base_url}/health")
        return response.json()['data']

    def upload_and_process(self, data_file, config_file=None, output_dir="./results"):
        """Upload a file, wait for processing, and download the results."""
        files = {'file': open(data_file, 'rb')}
        if config_file and os.path.exists(config_file):
            files['config'] = open(config_file, 'rb')

        # Upload, then close the file handles whatever happens
        print(f"Uploading {data_file}...")
        try:
            response = requests.post(f"{self.base_url}/upload", files=files)
        finally:
            for handle in files.values():
                handle.close()
        result = response.json()
        if response.status_code != 202:
            raise Exception(f"Upload failed: {result.get('message', 'Unknown error')}")
        task_id = result['data']['job_id']
        print(f"Task created: {task_id}")

        # Poll the task status
        while True:
            status_response = requests.get(f"{self.base_url}/task/{task_id}")
            status = status_response.json()['data']
            print(f"Status: {status['status']} - {status['message']}")
            if status['status'] == 'completed':
                # Download the result files
                os.makedirs(output_dir, exist_ok=True)
                for result_file in status['results']:
                    download_url = f"{self.base_url}{result_file['download_url']}"
                    output_path = Path(output_dir) / result_file['name']
                    print(f"Downloading {result_file['name']}...")
                    download_response = requests.get(download_url)
                    with open(output_path, 'wb') as f:
                        f.write(download_response.content)
                print(f"Processing completed! Results saved to {output_dir}")
                return status
            elif status['status'] == 'failed':
                error_msg = status.get('error', 'Unknown error')
                raise Exception(f"Task failed: {error_msg}")
            time.sleep(3)  # check every 3 seconds


# Usage example
client = GasFluxClient()
try:
    # Check the API status
    health = client.check_health()
    print(f"API Status: {health['status']}")
    # Process the data
    result = client.upload_and_process(
        data_file="data.xlsx",
        config_file="config.yaml",
        output_dir="./gasflux_results"
    )
    print("Processing completed successfully!")
except Exception as e:
    print(f"Error: {e}")
Async version (using asyncio)
import asyncio
from pathlib import Path

import aiofiles
import aiohttp


class AsyncGasFluxClient:
    def __init__(self, base_url="http://localhost:5000"):
        self.base_url = base_url.rstrip('/')

    async def upload_and_process(self, data_file, config_file=None, output_dir="./results"):
        async with aiohttp.ClientSession() as session:
            # Prepare the multipart upload
            data = aiohttp.FormData()
            data.add_field('file', open(data_file, 'rb'), filename=Path(data_file).name)
            if config_file and Path(config_file).exists():
                data.add_field('config', open(config_file, 'rb'), filename=Path(config_file).name)

            # Upload the files
            print(f"Uploading {data_file}...")
            async with session.post(f"{self.base_url}/upload", data=data) as response:
                result = await response.json()
                if response.status != 202:
                    raise Exception(f"Upload failed: {result.get('message', 'Unknown error')}")
            task_id = result['data']['job_id']
            print(f"Task created: {task_id}")

            # Poll the task status
            while True:
                async with session.get(f"{self.base_url}/task/{task_id}") as response:
                    status = (await response.json())['data']
                print(f"Status: {status['status']} - {status['message']}")
                if status['status'] == 'completed':
                    # Download the result files
                    Path(output_dir).mkdir(parents=True, exist_ok=True)
                    for result_file in status['results']:
                        download_url = f"{self.base_url}{result_file['download_url']}"
                        output_path = Path(output_dir) / result_file['name']
                        print(f"Downloading {result_file['name']}...")
                        async with session.get(download_url) as download:
                            async with aiofiles.open(output_path, 'wb') as f:
                                await f.write(await download.read())
                    print(f"Processing completed! Results saved to {output_dir}")
                    return status
                elif status['status'] == 'failed':
                    error_msg = status.get('error', 'Unknown error')
                    raise Exception(f"Task failed: {error_msg}")
                await asyncio.sleep(3)


# Using the async client
async def main():
    client = AsyncGasFluxClient()
    try:
        result = await client.upload_and_process(
            data_file="large_dataset.xlsx",
            config_file="config.yaml",
            output_dir="./async_results"
        )
        print("Async processing completed!")
    except Exception as e:
        print(f"Error: {e}")

# Run the async example
# asyncio.run(main())
JavaScript/Node.js Example
Full implementation
const axios = require('axios');
const FormData = require('form-data');
const fs = require('fs'); // stream APIs: createReadStream, createWriteStream
const fsp = fs.promises; // promise APIs: access, mkdir
const path = require('path');

// Resolves to true when the path is accessible, false otherwise.
const fileExists = (filePath) => fsp.access(filePath).then(() => true, () => false);

class GasFluxAPI {
  constructor(baseURL = 'http://localhost:5000') {
    this.baseURL = baseURL.replace(/\/$/, '');
    this.client = axios.create({
      baseURL: this.baseURL,
      timeout: 30000
    });
  }

  async checkHealth() {
    try {
      // /health answers 503 when degraded; accept it so the payload is still returned
      const response = await this.client.get('/health', {
        validateStatus: (status) => status === 200 || status === 503
      });
      return response.data.data; // unwrap the {code, message, data} envelope
    } catch (error) {
      throw new Error(`Health check failed: ${error.message}`);
    }
  }

  async uploadFile(dataFilePath, configFilePath = null) {
    const formData = new FormData();
    // Add the data file
    if (!(await fileExists(dataFilePath))) {
      throw new Error(`Data file not found: ${dataFilePath}`);
    }
    formData.append('file', fs.createReadStream(dataFilePath), {
      filename: path.basename(dataFilePath)
    });
    // Add the configuration file, if provided
    if (configFilePath) {
      if (!(await fileExists(configFilePath))) {
        console.warn(`Config file not found: ${configFilePath}, skipping...`);
      } else {
        formData.append('config', fs.createReadStream(configFilePath), {
          filename: path.basename(configFilePath)
        });
      }
    }
    try {
      const response = await this.client.post('/upload', formData, {
        headers: formData.getHeaders(),
        maxContentLength: Infinity,
        maxBodyLength: Infinity
      });
      return response.data.data;
    } catch (error) {
      if (error.response) {
        throw new Error(`Upload failed: ${error.response.data.message}`);
      }
      throw error;
    }
  }

  async getTaskStatus(taskId) {
    try {
      const response = await this.client.get(`/task/${taskId}`);
      return response.data.data;
    } catch (error) {
      if (error.response && error.response.status === 404) {
        throw new Error(`Task not found: ${taskId}`);
      }
      throw error;
    }
  }

  async downloadFile(downloadUrl, outputPath) {
    try {
      const response = await this.client.get(downloadUrl, {
        responseType: 'stream'
      });
      const writer = fs.createWriteStream(outputPath);
      response.data.pipe(writer);
      return new Promise((resolve, reject) => {
        writer.on('finish', resolve);
        writer.on('error', reject);
      });
    } catch (error) {
      throw new Error(`Download failed: ${error.message}`);
    }
  }

  async processData(dataFilePath, configFilePath = null, outputDir = './results', pollInterval = 3000) {
    try {
      // 1. Check API health
      console.log('Checking API health...');
      const health = await this.checkHealth();
      console.log(`API Status: ${health.status}`);
      // 2. Upload the files
      console.log(`Uploading ${dataFilePath}...`);
      const uploadResult = await this.uploadFile(dataFilePath, configFilePath);
      const taskId = uploadResult.job_id;
      console.log(`Task created: ${taskId}`);
      // 3. Poll the task status
      console.log('Monitoring processing status...');
      while (true) {
        const status = await this.getTaskStatus(taskId);
        console.log(`[${new Date().toISOString()}] Status: ${status.status} - ${status.message}`);
        if (status.status === 'completed') {
          console.log('Processing completed successfully!');
          // 4. Download the result files
          console.log('Downloading result files...');
          await fsp.mkdir(outputDir, { recursive: true });
          for (const resultFile of status.results) {
            const downloadUrl = `${this.baseURL}${resultFile.download_url}`;
            const outputPath = path.join(outputDir, resultFile.name);
            console.log(`Downloading ${resultFile.name}...`);
            await this.downloadFile(downloadUrl, outputPath);
            console.log(`Saved to ${outputPath}`);
          }
          return {
            taskId,
            status: status.status,
            results: status.results,
            outputDir
          };
        } else if (status.status === 'failed') {
          const errorMsg = status.error || 'Unknown error';
          throw new Error(`Processing failed: ${errorMsg}`);
        }
        // Wait before polling again
        await new Promise(resolve => setTimeout(resolve, pollInterval));
      }
    } catch (error) {
      console.error(`Error in processData: ${error.message}`);
      throw error;
    }
  }
}

// Usage example
async function main() {
  const api = new GasFluxAPI();
  try {
    const result = await api.processData(
      'data.xlsx',
      'config.yaml', // optional
      './gasflux_results',
      5000 // poll every 5 seconds
    );
    console.log('All done!', result);
  } catch (error) {
    console.error('Processing failed:', error.message);
    process.exit(1);
  }
}

// Run main() only when this file is executed directly
if (require.main === module) {
  main();
}
module.exports = GasFluxAPI;
cURL Command Examples
Basic upload and monitoring
#!/bin/bash
# API base URL
API_URL="http://localhost:5000"

# Check the health status
echo "Checking API health..."
curl -s "${API_URL}/health" | jq '.'

# Upload the files
echo "Uploading data file..."
UPLOAD_RESPONSE=$(curl -s -X POST \
  -F "file=@data.xlsx" \
  -F "config=@config.yaml" \
  "${API_URL}/upload")
echo "Upload response: $UPLOAD_RESPONSE"

# Extract the task ID (payload fields live under .data)
TASK_ID=$(echo "$UPLOAD_RESPONSE" | jq -r '.data.job_id')
if [ "$TASK_ID" = "null" ] || [ -z "$TASK_ID" ]; then
  echo "Upload failed!"
  exit 1
fi
echo "Task ID: $TASK_ID"

# Poll the task status
echo "Monitoring task status..."
while true; do
  STATUS_RESPONSE=$(curl -s "${API_URL}/task/${TASK_ID}")
  STATUS=$(echo "$STATUS_RESPONSE" | jq -r '.data.status')
  MESSAGE=$(echo "$STATUS_RESPONSE" | jq -r '.data.message')
  echo "[$(date '+%Y-%m-%d %H:%M:%S')] Status: $STATUS - $MESSAGE"
  if [ "$STATUS" = "completed" ]; then
    echo "Processing completed!"
    # Download the result files
    echo "Downloading result files..."
    mkdir -p results
    echo "$STATUS_RESPONSE" | jq -r '.data.results[].download_url' | while read -r download_url; do
      filename=$(basename "$download_url")
      echo "Downloading $filename..."
      curl -s -o "results/$filename" "${API_URL}${download_url}"
    done
    echo "All files downloaded to ./results/"
    break
  elif [ "$STATUS" = "failed" ]; then
    ERROR=$(echo "$STATUS_RESPONSE" | jq -r '.data.error // "Unknown error"')
    echo "Task failed: $ERROR"
    exit 1
  fi
  sleep 3
done
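🔧 Troubleshooting Guide
Common problems and solutions
1. Connection problems
Problem: cannot connect to the API server
curl: (7) Failed to connect to localhost port 5000: Connection refused
Solutions:
- Check that the server is running: ps aux | grep gasflux
- Verify the port configuration: check the GASFLUX_PORT environment variable
- Check the firewall settings: sudo ufw status or sudo firewall-cmd --list-all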
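2. File upload failures
Problem: the upload is rejected
{"code": 400, "message": "无效的数据文件类型。只允许 .xlsx 和 .xls 格式。", "data": {}}
Solutions:
- Check the file extension (must be .xlsx or .xls)
- Verify that the file is not empty
- Make sure the file is no larger than 100MB
Problem: the file is too large
{"code": 413, "message": "文件过大。最大尺寸为 100MB。", "data": {}}
Solutions:
- Compress the data file
- Split it into several smaller files
- Ask an administrator to raise the file size limit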
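3. Task processing problems
Problem: a task stays in the pending state for a long time
{"status": "pending", "message": "Task queued for processing"}
Solutions:
- Check the system load: GET /stats
- Review the server's resource usage
- Wait for the queue to drain, or contact an administrator
Problem: processing failed
{
"status": "failed",
"error": "处理失败: Invalid data format in column 'temperature'"
}
Solutions:
- Check the input data format and column names
- Verify the data ranges (temperature, pressure, and so on)
- Review the detailed error information and the logs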
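4. File download problems
Problem: the download fails
{"error": "File not found or access denied"}
Solutions:
- Confirm that the task has completed (status: "completed")
- Check the download URL format
- Verify that the file path exists
5. Server performance problems
Problem: slow responses or timeouts
Solutions:
- Check system resources: GET /stats
- Check the number of concurrent tasks
- Monitor memory and CPU usage
- Consider adding server resources or tuning the configuration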
Debugging tips
Enable verbose logging
# Set environment variables to enable DEBUG mode
export GASFLUX_LOG_LEVEL=DEBUG
export GASFLUX_DEBUG=true
# Restart the server
python server_waitress.py
View logs in real time
# Follow the log file
tail -f logs/gasflux_api.log
# Filter the log for a specific task
tail -f logs/gasflux_api.log | grep "task:abc123"
Diagnose with the health check
# Basic health check
curl -s http://localhost:5000/health | jq '.'
# Detailed statistics
curl -s http://localhost:5000/stats | jq '.'
# Configuration
curl -s http://localhost:5000/config | jq '.'
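🛡️ Security Considerations
Data protection
- File type validation: only the listed file types are accepted (.xlsx, .xls, .yaml, .yml)
- Path traversal protection: blocks access to sensitive files through paths such as ../
- File size limit: guards against denial-of-service attacks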
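Path traversal protection usually means resolving the requested path and checking that it still lies inside the output folder. The sketch below shows one common way to do this with `pathlib`. It is not the server's actual implementation: `resolve_download_path` is a hypothetical name, and raising `PermissionError` stands in for the documented 403 response.

```python
from pathlib import Path


def resolve_download_path(output_root, requested):
    """Resolve `requested` under `output_root`; raise PermissionError if it escapes."""
    root = Path(output_root).resolve()
    target = (root / requested).resolve()
    # After resolution, the target must be the root itself or live below it.
    if target != root and root not in target.parents:
        raise PermissionError(f"path escapes output folder: {requested}")
    return target


print(resolve_download_path("/tmp/outputs", "abc123/report.html"))
```

Resolving before comparing matters: a plain string prefix check would accept `outputs/../secrets`, while the resolved path makes the escape visible.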
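Access control
- No built-in authentication: intended for internal networks or controlled environments
- IP allowlist: can be implemented with a reverse proxy
- HTTPS recommended: use HTTPS in production
Data cleanup
- Automatic cleanup: expired tasks and files are deleted automatically
- Configuration: the cleanup interval and the expiry age can be adjusted through environment variables (GASFLUX_TASK_CLEANUP_INTERVAL and GASFLUX_MAX_TASK_AGE)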
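The expiry rule can be pictured as a periodic scan that compares each task's age with the maximum age. The sketch below is an assumption about how such a scan might look, not the server's code: the function name, the task dictionary shape, measuring age from `updated_at`, and skipping unfinished tasks are all illustrative choices. Only the 86400-second default comes from this document.

```python
MAX_TASK_AGE = 86400  # seconds, the documented default for GASFLUX_MAX_TASK_AGE


def expired_task_ids(tasks, now, max_age=MAX_TASK_AGE):
    """Return IDs of finished tasks whose last update is older than max_age seconds."""
    return [
        task_id
        for task_id, info in tasks.items()
        # Assumption: only finished tasks are eligible for cleanup.
        if info["status"] in ("completed", "failed")
        and now - info["updated_at"] > max_age
    ]


tasks = {
    "a": {"status": "completed", "updated_at": 0},
    "b": {"status": "processing", "updated_at": 0},
    "c": {"status": "failed", "updated_at": 90000},
}
print(expired_task_ids(tasks, now=100000))
```

With the default settings, results should be downloaded within 24 hours of completion; after that the files may no longer be available.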
📈 Best Practices
Client implementation
1. Error handling
import time

import requests

def safe_api_call(url, max_retries=3, backoff_factor=2):
    """GET a URL with retries and exponential backoff; returns the parsed JSON body."""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException:
            if attempt == max_retries - 1:
                raise
            wait_time = backoff_factor ** attempt
            print(f"Request failed, retrying in {wait_time}s...")
            time.sleep(wait_time)
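2. Status polling optimization
import time

import requests

def get_task_status(task_id, base_url='http://localhost:5000'):
    """Fetch the task payload (the envelope's data field)."""
    return requests.get(f'{base_url}/task/{task_id}', timeout=30).json()['data']

def monitor_task_efficiently(task_id, max_wait_time=3600):
    start_time = time.time()
    check_interval = 2  # initial polling interval in seconds
    while time.time() - start_time < max_wait_time:
        status = get_task_status(task_id)
        if status['status'] in ['completed', 'failed']:
            return status
        # Adjust the polling interval to the task stage
        if 'progress' in status:
            stage = status['progress'].get('stage', '')
            if 'data_processing' in stage:
                check_interval = 5  # poll less often during data processing
            elif 'report_generation' in stage:
                check_interval = 10  # and less often still during report generation
        time.sleep(check_interval)
    raise TimeoutError(f"Task monitoring timed out after {max_wait_time}s")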
3. Large file handling
import os

import requests

def upload_large_file(file_path):
    file_size = os.path.getsize(file_path)
    # Files are sent as a single multipart upload, so warn early when a file is large
    if file_size > 50 * 1024 * 1024:  # 50MB
        print(f"Large file detected ({file_size/1024/1024:.1f}MB)")
        print("Consider compressing the data or splitting into smaller files")
    # Standard upload
    with open(file_path, 'rb') as f:
        files = {'file': f}
        response = requests.post('http://localhost:5000/upload', files=files)
    return response.json()
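Server Deployment
Production configuration
# Environment variables
export GASFLUX_HOST=0.0.0.0
export GASFLUX_PORT=5000
export GASFLUX_LOG_LEVEL=INFO
export GASFLUX_MAX_CONTENT_LENGTH=104857600 # 100MB
# Use a process manager
# Example systemd service (writing under /etc requires root, hence sudo tee).
# Variables exported in this shell are not passed to the service;
# set them with Environment= lines in the [Service] section if needed.
sudo tee /etc/systemd/system/gasflux.service > /dev/null << 'EOF'
[Unit]
Description=GasFlux Web API
After=network.target

[Service]
User=gasflux
Group=gasflux
WorkingDirectory=/opt/gasflux
ExecStart=/opt/gasflux/venv/bin/python server_waitress.py
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
EOF
# Reload systemd, then enable and start the service
sudo systemctl daemon-reload
sudo systemctl enable gasflux
sudo systemctl start gasflux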
Monitoring and alerting
#!/bin/bash
# Health check script
API_URL="http://localhost:5000"

# Check the health status (curl -f fails on the 503 returned when degraded)
if ! curl -f -s "${API_URL}/health" > /dev/null; then
  echo "API is unhealthy, sending alert..."
  # Send an alert email, Slack notification, etc.
fi

# Check the queue length (payload fields live under .data)
STATS=$(curl -s "${API_URL}/stats")
ACTIVE_TASKS=$(echo "$STATS" | jq '.data.summary.active_tasks')
if [ "$ACTIVE_TASKS" -gt 10 ]; then
  echo "High task queue detected: $ACTIVE_TASKS active tasks"
  # Send an alert
fi
📚 Appendix
Supported environment variables
| Variable | Default | Description |
|---|---|---|
| GASFLUX_HOST | 0.0.0.0 | Server listen address |
| GASFLUX_PORT | 5000 | Server listen port |
| GASFLUX_DEBUG | false | Debug mode switch |
| GASFLUX_UPLOAD_FOLDER | web_api_data/uploads | Upload storage directory |
| GASFLUX_OUTPUT_FOLDER | web_api_data/outputs | Output storage directory |
| GASFLUX_MAX_CONTENT_LENGTH | 104857600 | Maximum file size (bytes) |
| GASFLUX_LOG_LEVEL | INFO | Log level |
| GASFLUX_LOG_FILE | logs/gasflux_api.log | Log file path |
| GASFLUX_CORS_ORIGINS | ["*"] | Allowed CORS origins |
| GASFLUX_TASK_CLEANUP_INTERVAL | 3600 | Task cleanup interval (seconds) |
| GASFLUX_MAX_TASK_AGE | 86400 | Maximum task age (seconds) |
| GASFLUX_THREADS | 8 | Number of Waitress threads |
| GASFLUX_CONNECTION_LIMIT | 100 | Maximum number of connections |
| GASFLUX_CHANNEL_TIMEOUT | 300 | Channel timeout (seconds) |
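API response time baselines
- /health: < 100ms
- /stats: < 500ms
- /config: < 200ms
- /upload: < 2s (file handling time)
- /task/{id}: < 300ms
- /download/{file}: depends on file size (typically < 5s)
File format specification
Data file requirements
- Format: Excel (.xlsx or .xls)
- Required columns: latitude, longitude, height_ato, windspeed, winddir, temperature, pressure
- Optional columns: gas concentrations such as ch4, co2, c2h6
- Data type: numeric (float/int)
- Missing values: NaN or empty
Configuration file format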
output_dir: ~/gasflux_reports
required_cols:
  latitude: [-90, 90]
  longitude: [-180, 180]
  height_ato: [-200, 500]
  windspeed: [0, 50]
  winddir: [0, 360]
  temperature: [-50, 60]
  pressure: [900, 1100]
gases:
  ch4: [1.5, 500]
  co2: [300, 5000]
  c2h6: [-0.5, 10]
strategies:
  background: "algorithmic"
  sensor: "insitu"
  spatial: "curtain"
  interpolation: "kriging"
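The `required_cols` entries read naturally as inclusive `[min, max]` bounds for each column. Assuming that interpretation, which the source does not state outright, a row can be checked before upload as sketched below. `validate_row` is a hypothetical helper; the bounds are copied from the sample configuration above.

```python
import math

# Assumed to be inclusive [min, max] bounds, copied from the sample configuration.
REQUIRED_COLS = {
    "latitude": (-90, 90),
    "longitude": (-180, 180),
    "height_ato": (-200, 500),
    "windspeed": (0, 50),
    "winddir": (0, 360),
    "temperature": (-50, 60),
    "pressure": (900, 1100),
}


def validate_row(row, ranges=REQUIRED_COLS):
    """Return a list of problems for one row given as {column: value}."""
    problems = []
    for column, (low, high) in ranges.items():
        if column not in row:
            problems.append(f"missing column: {column}")
            continue
        value = row[column]
        if value is None or (isinstance(value, float) and math.isnan(value)):
            continue  # missing values (NaN or empty) are allowed
        if not low <= value <= high:
            problems.append(f"{column}={value} outside [{low}, {high}]")
    return problems


good = {"latitude": 51.5, "longitude": -0.1, "height_ato": 30, "windspeed": 4.2,
        "winddir": 270, "temperature": 18.0, "pressure": 1013}
print(validate_row(good))
print(validate_row(dict(good, temperature=75)))
```

Running a check like this on a few rows before uploading can catch errors such as the `Invalid data format in column 'temperature'` failure without waiting for a full processing run.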
Last updated: January 14, 2026
GasFlux Web API version: 1.0.0
Maintained by: API development team