fix: PanelFactory 信号风暴修复 + 后端上帝类肢解(BaseStepHandler/调度器/Step1打样)

This commit is contained in:
DXC
2026-06-17 17:48:40 +08:00
parent 39e8c29913
commit f6455b71ba
5 changed files with 588 additions and 4 deletions

View File

@ -0,0 +1,20 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
步骤处理器包
将 WaterQualityInversionPipeline 的 14 个巨型 step* 方法
拆分为独立的 Handler 类,每个 Handler 实现 BaseStepHandler 接口。
调度器PipelineScheduler仅维护执行上下文并根据 step_key
从注册表查找对应 Handler 执行,自身不再包含任何算法逻辑。
"""
from src.core.handlers.base import BaseStepHandler, PipelineContext
from src.core.handlers.step1_water_mask import Step1WaterMaskHandler
__all__ = [
'BaseStepHandler',
'PipelineContext',
'Step1WaterMaskHandler',
]

277
src/core/handlers/base.py Normal file
View File

@ -0,0 +1,277 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Handler 基类与 Pipeline 执行上下文
BaseStepHandler —— 所有步骤 Handler 的抽象基类,定义统一的 execute 接口。
PipelineContext —— 在 Handler 之间传递的共享状态容器(路径、计时、回调等)。
设计原则:
- Handler 只负责"执行一个步骤的算法逻辑",不管理调度/依赖/跳过。
- Context 是 Handler 之间唯一的共享状态通道。
- 调度器PipelineScheduler负责遍历 config、查找 Handler、调用 execute。
"""
from __future__ import annotations
import time
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
class PipelineContext:
"""管道执行上下文 —— Handler 之间共享状态的唯一载体。
包含:
- 工作目录及子目录
- 中间结果路径water_mask_path, glint_mask_path, ...
- 步骤计时记录
- 回调函数(用于 GUI 进度通知)
- 可视化/报告生成器实例
"""
def __init__(self, work_dir: str = "./work_dir"):
self.work_dir = Path(work_dir)
self.work_dir.mkdir(parents=True, exist_ok=True)
# ── 子目录 ──
self.water_mask_dir = self.work_dir / "1_water_mask"
self.glint_dir = self.work_dir / "2_Glint_Detection"
self.deglint_dir = self.work_dir / "3_deglint"
self.processed_data_dir = self.work_dir / "5_Data_Cleaning"
self.training_spectra_dir = self.work_dir / "6_Spectral_Feature_Extraction"
self.indices_dir = self.work_dir / "7_Water_Quality_Indices"
self.models_dir = self.work_dir / "8_Supervised_Model_Training"
self.non_empirical_models_dir = self.work_dir / "8_Non_Empirical_Regression"
self.custom_regression_dir = self.work_dir / "13_Custom_Regression"
self.sampling_dir = self.work_dir / "4_sampling"
self.prediction_dir = self.work_dir / "11_12_13_predictions"
self.visualization_dir = self.work_dir / "14_visualization"
self.reports_dir = self.work_dir / "reports"
for d in [self.water_mask_dir, self.glint_dir, self.deglint_dir,
self.processed_data_dir, self.training_spectra_dir,
self.indices_dir, self.models_dir, self.non_empirical_models_dir,
self.custom_regression_dir, self.sampling_dir, self.prediction_dir,
self.visualization_dir, self.reports_dir]:
d.mkdir(parents=True, exist_ok=True)
# ── 中间结果路径 ──
self.water_mask_path: Optional[str] = None
self.glint_mask_path: Optional[str] = None
self.interpolated_img_path: Optional[str] = None
self.deglint_img_path: Optional[str] = None
self.processed_csv_path: Optional[str] = None
self.training_csv_path: Optional[str] = None
self.indices_path: Optional[str] = None
self.custom_regression_path: Optional[str] = None
# ── 计时 ──
self.step_timings: Dict[str, dict] = {}
self.pipeline_start_time: Optional[float] = None
self.pipeline_end_time: Optional[float] = None
# ── 回调 ──
self._callback: Optional[Callable] = None
# ── 可视化组件(延迟导入避免循环依赖)──
self._visualizer = None
self._report_generator = None
self._scatter_batch = None
# ── matplotlib 中文字体 ──
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei',
'DejaVu Sans', 'Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False
# ═══════════════════════════════════════════════════════════
# 回调
# ═══════════════════════════════════════════════════════════
def set_callback(self, callback: Callable):
"""设置回调函数,用于向 GUI 报告进度。
Args:
callback: 签名为 callback(step_name, status, message="")
status: 'start' | 'completed' | 'skipped' | 'error' | 'info' | 'warning'
"""
self._callback = callback
def notify(self, step_name: str, status: str, message: str = ""):
"""通知回调函数。"""
if self._callback:
try:
self._callback(step_name, status, message)
except Exception as e:
print(f"回调函数执行失败: {e}")
# ═══════════════════════════════════════════════════════════
# 计时
# ═══════════════════════════════════════════════════════════
def record_step_time(self, step_name: str, start_time: float, end_time: float,
status: str = "completed", error: Optional[str] = None):
elapsed = end_time - start_time
self.step_timings[step_name] = {
'start_time': datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S'),
'end_time': datetime.fromtimestamp(end_time).strftime('%Y-%m-%d %H:%M:%S'),
'elapsed_seconds': elapsed,
'elapsed_formatted': self._format_time(elapsed),
'status': status,
'error': error,
}
@staticmethod
def _format_time(seconds: float) -> str:
if seconds < 60:
return f"{seconds:.2f}"
elif seconds < 3600:
minutes = int(seconds // 60)
secs = seconds % 60
return f"{minutes}{secs:.2f}"
else:
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = seconds % 60
return f"{hours}小时{minutes}{secs:.2f}"
# ═══════════════════════════════════════════════════════════
# 可视化组件(延迟导入)
# ═══════════════════════════════════════════════════════════
@property
def visualizer(self):
if self._visualizer is None:
from src.postprocessing.visualization_reports import WaterQualityVisualization
self._visualizer = WaterQualityVisualization(str(self.visualization_dir))
return self._visualizer
@property
def report_generator(self):
if self._report_generator is None:
from src.postprocessing.visualization_reports import ReportGenerator
self._report_generator = ReportGenerator(str(self.reports_dir))
return self._report_generator
@property
def scatter_batch(self):
if self._scatter_batch is None:
from src.core.prediction.sctter_batch import WaterQualityScatterBatch
self._scatter_batch = WaterQualityScatterBatch()
return self._scatter_batch
# ═══════════════════════════════════════════════════════════
# 步骤输出目录查找(兼容旧接口)
# ═══════════════════════════════════════════════════════════
_STEP_OUTPUT_DIR_MAP: Optional[Dict[str, Path]] = None
def _ensure_step_dir_map(self) -> Dict[str, Path]:
if PipelineContext._STEP_OUTPUT_DIR_MAP is not None:
return PipelineContext._STEP_OUTPUT_DIR_MAP
wp = self.work_dir
m = {
'step1': wp / '1_water_mask',
'step2': wp / '2_Glint_Detection',
'step3': wp / '3_deglint',
'step4_sampling': wp / '4_sampling',
'step5_clean': wp / '5_Data_Cleaning',
'step6_feature': wp / '6_Spectral_Feature_Extraction',
'step7_index': wp / '7_Water_Quality_Indices',
'step8_ml_train': wp / '8_Supervised_Model_Training',
'step9_ml_predict': wp / '8_Non_Empirical_Regression',
'step10_watercolor': wp / '10_WaterIndex_Images',
'step11_map': wp / '14_visualization',
'step12_viz': wp / '14_visualization',
'step13_report': wp / '14_visualization',
'step11_predictions': wp / '11_12_13_predictions',
'step12_predictions': wp / '11_12_13_predictions',
'step13_predictions': wp / '11_12_13_predictions',
'custom_regression': wp / '13_Custom_Regression',
'prediction_dir': wp / '11_12_13_predictions',
'visualization': wp / '14_visualization',
'reports': wp / 'reports',
'step8': wp / '8_Supervised_Model_Training',
'step9': wp / '8_Non_Empirical_Regression',
'step10': wp / '10_WaterIndex_Images',
'step11': wp / '11_12_13_predictions',
'step12': wp / '13_Custom_Regression',
'step13': wp / 'reports',
'step14': wp / '14_visualization',
}
PipelineContext._STEP_OUTPUT_DIR_MAP = m
return m
def get_step_output_dir(self, step_name: str) -> Path:
mapping = self._ensure_step_dir_map()
key = (step_name or '').strip()
if key in mapping:
return mapping[key]
print(f"[PipelineContext.get_step_output_dir] 未知 step_name={key!r},回退到 work_dir")
return self.work_dir
class BaseStepHandler(ABC):
"""步骤处理器抽象基类。
所有步骤 Handler 必须实现:
- step_key: 类属性,对应 config 中的 key'step1', 'step2', ...
- execute(context, config): 执行步骤逻辑,返回结果字典
用法示例::
class Step1WaterMaskHandler(BaseStepHandler):
step_key = 'step1'
def execute(self, ctx, config):
result = WaterMaskStep.run(...)
ctx.water_mask_path = result
return {'water_mask_path': result}
"""
# 子类必须定义:对应 config 字典中的 key
step_key: str = None
@abstractmethod
def execute(self, context: PipelineContext, config: dict) -> dict:
"""执行步骤逻辑。
Args:
context: 管道执行上下文(共享状态)
config: 该步骤的配置字典(即 config[self.step_key]
Returns:
结果字典,包含该步骤产生的输出路径等信息。
调度器会将返回值合并到全局结果中。
Raises:
Exception: 任何异常都会由调度器捕获并记录。
"""
...
def _resolve_path(self, explicit: Optional[str], fallback: Optional[str],
label: str = "path") -> Optional[str]:
"""解析路径:优先使用显式传入值,否则回退到上下文中的缓存值。
Args:
explicit: 调用方显式传入的路径
fallback: 上下文中的缓存路径
label: 用于日志的标签
Returns:
解析后的路径,若两者均为 None 则返回 None
"""
if explicit is not None:
return explicit
if fallback is not None:
return fallback
return None

View File

@ -0,0 +1,199 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
极简管道调度器
替代原 WaterQualityInversionPipeline2598 行上帝类)的调度核心。
调度器自身不包含任何算法逻辑,仅负责:
1. 维护 PipelineContext共享状态
2. 根据 config key 从 Handler 注册表查找对应处理器
3. 按序调用 handler.execute(ctx, config),收集结果
4. 异常时记录错误并继续(或中止,取决于配置)
Handler 注册表是 step_key → BaseStepHandler 的映射。
新增步骤只需:写一个 Handler 类 + 在注册表中加一行。
"""
from __future__ import annotations
import time
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
from src.core.handlers.base import BaseStepHandler, PipelineContext
class PipelineScheduler:
"""极简管道调度器。
用法::
scheduler = PipelineScheduler(work_dir="./work_dir")
scheduler.register_handler(Step1WaterMaskHandler())
scheduler.register_handler(Step2GlintDetectionHandler())
# ... 注册所有步骤 ...
scheduler.set_callback(my_callback) # 可选GUI 进度回调
result = scheduler.run_full_pipeline(config)
# result['step1'] → {'water_mask_path': ...}
# result['step2'] → {'glint_mask_path': ...}
# ...
"""
def __init__(self, work_dir: str = "./work_dir"):
self.ctx = PipelineContext(work_dir)
self._handlers: Dict[str, BaseStepHandler] = {}
# ═══════════════════════════════════════════════════════════
# Handler 注册
# ═══════════════════════════════════════════════════════════
def register_handler(self, handler: BaseStepHandler):
"""注册一个步骤处理器。
Args:
handler: BaseStepHandler 实例(其 step_key 类属性决定 config 中的 key
"""
if handler.step_key is None:
raise ValueError(
f"Handler {type(handler).__name__} 未定义 step_key 类属性"
)
self._handlers[handler.step_key] = handler
def register_handlers(self, handlers: List[BaseStepHandler]):
"""批量注册步骤处理器。"""
for h in handlers:
self.register_handler(h)
# ═══════════════════════════════════════════════════════════
# 回调
# ═══════════════════════════════════════════════════════════
def set_callback(self, callback: Callable):
"""设置 GUI 进度回调,代理到 PipelineContext。"""
self.ctx.set_callback(callback)
# ═══════════════════════════════════════════════════════════
# 单步执行
# ═══════════════════════════════════════════════════════════
def run_step(self, step_key: str, config: dict) -> Dict[str, Any]:
"""执行单个步骤。
Args:
step_key: 步骤 key'step1', 'step2', ...
config: 该步骤的配置字典
Returns:
步骤执行结果字典
Raises:
KeyError: 如果 step_key 未注册 Handler
Exception: 步骤执行中的任何异常
"""
handler = self._handlers.get(step_key)
if handler is None:
raise KeyError(
f"未注册的步骤: {step_key!r}"
f"已注册: {list(self._handlers.keys())}"
)
self.ctx.notify(handler.step_key, 'start')
result = handler.execute(self.ctx, config)
self.ctx.notify(handler.step_key, 'completed')
return result
# ═══════════════════════════════════════════════════════════
# 全流程执行
# ═══════════════════════════════════════════════════════════
def run_full_pipeline(self, config: Dict[str, dict]) -> Dict[str, Any]:
"""按 config 中的 key 顺序执行全流程。
遍历 config 的顶层 key对每个 key
- 如果已注册 Handler → 执行并收集结果
- 如果未注册 → 跳过并通知
- 如果执行失败 → 记录错误,继续执行后续步骤(不中止)
Args:
config: 全流程配置字典,格式为 {step_key: step_config, ...}
例如: {'step1': {...}, 'step2': {...}, ...}
Returns:
{
'step_results': {step_key: result_dict, ...},
'step_timings': {...},
'total_elapsed': float,
'errors': {step_key: error_message, ...},
}
"""
self.ctx.pipeline_start_time = time.time()
step_results: Dict[str, Any] = {}
errors: Dict[str, str] = {}
# 按 config 中的顺序遍历Python 3.7+ dict 保序)
for step_key, step_config in config.items():
handler = self._handlers.get(step_key)
if handler is None:
self.ctx.notify(step_key, 'skipped', '未注册 Handler')
continue
try:
result = handler.execute(self.ctx, step_config)
step_results[step_key] = result
self.ctx.notify(step_key, 'completed', str(result))
except Exception as e:
error_msg = f"{type(e).__name__}: {e}"
errors[step_key] = error_msg
step_results[step_key] = {'error': error_msg}
self.ctx.notify(step_key, 'error', error_msg)
# 不中止,继续执行后续步骤
self.ctx.pipeline_end_time = time.time()
total_elapsed = self.ctx.pipeline_end_time - self.ctx.pipeline_start_time
return {
'step_results': step_results,
'step_timings': self.ctx.step_timings,
'total_elapsed': total_elapsed,
'total_elapsed_formatted': self.ctx._format_time(total_elapsed),
'errors': errors,
}
# ═══════════════════════════════════════════════════════════
# 便捷属性(代理到 PipelineContext
# ═══════════════════════════════════════════════════════════
@property
def work_dir(self) -> Path:
return self.ctx.work_dir
@property
def water_mask_path(self) -> Optional[str]:
return self.ctx.water_mask_path
@property
def glint_mask_path(self) -> Optional[str]:
return self.ctx.glint_mask_path
@property
def deglint_img_path(self) -> Optional[str]:
return self.ctx.deglint_img_path
@property
def processed_csv_path(self) -> Optional[str]:
return self.ctx.processed_csv_path
@property
def training_csv_path(self) -> Optional[str]:
return self.ctx.training_csv_path
@property
def indices_path(self) -> Optional[str]:
return self.ctx.indices_path
def get_step_output_dir(self, step_name: str) -> Path:
return self.ctx.get_step_output_dir(step_name)

View File

@ -0,0 +1,83 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Step1 处理器:水域掩膜生成
将原 WaterQualityInversionPipeline.step1_generate_water_mask() 方法
剥离为独立的 Step1WaterMaskHandler。
这是 14 个步骤 Handler 的**打样模板**,其余步骤照此模式拆分:
1. 继承 BaseStepHandler设置 step_key 类属性
2. 实现 execute(ctx, config) → 调用对应 Step 类的静态方法
3. 将输出路径写入 ctx上下文共享
4. 记录步骤耗时
5. 返回结果字典
"""
import time
from typing import Any, Dict
from src.core.handlers.base import BaseStepHandler, PipelineContext
from src.core.steps.water_mask_step import WaterMaskStep
class Step1WaterMaskHandler(BaseStepHandler):
"""步骤1水域掩膜生成。
对应 config key: 'step1'
委托类: WaterMaskStep.run()
用法::
handler = Step1WaterMaskHandler()
result = handler.execute(ctx, config['step1'])
# ctx.water_mask_path 已被更新
"""
step_key = 'step1'
def execute(self, context: PipelineContext, config: dict) -> Dict[str, Any]:
"""执行水域掩膜生成。
config 可包含的键(全部透传给 WaterMaskStep.run()
- mask_path: 水体掩膜文件路径(.shp / .dat / .tif
- img_path: 输入影像路径shp 栅格化或 NDWI 时需要)
- ndwi_threshold: NDWI 阈值(默认 0.4
- use_ndwi: 是否使用 NDWI 方法(默认 False
- generate_png: 是否生成 PNG 预览(默认 True
- output_path: 指定输出路径(可选)
Returns:
{'water_mask_path': str}
"""
step_start_time = time.time()
try:
result = WaterMaskStep.run(
mask_path=config.get('mask_path'),
img_path=config.get('img_path'),
ndwi_threshold=config.get('ndwi_threshold', 0.4),
use_ndwi=config.get('use_ndwi', False),
generate_png=config.get('generate_png', True),
output_path=config.get('output_path'),
water_mask_dir=str(context.water_mask_dir),
callback=context.notify,
)
# 将输出路径写入上下文,供后续步骤使用
context.water_mask_path = result
step_end_time = time.time()
context.record_step_time(
"步骤1: 水域掩膜生成", step_start_time, step_end_time
)
return {'water_mask_path': result}
except Exception as e:
step_end_time = time.time()
context.record_step_time(
"步骤1: 水域掩膜生成", step_start_time, step_end_time,
status="failed", error=str(e)
)
raise

View File

@ -162,14 +162,19 @@ class PanelFactory:
scroll.setWidget(panel)
scroll.setWidgetResizable(True)
# 替换占位页
# 替换占位页blockSignals 阻断 removeTab/insertTab/setCurrentIndex 触发的
# currentChanged 信号风暴,防止 _on_tab_changed → _ensure_loaded 无限递归)
placeholder = self._placeholders.get(tab_index)
if placeholder is not None and self._tab_widget is not None:
tab_title = self._tab_widget.tabText(tab_index)
tab_icon = self._tab_widget.tabIcon(tab_index)
self._tab_widget.removeTab(tab_index)
self._tab_widget.insertTab(tab_index, scroll, tab_icon, tab_title)
self._tab_widget.setCurrentIndex(tab_index)
self._tab_widget.blockSignals(True)
try:
self._tab_widget.removeTab(tab_index)
self._tab_widget.insertTab(tab_index, scroll, tab_icon, tab_title)
self._tab_widget.setCurrentIndex(tab_index)
finally:
self._tab_widget.blockSignals(False)
# 注册
self._panels[step_id] = panel