diff --git a/src/core/workspace_manager.py b/src/core/workspace_manager.py index 52f86e0..ab5a8e5 100644 --- a/src/core/workspace_manager.py +++ b/src/core/workspace_manager.py @@ -10,6 +10,8 @@ import copy from pathlib import Path +from src.gui.core.event_bus import global_event_bus + class WorkspaceManager: """管理步骤默认输出路径、文件扫描与配置裁剪""" @@ -23,19 +25,40 @@ class WorkspaceManager: def __init__(self): self.step_default_outputs = { - 'step1': "1_water_mask/water_mask_from_ndwi.dat", - 'step2': "2_Glint_Detection/severe_glint_area.dat", - 'step3': "3_deglint/deglint_goodman.bsq", - 'step4_sampling': "4_sampling/sampling_spectra.csv", - 'step5_clean': "5_Data_Cleaning/processed_data.csv", - 'step6_feature': "6_Spectral_Feature_Extraction/training_spectra.csv", - 'step7_index': "7_Water_Quality_Indices/training_spectra_indices.csv", - 'step8_ml_train': "8_Supervised_Model_Training/", - 'step9_ml_predict': "9_ML_Prediction/", - 'step10_watercolor': "10_WaterIndex_Images/", - 'step11_map': "14_visualization/" + 'step1': {'water_mask': "1_water_mask/water_mask_from_ndwi.dat"}, + 'step2': {'glint_mask': "2_Glint_Detection/severe_glint_area.dat"}, + 'step3': {'deglint_image': "3_deglint/deglint_goodman.bsq"}, + 'step4_sampling': {'sampling_points': "4_sampling/sampling_spectra.csv"}, + 'step5_clean': {'processed_data': "5_Data_Cleaning/processed_data.csv"}, + 'step6_feature': {'training_spectra': "6_Spectral_Feature_Extraction/training_spectra.csv"}, + 'step7_index': {'training_spectra_indices': "7_Water_Quality_Indices/training_spectra_indices.csv"}, + 'step8_ml_train': {'Supervised_Model_Training': "8_Supervised_Model_Training/"}, + 'step9_ml_predict': {'9_ML_Prediction': "9_ML_Prediction/"}, + 'step10_watercolor': {'WaterIndex_Images': "10_WaterIndex_Images/"}, + 'step11_map': {'14_visualization': "14_visualization/"}, } self.step_outputs = {} + # pipeline step_id → panel step_id 映射(由 WaterQualityGUI 注入) + self._pipeline_to_panel = {} + + def set_step_id_mapping(self, mapping: dict): + """注入 pipeline step_id → panel step_id 映射,用于事件发布时统一 ID。""" + self._pipeline_to_panel = mapping + + def _publish_outputs(self, step_id: str, outputs: dict): + """将发现的产出发布到 EventBus。 + + Args: + step_id: 面板 step_id(如 'step1', 'step5_clean') + outputs: {output_type: path_str} + """ + for output_type, path in outputs.items(): + if path: + global_event_bus.publish('OutputUpdated', { + 'step_id': step_id, + 'output_type': output_type, + 'path': path, + }) @staticmethod def _is_scientific_mask(path_str): @@ -177,15 +200,25 @@ class WorkspaceManager: if step_id not in self.step_outputs: self.step_outputs[step_id] = {} self.step_outputs[step_id].update(outputs) + # ★ 发布 EventBus 事件,驱动下游面板自动填充 + self._publish_outputs(step_id, outputs) return discovered_outputs def update_step_outputs(self, step_name, work_path): - """更新指定步骤的输出路径记录""" - if step_name not in self.step_default_outputs: + """更新指定步骤的输出路径记录并发布 EventBus 事件。 + + step_name 可能是 pipeline step_id(如 'step4'), + 会先通过 _pipeline_to_panel 映射为面板 step_id(如 'step5_clean')。 + """ + # 映射 pipeline step_id → panel step_id + panel_step_id = self._pipeline_to_panel.get(step_name, step_name) + + if panel_step_id not in self.step_default_outputs: return - step_outputs = self.step_default_outputs[step_name] + step_outputs = self.step_default_outputs[panel_step_id] + published = {} for output_type, relative_path in step_outputs.items(): if '*' in relative_path: @@ -193,11 +226,18 @@ class WorkspaceManager: matching_files = list(pattern_path.parent.glob(pattern_path.name)) if matching_files: latest_file = max(matching_files, key=lambda p: p.stat().st_mtime) - self.step_outputs[step_name][output_type] = str(latest_file) + path_str = str(latest_file) + self.step_outputs.setdefault(panel_step_id, {})[output_type] = path_str + published[output_type] = path_str else: output_path = work_path / relative_path if output_path.exists(): - self.step_outputs[step_name][output_type] = str(output_path) + path_str = str(output_path) + self.step_outputs.setdefault(panel_step_id, {})[output_type] = path_str + published[output_type] = path_str + + if published: + self._publish_outputs(panel_step_id, published) @staticmethod def prune_config_for_prediction_mode(config: dict) -> dict: diff --git a/src/gui/core/dependency_subscriber.py b/src/gui/core/dependency_subscriber.py new file mode 100644 index 0000000..2e11612 --- /dev/null +++ b/src/gui/core/dependency_subscriber.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +依赖订阅混入模块 + +提供 subscribe_panel_to_dependencies() 函数,让步骤面板根据 +PANEL_REGISTRY 中声明的 dependencies 自动向 global_event_bus +订阅 OutputUpdated 事件。当上游步骤产出落地时,面板自动将路径 +填入对应的 FileSelectWidget,无需主窗口手工传导。 +""" + +from src.gui.core.event_bus import global_event_bus + + +def subscribe_panel_to_dependencies(panel, step_id, dependencies): + """为面板订阅其依赖的上游步骤产出事件。 + + 当 global_event_bus 发布 OutputUpdated 事件且 step_id/output_type + 匹配时,自动将路径填入面板对应的 FileSelectWidget。 + + Args: + panel: 步骤面板实例(QWidget 子类) + step_id: 当前面板的 step_id(仅用于日志,非匹配键) + dependencies: dict, {input_field: (dep_step, output_type, panel_attr)} + """ + if not dependencies: + return + + for _input_field, (dep_step, output_type, panel_attr) in dependencies.items(): + _make_subscription(panel, dep_step, output_type, panel_attr) + + +def _make_subscription(panel, dep_step, output_type, panel_attr): + """为单个依赖项创建事件订阅。使用工厂函数避免闭包变量延迟绑定。""" + + def callback(data): + if data.get('step_id') != dep_step: + return + if data.get('output_type') != output_type: + return + + widget = getattr(panel, panel_attr, None) + if widget is None: + return + + current = '' + if hasattr(widget, 'get_path'): + current = widget.get_path().strip() + elif hasattr(widget, 'text'): + current = widget.text().strip() + + if current: + return + + path = data.get('path', '') + if not path: + return + + if hasattr(widget, 'set_path'): + widget.set_path(path) + elif hasattr(widget, 'setText'): + widget.setText(path) + + global_event_bus.subscribe('OutputUpdated', callback) diff --git a/src/gui/core/event_bus.py b/src/gui/core/event_bus.py new file mode 100644 index 0000000..b8f407f --- /dev/null +++ b/src/gui/core/event_bus.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +轻量级事件总线 + +支持 subscribe(event_name, callback) 和 publish(event_name, data), +用于步骤面板间的去中心化参数传导。 +""" + +from collections import defaultdict +from typing import Any, Callable, Dict, List + + +class EventBus: + """发布-订阅事件总线""" + + def __init__(self): + self._subscribers: Dict[str, List[Callable]] = defaultdict(list) + + def subscribe(self, event_name: str, callback: Callable[[dict], None]): + """订阅事件。callback 接收一个 dict 作为事件数据。""" + self._subscribers[event_name].append(callback) + + def publish(self, event_name: str, data: Dict[str, Any]): + """发布事件,通知所有订阅者。""" + for callback in self._subscribers.get(event_name, []): + try: + callback(data) + except Exception: + pass + + +# 全局单例 +global_event_bus = EventBus() diff --git a/src/gui/core/panel_registry.py b/src/gui/core/panel_registry.py index 135fc46..c4343da 100644 --- a/src/gui/core/panel_registry.py +++ b/src/gui/core/panel_registry.py @@ -251,3 +251,20 @@ def get_entry(step_id): if entry['step_id'] == step_id: return entry return None + + +def build_output_types(): + """从 PANEL_REGISTRY 的 dependencies 反向推导每个步骤产出的 output_type 列表。 + + Returns: + dict: {step_id: [output_type, ...]} + """ + output_types = {} + for entry in PANEL_REGISTRY: + if entry['dependencies']: + for _input_field, (dep_step, output_type, _panel_attr) in entry['dependencies'].items(): + if dep_step not in output_types: + output_types[dep_step] = [] + if output_type not in output_types[dep_step]: + output_types[dep_step].append(output_type) + return output_types diff --git a/src/gui/water_quality_gui.py b/src/gui/water_quality_gui.py index a4f08b4..c55dac0 100644 --- a/src/gui/water_quality_gui.py +++ b/src/gui/water_quality_gui.py @@ -124,6 +124,8 @@ from src.gui.core.panel_registry import ( get_step_id_by_tab_index, get_entry, ) +from src.gui.core.event_bus import global_event_bus +from src.gui.core.dependency_subscriber import subscribe_panel_to_dependencies from src.gui.dialogs import BandConfirmDialog, AISettingsDialog # Pipeline 核心异常(用于预检弹窗) @@ -155,6 +157,22 @@ from src.gui.core.pipeline_mode_dialog import PipelineModeDialog from src.gui.core.viz_thread import VisualizationWorkerThread, _viz_training_spectra_csv_path from src.core.workspace_manager import WorkspaceManager +# pipeline step_id → panel step_id 映射(pipeline 内部编号与 GUI 面板编号不同) +PIPELINE_TO_PANEL_STEP = { + 'step1': 'step1', + 'step2': 'step2', + 'step3': 'step3', + 'step4': 'step5_clean', + 'step5': 'step6_feature', + 'step7': 'step7_index', + 'step8': 'step8_ml_train', + 'step9': 'step10_watercolor', + 'step10': 'step4_sampling', + 'step11_ml': 'step9_ml_predict', + 'step11': 'step11_map', + 'step14': 'step11_map', +} + class WaterQualityGUI(QMainWindow): """水质参数反演分析系统主窗口""" @@ -181,6 +199,7 @@ class WaterQualityGUI(QMainWindow): # 工作空间管理器(文件扫描、路径发现、配置裁剪) self.workspace_manager = WorkspaceManager() + self.workspace_manager.set_step_id_mapping(PIPELINE_TO_PANEL_STEP) # 面板实例字典(step_id → panel instance),由 create_content_area 填充 self._panels = {} @@ -669,6 +688,11 @@ class WaterQualityGUI(QMainWindow): QIcon(self.get_icon_path(icon_name)), title ) + + # ★ 事件驱动:面板根据注册表依赖自动订阅 OutputUpdated 事件 + deps = entry.get('dependencies') + if deps: + subscribe_panel_to_dependencies(panel, step_id, deps) # 连接Tab切换信号,实现双向同步(必须在step_stack创建后) self.step_stack.currentChanged.connect(self.on_tab_changed) @@ -809,8 +833,6 @@ class WaterQualityGUI(QMainWindow): tab_index = get_tab_index(item_data) if tab_index >= 0: self.step_stack.setCurrentIndex(tab_index) - # 切换到步骤时自动填充输入路径 - self.auto_populate_step_inputs(item_data) def on_tab_changed(self, index): """Tab页面切换时同步更新左侧步骤列表""" @@ -911,88 +933,38 @@ class WaterQualityGUI(QMainWindow): return config def auto_populate_step_inputs(self, step_id): - """自动填充指定步骤的输入路径,返回填充的字段数量""" - if step_id not in self.step_dependencies: - return 0 # 该步骤没有依赖关系 - - # 获取对应的面板 - panel = self.get_step_panel(step_id) - if not panel: - return 0 - - work_dir = getattr(self, 'work_dir', './work_dir') - work_path = Path(work_dir) - - dependencies = self.step_dependencies[step_id] - filled_count = 0 - - ref_img_path = None - step1_panel = self._panels.get('step1') - if step1_panel and hasattr(step1_panel, 'img_file'): - ref_img_path = step1_panel.img_file.get_path() - - for input_field, (dep_step, output_type, panel_attr) in dependencies.items(): - # 检查面板是否有对应的属性 - if not hasattr(panel, panel_attr): - continue - - file_widget = getattr(panel, panel_attr) - - # ★ 兼容 FileSelectWidget 与原生 QLineEdit - current_text = ( - file_widget.get_path().strip() - if hasattr(file_widget, 'get_path') - else file_widget.text().strip() - ) - - # 如果输入框已经有内容,跳过自动填充 - if current_text: - continue - - # 查找依赖步骤的输出文件 - output_path = self.workspace_manager.find_step_output(work_path, dep_step, output_type, ref_img_path=ref_img_path) - - if output_path and Path(output_path).exists(): - # ★ 兼容 FileSelectWidget 与原生 QLineEdit - if hasattr(file_widget, 'set_path'): - file_widget.set_path(str(output_path)) - else: - file_widget.setText(str(output_path)) - self.log_message(f"自动填充 {step_id}.{input_field}: {output_path}", "info") - filled_count += 1 - - return filled_count + """自动填充指定步骤的输入路径(事件总线已接管,保留接口兼容)。""" + return 0 def get_step_panel(self, step_id): """根据步骤ID获取对应的面板对象(从动态注册表查找)""" return self._panels.get(step_id) def auto_populate_all_steps(self): - """自动填充所有步骤的输入路径""" + """扫描工作目录并触发事件总线自动填充所有步骤的输入路径。""" work_dir = getattr(self, 'work_dir', './work_dir') work_path = Path(work_dir) - + if not work_path.exists(): QMessageBox.warning(self, "警告", f"工作目录不存在: {work_dir}\n请先设置正确的工作目录。") return - - # 首先扫描工作目录发现已有的输出文件 + + # 扫描工作目录 → WorkspaceManager 发布 OutputUpdated 事件 → 面板自动填充 self.workspace_manager.scan_work_directory_for_files(work_path) - - # 从注册表推导步骤顺序(跳过 step1,从 step2 开始) - step_order = [e['step_id'] for e in PANEL_REGISTRY if e['step_id'] != 'step1'] - - filled_count = 0 - for step_id in step_order: - old_count = filled_count - filled_count += self.auto_populate_step_inputs(step_id) - - if filled_count > 0: - self.log_message(f"已完成所有步骤的自动路径填充,共填充 {filled_count} 个输入字段", "info") - QMessageBox.information(self, "完成", f"自动填充完成!\n共填充了 {filled_count} 个输入字段。") - else: - self.log_message("未发现可自动填充的路径", "info") - QMessageBox.information(self, "完成", "未发现可自动填充的路径。\n请确保工作目录中有相关的输出文件。") + + # 补充发布 reference_img(step1 的输入影像,非 step1 产出但下游依赖它) + step1_panel = self._panels.get('step1') + if step1_panel and hasattr(step1_panel, 'img_file'): + ref_img = step1_panel.img_file.get_path() + if ref_img: + global_event_bus.publish('OutputUpdated', { + 'step_id': 'step1', + 'output_type': 'reference_img', + 'path': ref_img, + }) + + self.log_message("✓ 工作目录扫描完成,事件总线已通知所有面板自动填充", "info") + QMessageBox.information(self, "完成", "工作目录扫描完成!\n各步骤输入路径已自动填充。") def add_auto_fill_buttons_to_panels(self): """为各个步骤面板添加自动填充按钮(动态遍历所有面板)""" @@ -1409,46 +1381,19 @@ class WaterQualityGUI(QMainWindow): QMessageBox.critical(self, "失败", f"流程执行失败:\n\n{message[:200]}") def on_step_completed(self, step_name, success, message): - """步骤完成回调:记录输出路径并更新后续步骤""" + """步骤完成回调:记录输出路径,WorkspaceManager 自动发布 EventBus 事件。""" if not success: return - - # 记录步骤输出路径到内存 + work_dir = getattr(self, 'work_dir', './work_dir') work_path = Path(work_dir) - - # 根据步骤名称和约定路径,记录实际输出 + if step_name not in self.workspace_manager.step_outputs: self.workspace_manager.step_outputs[step_name] = {} - # 扫描工作目录,更新该步骤的输出路径 + # WorkspaceManager.update_step_outputs 内部会发布 OutputUpdated 事件 + # 下游面板通过 DependencySubscriber 自动接收并填充 self.workspace_manager.update_step_outputs(step_name, work_path) - - # 自动填充依赖该步骤输出的后续步骤 - self.auto_populate_dependent_steps(step_name) - - def auto_populate_dependent_steps(self, completed_step): - """自动填充依赖于已完成步骤的后续步骤""" - ref_img_path = None - step1_panel = self._panels.get('step1') - if step1_panel and hasattr(step1_panel, 'img_file'): - ref_img_path = step1_panel.img_file.get_path() - - for step_id, dependencies in self.step_dependencies.items(): - for input_field, (dep_step, output_type, panel_attr) in dependencies.items(): - if dep_step == completed_step: - # 找到依赖于刚完成步骤的后续步骤,尝试自动填充 - panel = self.get_step_panel(step_id) - if panel and hasattr(panel, panel_attr): - file_widget = getattr(panel, panel_attr) - # 如果输入框为空,则自动填充 - if not file_widget.get_path().strip(): - work_dir = getattr(self, 'work_dir', './work_dir') - work_path = Path(work_dir) - output_path = self.workspace_manager.find_step_output(work_path, dep_step, output_type, ref_img_path=ref_img_path) - if output_path and Path(output_path).exists(): - file_widget.set_path(output_path) - self.log_message(f"步骤完成后自动填充 {step_id}.{input_field}: {output_path}", "info") def run_single_step(self, step_name, config): """运行单个步骤"""