refactor: 引入 EventBus 事件总线,实现各步骤面板间的去中心化自动参数传导,完成最终解耦

This commit is contained in:
DXC
2026-06-17 16:27:26 +08:00
parent a58744cfbb
commit bb5c2a50f8
5 changed files with 220 additions and 120 deletions

View File

@ -10,6 +10,8 @@
import copy
from pathlib import Path
from src.gui.core.event_bus import global_event_bus
class WorkspaceManager:
"""管理步骤默认输出路径、文件扫描与配置裁剪"""
@ -23,19 +25,40 @@ class WorkspaceManager:
def __init__(self):
self.step_default_outputs = {
'step1': "1_water_mask/water_mask_from_ndwi.dat",
'step2': "2_Glint_Detection/severe_glint_area.dat",
'step3': "3_deglint/deglint_goodman.bsq",
'step4_sampling': "4_sampling/sampling_spectra.csv",
'step5_clean': "5_Data_Cleaning/processed_data.csv",
'step6_feature': "6_Spectral_Feature_Extraction/training_spectra.csv",
'step7_index': "7_Water_Quality_Indices/training_spectra_indices.csv",
'step8_ml_train': "8_Supervised_Model_Training/",
'step9_ml_predict': "9_ML_Prediction/",
'step10_watercolor': "10_WaterIndex_Images/",
'step11_map': "14_visualization/"
'step1': {'water_mask': "1_water_mask/water_mask_from_ndwi.dat"},
'step2': {'glint_mask': "2_Glint_Detection/severe_glint_area.dat"},
'step3': {'deglint_image': "3_deglint/deglint_goodman.bsq"},
'step4_sampling': {'sampling_points': "4_sampling/sampling_spectra.csv"},
'step5_clean': {'processed_data': "5_Data_Cleaning/processed_data.csv"},
'step6_feature': {'training_spectra': "6_Spectral_Feature_Extraction/training_spectra.csv"},
'step7_index': {'training_spectra_indices': "7_Water_Quality_Indices/training_spectra_indices.csv"},
'step8_ml_train': {'Supervised_Model_Training': "8_Supervised_Model_Training/"},
'step9_ml_predict': {'9_ML_Prediction': "9_ML_Prediction/"},
'step10_watercolor': {'WaterIndex_Images': "10_WaterIndex_Images/"},
'step11_map': {'14_visualization': "14_visualization/"},
}
self.step_outputs = {}
# pipeline step_id → panel step_id 映射(由 WaterQualityGUI 注入)
self._pipeline_to_panel = {}
def set_step_id_mapping(self, mapping: dict):
"""注入 pipeline step_id → panel step_id 映射,用于事件发布时统一 ID。"""
self._pipeline_to_panel = mapping
def _publish_outputs(self, step_id: str, outputs: dict):
"""将发现的产出发布到 EventBus。
Args:
step_id: 面板 step_id'step1', 'step5_clean'
outputs: {output_type: path_str}
"""
for output_type, path in outputs.items():
if path:
global_event_bus.publish('OutputUpdated', {
'step_id': step_id,
'output_type': output_type,
'path': path,
})
@staticmethod
def _is_scientific_mask(path_str):
@ -177,15 +200,25 @@ class WorkspaceManager:
if step_id not in self.step_outputs:
self.step_outputs[step_id] = {}
self.step_outputs[step_id].update(outputs)
# ★ 发布 EventBus 事件,驱动下游面板自动填充
self._publish_outputs(step_id, outputs)
return discovered_outputs
def update_step_outputs(self, step_name, work_path):
"""更新指定步骤的输出路径记录"""
if step_name not in self.step_default_outputs:
"""更新指定步骤的输出路径记录并发布 EventBus 事件。
step_name 可能是 pipeline step_id'step4'
会先通过 _pipeline_to_panel 映射为面板 step_id'step5_clean')。
"""
# 映射 pipeline step_id → panel step_id
panel_step_id = self._pipeline_to_panel.get(step_name, step_name)
if panel_step_id not in self.step_default_outputs:
return
step_outputs = self.step_default_outputs[step_name]
step_outputs = self.step_default_outputs[panel_step_id]
published = {}
for output_type, relative_path in step_outputs.items():
if '*' in relative_path:
@ -193,11 +226,18 @@ class WorkspaceManager:
matching_files = list(pattern_path.parent.glob(pattern_path.name))
if matching_files:
latest_file = max(matching_files, key=lambda p: p.stat().st_mtime)
self.step_outputs[step_name][output_type] = str(latest_file)
path_str = str(latest_file)
self.step_outputs.setdefault(panel_step_id, {})[output_type] = path_str
published[output_type] = path_str
else:
output_path = work_path / relative_path
if output_path.exists():
self.step_outputs[step_name][output_type] = str(output_path)
path_str = str(output_path)
self.step_outputs.setdefault(panel_step_id, {})[output_type] = path_str
published[output_type] = path_str
if published:
self._publish_outputs(panel_step_id, published)
@staticmethod
def prune_config_for_prediction_mode(config: dict) -> dict:

