From f6455b71bac12dd9bff0674f1d8d923e6c3df048 Mon Sep 17 00:00:00 2001 From: DXC Date: Wed, 17 Jun 2026 17:48:40 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20PanelFactory=20=E4=BF=A1=E5=8F=B7?= =?UTF-8?q?=E9=A3=8E=E6=9A=B4=E4=BF=AE=E5=A4=8D=20+=20=E5=90=8E=E7=AB=AF?= =?UTF-8?q?=E4=B8=8A=E5=B8=9D=E7=B1=BB=E8=82=A2=E8=A7=A3=EF=BC=88BaseStepH?= =?UTF-8?q?andler/=E8=B0=83=E5=BA=A6=E5=99=A8/Step1=E6=89=93=E6=A0=B7?= =?UTF-8?q?=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/core/handlers/__init__.py | 20 ++ src/core/handlers/base.py | 277 ++++++++++++++++++++++++ src/core/handlers/pipeline_scheduler.py | 199 +++++++++++++++++ src/core/handlers/step1_water_mask.py | 83 +++++++ src/gui/core/panel_factory.py | 13 +- 5 files changed, 588 insertions(+), 4 deletions(-) create mode 100644 src/core/handlers/__init__.py create mode 100644 src/core/handlers/base.py create mode 100644 src/core/handlers/pipeline_scheduler.py create mode 100644 src/core/handlers/step1_water_mask.py diff --git a/src/core/handlers/__init__.py b/src/core/handlers/__init__.py new file mode 100644 index 0000000..0e9d3d6 --- /dev/null +++ b/src/core/handlers/__init__.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +步骤处理器包 + +将 WaterQualityInversionPipeline 的 14 个巨型 step* 方法 +拆分为独立的 Handler 类,每个 Handler 实现 BaseStepHandler 接口。 + +调度器(PipelineScheduler)仅维护执行上下文并根据 step_key +从注册表查找对应 Handler 执行,自身不再包含任何算法逻辑。 +""" + +from src.core.handlers.base import BaseStepHandler, PipelineContext +from src.core.handlers.step1_water_mask import Step1WaterMaskHandler + +__all__ = [ + 'BaseStepHandler', + 'PipelineContext', + 'Step1WaterMaskHandler', +] diff --git a/src/core/handlers/base.py b/src/core/handlers/base.py new file mode 100644 index 0000000..4ffdaab --- /dev/null +++ b/src/core/handlers/base.py @@ -0,0 +1,277 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Handler 基类与 Pipeline 执行上下文 + +BaseStepHandler —— 所有步骤 Handler 的抽象基类,定义统一的 execute 接口。 +PipelineContext —— 在 Handler 之间传递的共享状态容器(路径、计时、回调等)。 + +设计原则: +- Handler 只负责"执行一个步骤的算法逻辑",不管理调度/依赖/跳过。 +- Context 是 Handler 之间唯一的共享状态通道。 +- 调度器(PipelineScheduler)负责遍历 config、查找 Handler、调用 execute。 +""" + +from __future__ import annotations + +import time +from abc import ABC, abstractmethod +from datetime import datetime +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional + +import numpy as np +import pandas as pd +import matplotlib.pyplot as plt +import warnings + +warnings.filterwarnings('ignore') + + +class PipelineContext: + """管道执行上下文 —— Handler 之间共享状态的唯一载体。 + + 包含: + - 工作目录及子目录 + - 中间结果路径(water_mask_path, glint_mask_path, ...) + - 步骤计时记录 + - 回调函数(用于 GUI 进度通知) + - 可视化/报告生成器实例 + """ + + def __init__(self, work_dir: str = "./work_dir"): + self.work_dir = Path(work_dir) + self.work_dir.mkdir(parents=True, exist_ok=True) + + # ── 子目录 ── + self.water_mask_dir = self.work_dir / "1_water_mask" + self.glint_dir = self.work_dir / "2_Glint_Detection" + self.deglint_dir = self.work_dir / "3_deglint" + self.processed_data_dir = self.work_dir / "5_Data_Cleaning" + self.training_spectra_dir = self.work_dir / "6_Spectral_Feature_Extraction" + self.indices_dir = self.work_dir / "7_Water_Quality_Indices" + self.models_dir = self.work_dir / "8_Supervised_Model_Training" + self.non_empirical_models_dir = self.work_dir / "8_Non_Empirical_Regression" + self.custom_regression_dir = self.work_dir / "13_Custom_Regression" + self.sampling_dir = self.work_dir / "4_sampling" + self.prediction_dir = self.work_dir / "11_12_13_predictions" + self.visualization_dir = self.work_dir / "14_visualization" + self.reports_dir = self.work_dir / "reports" + + for d in [self.water_mask_dir, self.glint_dir, self.deglint_dir, + self.processed_data_dir, self.training_spectra_dir, + self.indices_dir, self.models_dir, self.non_empirical_models_dir, + self.custom_regression_dir, self.sampling_dir, self.prediction_dir, + self.visualization_dir, self.reports_dir]: + d.mkdir(parents=True, exist_ok=True) + + # ── 中间结果路径 ── + self.water_mask_path: Optional[str] = None + self.glint_mask_path: Optional[str] = None + self.interpolated_img_path: Optional[str] = None + self.deglint_img_path: Optional[str] = None + self.processed_csv_path: Optional[str] = None + self.training_csv_path: Optional[str] = None + self.indices_path: Optional[str] = None + self.custom_regression_path: Optional[str] = None + + # ── 计时 ── + self.step_timings: Dict[str, dict] = {} + self.pipeline_start_time: Optional[float] = None + self.pipeline_end_time: Optional[float] = None + + # ── 回调 ── + self._callback: Optional[Callable] = None + + # ── 可视化组件(延迟导入避免循环依赖)── + self._visualizer = None + self._report_generator = None + self._scatter_batch = None + + # ── matplotlib 中文字体 ── + plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', + 'DejaVu Sans', 'Arial Unicode MS'] + plt.rcParams['axes.unicode_minus'] = False + + # ═══════════════════════════════════════════════════════════ + # 回调 + # ═══════════════════════════════════════════════════════════ + + def set_callback(self, callback: Callable): + """设置回调函数,用于向 GUI 报告进度。 + + Args: + callback: 签名为 callback(step_name, status, message="") + status: 'start' | 'completed' | 'skipped' | 'error' | 'info' | 'warning' + """ + self._callback = callback + + def notify(self, step_name: str, status: str, message: str = ""): + """通知回调函数。""" + if self._callback: + try: + self._callback(step_name, status, message) + except Exception as e: + print(f"回调函数执行失败: {e}") + + # ═══════════════════════════════════════════════════════════ + # 计时 + # ═══════════════════════════════════════════════════════════ + + def record_step_time(self, step_name: str, start_time: float, end_time: float, + status: str = "completed", error: Optional[str] = None): + elapsed = end_time - start_time + self.step_timings[step_name] = { + 'start_time': datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S'), + 'end_time': datetime.fromtimestamp(end_time).strftime('%Y-%m-%d %H:%M:%S'), + 'elapsed_seconds': elapsed, + 'elapsed_formatted': self._format_time(elapsed), + 'status': status, + 'error': error, + } + + @staticmethod + def _format_time(seconds: float) -> str: + if seconds < 60: + return f"{seconds:.2f}秒" + elif seconds < 3600: + minutes = int(seconds // 60) + secs = seconds % 60 + return f"{minutes}分{secs:.2f}秒" + else: + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + secs = seconds % 60 + return f"{hours}小时{minutes}分{secs:.2f}秒" + + # ═══════════════════════════════════════════════════════════ + # 可视化组件(延迟导入) + # ═══════════════════════════════════════════════════════════ + + @property + def visualizer(self): + if self._visualizer is None: + from src.postprocessing.visualization_reports import WaterQualityVisualization + self._visualizer = WaterQualityVisualization(str(self.visualization_dir)) + return self._visualizer + + @property + def report_generator(self): + if self._report_generator is None: + from src.postprocessing.visualization_reports import ReportGenerator + self._report_generator = ReportGenerator(str(self.reports_dir)) + return self._report_generator + + @property + def scatter_batch(self): + if self._scatter_batch is None: + from src.core.prediction.sctter_batch import WaterQualityScatterBatch + self._scatter_batch = WaterQualityScatterBatch() + return self._scatter_batch + + # ═══════════════════════════════════════════════════════════ + # 步骤输出目录查找(兼容旧接口) + # ═══════════════════════════════════════════════════════════ + + _STEP_OUTPUT_DIR_MAP: Optional[Dict[str, Path]] = None + + def _ensure_step_dir_map(self) -> Dict[str, Path]: + if PipelineContext._STEP_OUTPUT_DIR_MAP is not None: + return PipelineContext._STEP_OUTPUT_DIR_MAP + wp = self.work_dir + m = { + 'step1': wp / '1_water_mask', + 'step2': wp / '2_Glint_Detection', + 'step3': wp / '3_deglint', + 'step4_sampling': wp / '4_sampling', + 'step5_clean': wp / '5_Data_Cleaning', + 'step6_feature': wp / '6_Spectral_Feature_Extraction', + 'step7_index': wp / '7_Water_Quality_Indices', + 'step8_ml_train': wp / '8_Supervised_Model_Training', + 'step9_ml_predict': wp / '8_Non_Empirical_Regression', + 'step10_watercolor': wp / '10_WaterIndex_Images', + 'step11_map': wp / '14_visualization', + 'step12_viz': wp / '14_visualization', + 'step13_report': wp / '14_visualization', + 'step11_predictions': wp / '11_12_13_predictions', + 'step12_predictions': wp / '11_12_13_predictions', + 'step13_predictions': wp / '11_12_13_predictions', + 'custom_regression': wp / '13_Custom_Regression', + 'prediction_dir': wp / '11_12_13_predictions', + 'visualization': wp / '14_visualization', + 'reports': wp / 'reports', + 'step8': wp / '8_Supervised_Model_Training', + 'step9': wp / '8_Non_Empirical_Regression', + 'step10': wp / '10_WaterIndex_Images', + 'step11': wp / '11_12_13_predictions', + 'step12': wp / '13_Custom_Regression', + 'step13': wp / 'reports', + 'step14': wp / '14_visualization', + } + PipelineContext._STEP_OUTPUT_DIR_MAP = m + return m + + def get_step_output_dir(self, step_name: str) -> Path: + mapping = self._ensure_step_dir_map() + key = (step_name or '').strip() + if key in mapping: + return mapping[key] + print(f"[PipelineContext.get_step_output_dir] 未知 step_name={key!r},回退到 work_dir") + return self.work_dir + + +class BaseStepHandler(ABC): + """步骤处理器抽象基类。 + + 所有步骤 Handler 必须实现: + - step_key: 类属性,对应 config 中的 key(如 'step1', 'step2', ...) + - execute(context, config): 执行步骤逻辑,返回结果字典 + + 用法示例:: + + class Step1WaterMaskHandler(BaseStepHandler): + step_key = 'step1' + + def execute(self, ctx, config): + result = WaterMaskStep.run(...) + ctx.water_mask_path = result + return {'water_mask_path': result} + """ + + # 子类必须定义:对应 config 字典中的 key + step_key: str = None + + @abstractmethod + def execute(self, context: PipelineContext, config: dict) -> dict: + """执行步骤逻辑。 + + Args: + context: 管道执行上下文(共享状态) + config: 该步骤的配置字典(即 config[self.step_key]) + + Returns: + 结果字典,包含该步骤产生的输出路径等信息。 + 调度器会将返回值合并到全局结果中。 + + Raises: + Exception: 任何异常都会由调度器捕获并记录。 + """ + ... + + def _resolve_path(self, explicit: Optional[str], fallback: Optional[str], + label: str = "path") -> Optional[str]: + """解析路径:优先使用显式传入值,否则回退到上下文中的缓存值。 + + Args: + explicit: 调用方显式传入的路径 + fallback: 上下文中的缓存路径 + label: 用于日志的标签 + + Returns: + 解析后的路径,若两者均为 None 则返回 None + """ + if explicit is not None: + return explicit + if fallback is not None: + return fallback + return None diff --git a/src/core/handlers/pipeline_scheduler.py b/src/core/handlers/pipeline_scheduler.py new file mode 100644 index 0000000..a4a1803 --- /dev/null +++ b/src/core/handlers/pipeline_scheduler.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +极简管道调度器 + +替代原 WaterQualityInversionPipeline(2598 行上帝类)的调度核心。 +调度器自身不包含任何算法逻辑,仅负责: +1. 维护 PipelineContext(共享状态) +2. 根据 config key 从 Handler 注册表查找对应处理器 +3. 按序调用 handler.execute(ctx, config),收集结果 +4. 异常时记录错误并继续(或中止,取决于配置) + +Handler 注册表是 step_key → BaseStepHandler 的映射。 +新增步骤只需:写一个 Handler 类 + 在注册表中加一行。 +""" + +from __future__ import annotations + +import time +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional + +from src.core.handlers.base import BaseStepHandler, PipelineContext + + +class PipelineScheduler: + """极简管道调度器。 + + 用法:: + + scheduler = PipelineScheduler(work_dir="./work_dir") + scheduler.register_handler(Step1WaterMaskHandler()) + scheduler.register_handler(Step2GlintDetectionHandler()) + # ... 注册所有步骤 ... + + scheduler.set_callback(my_callback) # 可选:GUI 进度回调 + + result = scheduler.run_full_pipeline(config) + # result['step1'] → {'water_mask_path': ...} + # result['step2'] → {'glint_mask_path': ...} + # ... + """ + + def __init__(self, work_dir: str = "./work_dir"): + self.ctx = PipelineContext(work_dir) + self._handlers: Dict[str, BaseStepHandler] = {} + + # ═══════════════════════════════════════════════════════════ + # Handler 注册 + # ═══════════════════════════════════════════════════════════ + + def register_handler(self, handler: BaseStepHandler): + """注册一个步骤处理器。 + + Args: + handler: BaseStepHandler 实例(其 step_key 类属性决定 config 中的 key) + """ + if handler.step_key is None: + raise ValueError( + f"Handler {type(handler).__name__} 未定义 step_key 类属性" + ) + self._handlers[handler.step_key] = handler + + def register_handlers(self, handlers: List[BaseStepHandler]): + """批量注册步骤处理器。""" + for h in handlers: + self.register_handler(h) + + # ═══════════════════════════════════════════════════════════ + # 回调 + # ═══════════════════════════════════════════════════════════ + + def set_callback(self, callback: Callable): + """设置 GUI 进度回调,代理到 PipelineContext。""" + self.ctx.set_callback(callback) + + # ═══════════════════════════════════════════════════════════ + # 单步执行 + # ═══════════════════════════════════════════════════════════ + + def run_step(self, step_key: str, config: dict) -> Dict[str, Any]: + """执行单个步骤。 + + Args: + step_key: 步骤 key(如 'step1', 'step2', ...) + config: 该步骤的配置字典 + + Returns: + 步骤执行结果字典 + + Raises: + KeyError: 如果 step_key 未注册 Handler + Exception: 步骤执行中的任何异常 + """ + handler = self._handlers.get(step_key) + if handler is None: + raise KeyError( + f"未注册的步骤: {step_key!r}。" + f"已注册: {list(self._handlers.keys())}" + ) + + self.ctx.notify(handler.step_key, 'start') + result = handler.execute(self.ctx, config) + self.ctx.notify(handler.step_key, 'completed') + return result + + # ═══════════════════════════════════════════════════════════ + # 全流程执行 + # ═══════════════════════════════════════════════════════════ + + def run_full_pipeline(self, config: Dict[str, dict]) -> Dict[str, Any]: + """按 config 中的 key 顺序执行全流程。 + + 遍历 config 的顶层 key,对每个 key: + - 如果已注册 Handler → 执行并收集结果 + - 如果未注册 → 跳过并通知 + - 如果执行失败 → 记录错误,继续执行后续步骤(不中止) + + Args: + config: 全流程配置字典,格式为 {step_key: step_config, ...} + 例如: {'step1': {...}, 'step2': {...}, ...} + + Returns: + { + 'step_results': {step_key: result_dict, ...}, + 'step_timings': {...}, + 'total_elapsed': float, + 'errors': {step_key: error_message, ...}, + } + """ + self.ctx.pipeline_start_time = time.time() + + step_results: Dict[str, Any] = {} + errors: Dict[str, str] = {} + + # 按 config 中的顺序遍历(Python 3.7+ dict 保序) + for step_key, step_config in config.items(): + handler = self._handlers.get(step_key) + + if handler is None: + self.ctx.notify(step_key, 'skipped', '未注册 Handler') + continue + + try: + result = handler.execute(self.ctx, step_config) + step_results[step_key] = result + self.ctx.notify(step_key, 'completed', str(result)) + except Exception as e: + error_msg = f"{type(e).__name__}: {e}" + errors[step_key] = error_msg + step_results[step_key] = {'error': error_msg} + self.ctx.notify(step_key, 'error', error_msg) + # 不中止,继续执行后续步骤 + + self.ctx.pipeline_end_time = time.time() + total_elapsed = self.ctx.pipeline_end_time - self.ctx.pipeline_start_time + + return { + 'step_results': step_results, + 'step_timings': self.ctx.step_timings, + 'total_elapsed': total_elapsed, + 'total_elapsed_formatted': self.ctx._format_time(total_elapsed), + 'errors': errors, + } + + # ═══════════════════════════════════════════════════════════ + # 便捷属性(代理到 PipelineContext) + # ═══════════════════════════════════════════════════════════ + + @property + def work_dir(self) -> Path: + return self.ctx.work_dir + + @property + def water_mask_path(self) -> Optional[str]: + return self.ctx.water_mask_path + + @property + def glint_mask_path(self) -> Optional[str]: + return self.ctx.glint_mask_path + + @property + def deglint_img_path(self) -> Optional[str]: + return self.ctx.deglint_img_path + + @property + def processed_csv_path(self) -> Optional[str]: + return self.ctx.processed_csv_path + + @property + def training_csv_path(self) -> Optional[str]: + return self.ctx.training_csv_path + + @property + def indices_path(self) -> Optional[str]: + return self.ctx.indices_path + + def get_step_output_dir(self, step_name: str) -> Path: + return self.ctx.get_step_output_dir(step_name) diff --git a/src/core/handlers/step1_water_mask.py b/src/core/handlers/step1_water_mask.py new file mode 100644 index 0000000..5b8f6a8 --- /dev/null +++ b/src/core/handlers/step1_water_mask.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Step1 处理器:水域掩膜生成 + +将原 WaterQualityInversionPipeline.step1_generate_water_mask() 方法 +剥离为独立的 Step1WaterMaskHandler。 + +这是 14 个步骤 Handler 的**打样模板**,其余步骤照此模式拆分: +1. 继承 BaseStepHandler,设置 step_key 类属性 +2. 实现 execute(ctx, config) → 调用对应 Step 类的静态方法 +3. 将输出路径写入 ctx(上下文共享) +4. 记录步骤耗时 +5. 返回结果字典 +""" + +import time +from typing import Any, Dict + +from src.core.handlers.base import BaseStepHandler, PipelineContext +from src.core.steps.water_mask_step import WaterMaskStep + + +class Step1WaterMaskHandler(BaseStepHandler): + """步骤1:水域掩膜生成。 + + 对应 config key: 'step1' + 委托类: WaterMaskStep.run() + + 用法:: + + handler = Step1WaterMaskHandler() + result = handler.execute(ctx, config['step1']) + # ctx.water_mask_path 已被更新 + """ + + step_key = 'step1' + + def execute(self, context: PipelineContext, config: dict) -> Dict[str, Any]: + """执行水域掩膜生成。 + + config 可包含的键(全部透传给 WaterMaskStep.run()): + - mask_path: 水体掩膜文件路径(.shp / .dat / .tif) + - img_path: 输入影像路径(shp 栅格化或 NDWI 时需要) + - ndwi_threshold: NDWI 阈值(默认 0.4) + - use_ndwi: 是否使用 NDWI 方法(默认 False) + - generate_png: 是否生成 PNG 预览(默认 True) + - output_path: 指定输出路径(可选) + + Returns: + {'water_mask_path': str} + """ + step_start_time = time.time() + + try: + result = WaterMaskStep.run( + mask_path=config.get('mask_path'), + img_path=config.get('img_path'), + ndwi_threshold=config.get('ndwi_threshold', 0.4), + use_ndwi=config.get('use_ndwi', False), + generate_png=config.get('generate_png', True), + output_path=config.get('output_path'), + water_mask_dir=str(context.water_mask_dir), + callback=context.notify, + ) + + # 将输出路径写入上下文,供后续步骤使用 + context.water_mask_path = result + + step_end_time = time.time() + context.record_step_time( + "步骤1: 水域掩膜生成", step_start_time, step_end_time + ) + + return {'water_mask_path': result} + + except Exception as e: + step_end_time = time.time() + context.record_step_time( + "步骤1: 水域掩膜生成", step_start_time, step_end_time, + status="failed", error=str(e) + ) + raise diff --git a/src/gui/core/panel_factory.py b/src/gui/core/panel_factory.py index cea9b6e..6b1d68d 100644 --- a/src/gui/core/panel_factory.py +++ b/src/gui/core/panel_factory.py @@ -162,14 +162,19 @@ class PanelFactory: scroll.setWidget(panel) scroll.setWidgetResizable(True) - # 替换占位页 + # 替换占位页(blockSignals 阻断 removeTab/insertTab/setCurrentIndex 触发的 + # currentChanged 信号风暴,防止 _on_tab_changed → _ensure_loaded 无限递归) placeholder = self._placeholders.get(tab_index) if placeholder is not None and self._tab_widget is not None: tab_title = self._tab_widget.tabText(tab_index) tab_icon = self._tab_widget.tabIcon(tab_index) - self._tab_widget.removeTab(tab_index) - self._tab_widget.insertTab(tab_index, scroll, tab_icon, tab_title) - self._tab_widget.setCurrentIndex(tab_index) + self._tab_widget.blockSignals(True) + try: + self._tab_widget.removeTab(tab_index) + self._tab_widget.insertTab(tab_index, scroll, tab_icon, tab_title) + self._tab_widget.setCurrentIndex(tab_index) + finally: + self._tab_widget.blockSignals(False) # 注册 self._panels[step_id] = panel