Files
HSI/batch_test_runner.py

432 lines
15 KiB
Python

#!/usr/bin/env python3
"""
高光谱分析工具包批量测试运行器
功能:
- 批量执行所有测试命令
- 支持按模块分组执行
- 提供详细的执行日志
- 统计测试结果
- 支持断点续跑
"""
import subprocess
import sys
import os
import time
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional
import argparse
class BatchTestRunner:
"""批量测试运行器"""
def __init__(self, output_dir: str = "test_results", log_file: str = "test_log.txt"):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
self.log_file = self.output_dir / log_file
self.results_file = self.output_dir / "test_results.json"
# 测试命令分组
self.test_groups = {
"basic": [
"dim-reduction",
"segmentation",
"edge-detection"
],
"analysis": [
"anomaly-detection",
"classification",
"clustering",
"supervised-classification"
],
"features": [
"feature-selection",
"spectral-index",
"preprocessing",
"shape-features",
"glcm"
],
"color": [
"color-analysis",
"filtering",
"delta-e",
"spectral-to-color",
"xyz-to-rgb"
],
"advanced": [
"regression",
"regression-prediction",
"prosail-gui"
]
}
# 测试命令库
self.test_commands = self._load_test_commands()
# 执行状态
self.results = {
"start_time": None,
"end_time": None,
"total_tests": 0,
"passed": 0,
"failed": 0,
"skipped": 0,
"details": []
}
def _load_test_commands(self) -> Dict[str, List[str]]:
"""从 AUTOMATED_TESTS.md 加载测试命令"""
commands = {}
if not Path("AUTOMATED_TESTS.md").exists():
self.log("错误:找不到 AUTOMATED_TESTS.md 文件")
return commands
with open("AUTOMATED_TESTS.md", 'r', encoding='utf-8') as f:
lines = f.readlines()
current_module = None
current_commands = []
for line in lines:
line = line.strip()
# 检测模块标题
if line.startswith("### ") and "测试" in line:
if current_module and current_commands:
commands[current_module] = current_commands
# 提取模块名
if "降维分析" in line:
current_module = "dim-reduction"
elif "图像分割" in line:
current_module = "segmentation"
elif "边缘检测" in line:
current_module = "edge-detection"
elif "异常检测" in line:
current_module = "anomaly-detection"
elif "分类分析" in line:
current_module = "classification"
elif "聚类分析" in line:
current_module = "clustering"
elif "监督分类" in line:
current_module = "supervised-classification"
elif "特征选择" in line:
current_module = "feature-selection"
elif "光谱指数" in line:
current_module = "spectral-index"
elif "数据预处理" in line:
current_module = "preprocessing"
elif "形状特征" in line:
current_module = "shape-features"
elif "GLCM纹理特征" in line:
current_module = "glcm"
elif "颜色分析" in line:
current_module = "color-analysis"
elif "图像滤波" in line:
current_module = "filtering"
elif "回归分析" in line:
current_module = "regression"
elif "回归预测" in line:
current_module = "regression-prediction"
elif "色差计算" in line:
current_module = "delta-e"
elif "光谱到颜色转换" in line:
current_module = "spectral-to-color"
elif "XYZ到RGB转换" in line:
current_module = "xyz-to-rgb"
elif "PROSAIL模拟器GUI" in line:
current_module = "prosail-gui"
current_commands = []
# 收集命令
elif line.startswith("python main.py"):
current_commands.append(line)
# 保存最后一个模块
if current_module and current_commands:
commands[current_module] = current_commands
return commands
def log(self, message: str, level: str = "INFO"):
"""记录日志"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_message = f"[{timestamp}] [{level}] {message}"
print(log_message)
with open(self.log_file, 'a', encoding='utf-8') as f:
f.write(log_message + "\n")
def run_command(self, command: str, timeout: int = 300) -> Tuple[bool, str, str]:
"""执行单个命令"""
try:
self.log(f"执行命令: {command}")
# 记录开始时间
start_time = time.time()
# 执行命令
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=timeout,
cwd=os.getcwd()
)
# 计算执行时间
execution_time = time.time() - start_time
success = result.returncode == 0
status = "成功" if success else "失败"
self.log(".2f")
if result.stdout:
self.log(f"命令输出:\n{result.stdout}")
if result.stderr:
self.log(f"错误输出:\n{result.stderr}", "ERROR")
return success, result.stdout, result.stderr
except subprocess.TimeoutExpired:
self.log(f"命令超时 (>{timeout}s): {command}", "ERROR")
return False, "", f"命令执行超时 (>{timeout}s)"
except Exception as e:
self.log(f"命令执行异常: {e}", "ERROR")
return False, "", str(e)
def run_module_tests(self, module_name: str, skip_on_failure: bool = False) -> Dict[str, any]:
"""运行单个模块的所有测试"""
if module_name not in self.test_commands:
self.log(f"未找到模块: {module_name}", "WARNING")
return {"module": module_name, "status": "not_found", "tests": []}
commands = self.test_commands[module_name]
self.log(f"\n开始测试模块: {module_name} ({len(commands)} 个测试)")
module_results = {
"module": module_name,
"status": "running",
"tests": [],
"passed": 0,
"failed": 0,
"skipped": 0
}
for i, command in enumerate(commands, 1):
self.log(f"\n执行测试 {i}/{len(commands)}: {command[:50]}...")
# 检查是否应该跳过(例如文件不存在)
if self._should_skip_command(command):
self.log("跳过测试(依赖文件不存在)")
module_results["tests"].append({
"command": command,
"status": "skipped",
"reason": "依赖文件不存在"
})
module_results["skipped"] += 1
continue
# 执行命令
success, stdout, stderr = self.run_command(command)
test_result = {
"command": command,
"status": "passed" if success else "failed",
"stdout": stdout,
"stderr": stderr,
"execution_time": None # 可以后续添加
}
module_results["tests"].append(test_result)
if success:
module_results["passed"] += 1
self.log("✓ 测试通过")
else:
module_results["failed"] += 1
self.log("✗ 测试失败")
if skip_on_failure:
self.log("由于失败,跳过该模块剩余测试")
break
# 更新模块状态
if module_results["failed"] > 0:
module_results["status"] = "failed"
elif module_results["passed"] > 0:
module_results["status"] = "passed"
else:
module_results["status"] = "skipped"
self.log(f"模块 {module_name} 测试完成: {module_results['passed']} 通过, {module_results['failed']} 失败, {module_results['skipped']} 跳过")
return module_results
def _should_skip_command(self, command: str) -> bool:
"""检查是否应该跳过某个命令"""
# 检查输入文件是否存在
if "--input" in command:
# 简单的文件存在性检查
parts = command.split()
try:
input_idx = parts.index("--input")
if input_idx + 1 < len(parts):
input_file = parts[input_idx + 1].strip('"')
if not Path(input_file).exists():
return True
except (ValueError, IndexError):
pass
return False
def run_group_tests(self, group_name: str, skip_on_failure: bool = False) -> Dict[str, any]:
"""运行一个组的所有模块测试"""
if group_name not in self.test_groups:
self.log(f"未找到测试组: {group_name}", "ERROR")
return {"group": group_name, "status": "not_found", "modules": []}
modules = self.test_groups[group_name]
self.log(f"\n{'='*50}")
self.log(f"开始测试组: {group_name} ({len(modules)} 个模块)")
group_results = {
"group": group_name,
"status": "running",
"modules": [],
"passed": 0,
"failed": 0,
"skipped": 0
}
for module in modules:
module_result = self.run_module_tests(module, skip_on_failure)
group_results["modules"].append(module_result)
group_results["passed"] += module_result["passed"]
group_results["failed"] += module_result["failed"]
group_results["skipped"] += module_result["skipped"]
# 更新组状态
if group_results["failed"] > 0:
group_results["status"] = "failed"
elif group_results["passed"] > 0:
group_results["status"] = "passed"
else:
group_results["status"] = "skipped"
self.log(f"测试组 {group_name} 完成: {group_results['passed']} 通过, {group_results['failed']} 失败, {group_results['skipped']} 跳过")
return group_results
def run_all_tests(self, skip_on_failure: bool = False) -> Dict[str, any]:
"""运行所有测试"""
self.log("="*60)
self.log("开始执行高光谱分析工具包自动化测试")
self.log("="*60)
self.results["start_time"] = datetime.now().isoformat()
all_results = {
"summary": self.results,
"groups": []
}
try:
for group_name in self.test_groups.keys():
group_result = self.run_group_tests(group_name, skip_on_failure)
all_results["groups"].append(group_result)
# 更新总计
self.results["passed"] += group_result["passed"]
self.results["failed"] += group_result["failed"]
self.results["skipped"] += group_result["skipped"]
self.results["details"].extend(group_result["modules"])
# 计算总数
self.results["total_tests"] = self.results["passed"] + self.results["failed"] + self.results["skipped"]
except KeyboardInterrupt:
self.log("测试被用户中断", "WARNING")
except Exception as e:
self.log(f"测试执行异常: {e}", "ERROR")
finally:
self.results["end_time"] = datetime.now().isoformat()
# 保存结果
with open(self.results_file, 'w', encoding='utf-8') as f:
json.dump(all_results, f, indent=2, ensure_ascii=False)
# 打印总结
self.print_summary(all_results)
return all_results
def print_summary(self, results: Dict[str, any]):
"""打印测试总结"""
self.log("\n" + "="*60)
self.log("测试执行总结")
self.log("="*60)
summary = results["summary"]
self.log(f"总测试数: {summary['total_tests']}")
self.log(f"通过: {summary['passed']}")
self.log(f"失败: {summary['failed']}")
self.log(f"跳过: {summary['skipped']}")
duration = datetime.fromisoformat(summary["end_time"]) - datetime.fromisoformat(summary["start_time"])
self.log(f"执行时间: {duration}")
success_rate = (summary['passed'] / summary['total_tests'] * 100) if summary['total_tests'] > 0 else 0
for group in results["groups"]:
status_icon = "" if group["status"] == "passed" else ("" if group["status"] == "failed" else "")
self.log(f" {status_icon} {group['group']}: {group['passed']} 通过, {group['failed']} 失败, {group['skipped']} 跳过")
self.log(f"\n详细结果已保存到: {self.results_file}")
self.log(f"执行日志已保存到: {self.log_file}")
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="高光谱分析工具包批量测试运行器")
parser.add_argument("--group", "-g", help="指定测试组 (basic, analysis, features, color, advanced)")
parser.add_argument("--module", "-m", help="指定测试模块")
parser.add_argument("--output-dir", "-o", default="test_results", help="输出目录")
parser.add_argument("--skip-on-failure", action="store_true", help="失败时跳过后续测试")
parser.add_argument("--list", action="store_true", help="列出所有可用的测试")
args = parser.parse_args()
runner = BatchTestRunner(args.output_dir)
if args.list:
print("可用的测试组:")
for group, modules in runner.test_groups.items():
print(f" {group}: {', '.join(modules)}")
print(f"\n共找到 {len(runner.test_commands)} 个测试模块")
for module, commands in runner.test_commands.items():
print(f" {module}: {len(commands)} 个测试")
return
if args.module:
# 运行单个模块
runner.run_module_tests(args.module)
elif args.group:
# 运行单个组
runner.run_group_tests(args.group, args.skip_on_failure)
else:
# 运行所有测试
runner.run_all_tests(args.skip_on_failure)
if __name__ == "__main__":
main()