View File

@ -0,0 +1,64 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
依赖订阅混入模块
提供 subscribe_panel_to_dependencies() 函数,让步骤面板根据
PANEL_REGISTRY 中声明的 dependencies 自动向 global_event_bus
订阅 OutputUpdated 事件。当上游步骤产出落地时,面板自动将路径
填入对应的 FileSelectWidget无需主窗口手工传导。
"""
from src.gui.core.event_bus import global_event_bus
def subscribe_panel_to_dependencies(panel, step_id, dependencies):
"""为面板订阅其依赖的上游步骤产出事件。
当 global_event_bus 发布 OutputUpdated 事件且 step_id/output_type
匹配时,自动将路径填入面板对应的 FileSelectWidget。
Args:
panel: 步骤面板实例QWidget 子类)
step_id: 当前面板的 step_id仅用于日志非匹配键
dependencies: dict, {input_field: (dep_step, output_type, panel_attr)}
"""
if not dependencies:
return
for _input_field, (dep_step, output_type, panel_attr) in dependencies.items():
_make_subscription(panel, dep_step, output_type, panel_attr)
def _make_subscription(panel, dep_step, output_type, panel_attr):
"""为单个依赖项创建事件订阅。使用工厂函数避免闭包变量延迟绑定。"""
def callback(data):
if data.get('step_id') != dep_step:
return
if data.get('output_type') != output_type:
return
widget = getattr(panel, panel_attr, None)
if widget is None:
return
current = ''
if hasattr(widget, 'get_path'):
current = widget.get_path().strip()
elif hasattr(widget, 'text'):
current = widget.text().strip()
if current:
return
path = data.get('path', '')
if not path:
return
if hasattr(widget, 'set_path'):
widget.set_path(path)
elif hasattr(widget, 'setText'):
widget.setText(path)
global_event_bus.subscribe('OutputUpdated', callback)

34
src/gui/core/event_bus.py Normal file
View File

@ -0,0 +1,34 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
轻量级事件总线
支持 subscribe(event_name, callback) 和 publish(event_name, data)
用于步骤面板间的去中心化参数传导。
"""
from collections import defaultdict
from typing import Any, Callable, Dict, List
class EventBus:
"""发布-订阅事件总线"""
def __init__(self):
self._subscribers: Dict[str, List[Callable]] = defaultdict(list)
def subscribe(self, event_name: str, callback: Callable[[dict], None]):
"""订阅事件。callback 接收一个 dict 作为事件数据。"""
self._subscribers[event_name].append(callback)
def publish(self, event_name: str, data: Dict[str, Any]):
"""发布事件,通知所有订阅者。"""
for callback in self._subscribers.get(event_name, []):
try:
callback(data)
except Exception:
pass
# 全局单例
global_event_bus = EventBus()

View File

@ -251,3 +251,20 @@ def get_entry(step_id):
if entry['step_id'] == step_id:
return entry
return None
def build_output_types():
"""从 PANEL_REGISTRY 的 dependencies 反向推导每个步骤产出的 output_type 列表。
Returns:
dict: {step_id: [output_type, ...]}
"""
output_types = {}
for entry in PANEL_REGISTRY:
if entry['dependencies']:
for _input_field, (dep_step, output_type, _panel_attr) in entry['dependencies'].items():
if dep_step not in output_types:
output_types[dep_step] = []
if output_type not in output_types[dep_step]:
output_types[dep_step].append(output_type)
return output_types

View File

@ -124,6 +124,8 @@ from src.gui.core.panel_registry import (
get_step_id_by_tab_index,
get_entry,
)
from src.gui.core.event_bus import global_event_bus
from src.gui.core.dependency_subscriber import subscribe_panel_to_dependencies
from src.gui.dialogs import BandConfirmDialog, AISettingsDialog
# Pipeline 核心异常(用于预检弹窗)
@ -155,6 +157,22 @@ from src.gui.core.pipeline_mode_dialog import PipelineModeDialog
from src.gui.core.viz_thread import VisualizationWorkerThread, _viz_training_spectra_csv_path
from src.core.workspace_manager import WorkspaceManager
# pipeline step_id → panel step_id 映射pipeline 内部编号与 GUI 面板编号不同)
PIPELINE_TO_PANEL_STEP = {
'step1': 'step1',
'step2': 'step2',
'step3': 'step3',
'step4': 'step5_clean',
'step5': 'step6_feature',
'step7': 'step7_index',
'step8': 'step8_ml_train',
'step9': 'step10_watercolor',
'step10': 'step4_sampling',
'step11_ml': 'step9_ml_predict',
'step11': 'step11_map',
'step14': 'step11_map',
}
class WaterQualityGUI(QMainWindow):
"""水质参数反演分析系统主窗口"""
@ -181,6 +199,7 @@ class WaterQualityGUI(QMainWindow):
# 工作空间管理器(文件扫描、路径发现、配置裁剪)
self.workspace_manager = WorkspaceManager()
self.workspace_manager.set_step_id_mapping(PIPELINE_TO_PANEL_STEP)
# 面板实例字典step_id → panel instance由 create_content_area 填充
self._panels = {}
@ -670,6 +689,11 @@ class WaterQualityGUI(QMainWindow):
title
)
# ★ 事件驱动:面板根据注册表依赖自动订阅 OutputUpdated 事件
deps = entry.get('dependencies')
if deps:
subscribe_panel_to_dependencies(panel, step_id, deps)
# 连接Tab切换信号实现双向同步必须在step_stack创建后
self.step_stack.currentChanged.connect(self.on_tab_changed)
@ -809,8 +833,6 @@ class WaterQualityGUI(QMainWindow):
tab_index = get_tab_index(item_data)
if tab_index >= 0:
self.step_stack.setCurrentIndex(tab_index)
# 切换到步骤时自动填充输入路径
self.auto_populate_step_inputs(item_data)
def on_tab_changed(self, index):
"""Tab页面切换时同步更新左侧步骤列表"""
@ -911,64 +933,15 @@ class WaterQualityGUI(QMainWindow):
return config
def auto_populate_step_inputs(self, step_id):
"""自动填充指定步骤的输入路径,返回填充的字段数量"""
if step_id not in self.step_dependencies:
return 0 # 该步骤没有依赖关系
# 获取对应的面板
panel = self.get_step_panel(step_id)
if not panel:
"""自动填充指定步骤的输入路径(事件总线已接管,保留接口兼容)。"""
return 0
work_dir = getattr(self, 'work_dir', './work_dir')
work_path = Path(work_dir)
dependencies = self.step_dependencies[step_id]
filled_count = 0
ref_img_path = None
step1_panel = self._panels.get('step1')
if step1_panel and hasattr(step1_panel, 'img_file'):
ref_img_path = step1_panel.img_file.get_path()
for input_field, (dep_step, output_type, panel_attr) in dependencies.items():
# 检查面板是否有对应的属性
if not hasattr(panel, panel_attr):
continue
file_widget = getattr(panel, panel_attr)
# ★ 兼容 FileSelectWidget 与原生 QLineEdit
current_text = (
file_widget.get_path().strip()
if hasattr(file_widget, 'get_path')
else file_widget.text().strip()
)
# 如果输入框已经有内容,跳过自动填充
if current_text:
continue
# 查找依赖步骤的输出文件
output_path = self.workspace_manager.find_step_output(work_path, dep_step, output_type, ref_img_path=ref_img_path)
if output_path and Path(output_path).exists():
# ★ 兼容 FileSelectWidget 与原生 QLineEdit
if hasattr(file_widget, 'set_path'):
file_widget.set_path(str(output_path))
else:
file_widget.setText(str(output_path))
self.log_message(f"自动填充 {step_id}.{input_field}: {output_path}", "info")
filled_count += 1
return filled_count
def get_step_panel(self, step_id):
"""根据步骤ID获取对应的面板对象从动态注册表查找"""
return self._panels.get(step_id)
def auto_populate_all_steps(self):
"""自动填充所有步骤的输入路径"""
"""扫描工作目录并触发事件总线自动填充所有步骤的输入路径"""
work_dir = getattr(self, 'work_dir', './work_dir')
work_path = Path(work_dir)
@ -976,23 +949,22 @@ class WaterQualityGUI(QMainWindow):
QMessageBox.warning(self, "警告", f"工作目录不存在: {work_dir}\n请先设置正确的工作目录。")
return
# 首先扫描工作目录发现已有的输出文件
# 扫描工作目录 → WorkspaceManager 发布 OutputUpdated 事件 → 面板自动填充
self.workspace_manager.scan_work_directory_for_files(work_path)
# 从注册表推导步骤顺序(跳过 step1从 step2 开始
step_order = [e['step_id'] for e in PANEL_REGISTRY if e['step_id'] != 'step1']
# 补充发布 reference_imgstep1 的输入影像,非 step1 产出但下游依赖它
step1_panel = self._panels.get('step1')
if step1_panel and hasattr(step1_panel, 'img_file'):
ref_img = step1_panel.img_file.get_path()
if ref_img:
global_event_bus.publish('OutputUpdated', {
'step_id': 'step1',
'output_type': 'reference_img',
'path': ref_img,
})
filled_count = 0
for step_id in step_order:
old_count = filled_count
filled_count += self.auto_populate_step_inputs(step_id)
if filled_count > 0:
self.log_message(f"已完成所有步骤的自动路径填充,共填充 {filled_count} 个输入字段", "info")
QMessageBox.information(self, "完成", f"自动填充完成!\n共填充了 {filled_count} 个输入字段。")
else:
self.log_message("未发现可自动填充的路径", "info")
QMessageBox.information(self, "完成", "未发现可自动填充的路径。\n请确保工作目录中有相关的输出文件。")
self.log_message("✓ 工作目录扫描完成,事件总线已通知所有面板自动填充", "info")
QMessageBox.information(self, "完成", "工作目录扫描完成!\n各步骤输入路径已自动填充。")
def add_auto_fill_buttons_to_panels(self):
"""为各个步骤面板添加自动填充按钮(动态遍历所有面板)"""
@ -1409,47 +1381,20 @@ class WaterQualityGUI(QMainWindow):
QMessageBox.critical(self, "失败", f"流程执行失败:\n\n{message[:200]}")
def on_step_completed(self, step_name, success, message):
"""步骤完成回调:记录输出路径并更新后续步骤"""
"""步骤完成回调:记录输出路径WorkspaceManager 自动发布 EventBus 事件。"""
if not success:
return
# 记录步骤输出路径到内存
work_dir = getattr(self, 'work_dir', './work_dir')
work_path = Path(work_dir)
# 根据步骤名称和约定路径,记录实际输出
if step_name not in self.workspace_manager.step_outputs:
self.workspace_manager.step_outputs[step_name] = {}
# 扫描工作目录,更新该步骤的输出路径
# WorkspaceManager.update_step_outputs 内部会发布 OutputUpdated 事件
# 下游面板通过 DependencySubscriber 自动接收并填充
self.workspace_manager.update_step_outputs(step_name, work_path)
# 自动填充依赖该步骤输出的后续步骤
self.auto_populate_dependent_steps(step_name)
def auto_populate_dependent_steps(self, completed_step):
"""自动填充依赖于已完成步骤的后续步骤"""
ref_img_path = None
step1_panel = self._panels.get('step1')
if step1_panel and hasattr(step1_panel, 'img_file'):
ref_img_path = step1_panel.img_file.get_path()
for step_id, dependencies in self.step_dependencies.items():
for input_field, (dep_step, output_type, panel_attr) in dependencies.items():
if dep_step == completed_step:
# 找到依赖于刚完成步骤的后续步骤,尝试自动填充
panel = self.get_step_panel(step_id)
if panel and hasattr(panel, panel_attr):
file_widget = getattr(panel, panel_attr)
# 如果输入框为空,则自动填充
if not file_widget.get_path().strip():
work_dir = getattr(self, 'work_dir', './work_dir')
work_path = Path(work_dir)
output_path = self.workspace_manager.find_step_output(work_path, dep_step, output_type, ref_img_path=ref_img_path)
if output_path and Path(output_path).exists():
file_widget.set_path(output_path)
self.log_message(f"步骤完成后自动填充 {step_id}.{input_field}: {output_path}", "info")
def run_single_step(self, step_name, config):
"""运行单个步骤"""
if not PIPELINE_AVAILABLE: