From aefc9d5aacab599cecf7c2c9fa130b6fae7ab365 Mon Sep 17 00:00:00 2001 From: DXC Date: Tue, 9 Jun 2026 11:29:11 +0800 Subject: [PATCH] =?UTF-8?q?feat(pipeline):=20=E4=B8=80=E9=94=AE=E5=BC=8F?= =?UTF-8?q?=E8=BF=90=E8=A1=8C=20-=20=E8=B0=83=E5=BA=A6=E5=BC=95=E6=93=8E?= =?UTF-8?q?=E6=A0=B8=E5=BF=83=20+=20=E9=A2=84=E6=A3=80/=E5=85=8D=E6=A3=80?= =?UTF-8?q?=E7=B3=BB=E7=BB=9F=20+=20=E7=BA=BF=E7=A8=8B=E6=A1=A5=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/core/pipeline/runner.py | 278 ++++++++++---------- src/gui/core/preflight_dialog.py | 431 +++++++++++++++++++++++++++++++ src/gui/core/worker_thread.py | 215 +++++++++++++-- 3 files changed, 766 insertions(+), 158 deletions(-) create mode 100644 src/gui/core/preflight_dialog.py diff --git a/src/core/pipeline/runner.py b/src/core/pipeline/runner.py index 1716bcd..b4f7daf 100644 --- a/src/core/pipeline/runner.py +++ b/src/core/pipeline/runner.py @@ -17,6 +17,8 @@ PipelineRunner:基于 StepSpec 声明式调度 14 个 step。 """ from __future__ import annotations +import inspect +import logging import os import time from dataclasses import dataclass, field @@ -24,6 +26,8 @@ from typing import Any, Dict, List, Optional, Sequence from .context import PipelineContext, STEP_MAP_OLD_TO_NEW, STEP_MAP_NEW_TO_OLD, resolve_step_id +logger = logging.getLogger(__name__) + # ============================================================ # 终止异常(外层 run() 不 catch,触发循环 break) @@ -69,21 +73,21 @@ class StepSpec: PIPELINE_STEPS: List[StepSpec] = [ StepSpec( - step_id="step1", method_name="step1_water_mask", + step_id="step1", method_name="step1_generate_water_mask", requires=["img_path"], produces=["water_mask_path"], required_input_files=["img_path"], output_file="{work_dir}/1_water_mask/water_mask.dat", description="水域掩膜生成(NDWI 或 SHP)", ), StepSpec( - step_id="step2", method_name="step2_glint_detection", + step_id="step2", method_name="step2_find_glint_area", requires=["img_path", "water_mask_path"], produces=["glint_mask_path"], required_input_files=["img_path", "water_mask_path"], output_file="{work_dir}/2_glint/glint_mask.dat", description="耀斑区域检测", ), StepSpec( - step_id="step3", method_name="step3_deglint", + step_id="step3", method_name="step3_remove_glint", requires=["img_path", "water_mask_path", "glint_mask_path"], produces=["deglint_img_path"], required_input_files=["img_path", "water_mask_path", "glint_mask_path"], @@ -91,14 +95,14 @@ PIPELINE_STEPS: List[StepSpec] = [ description="耀斑去除", ), StepSpec( - step_id="step4", method_name="step4_data_preparation", + step_id="step4", method_name="step4_process_csv", requires=["csv_path"], produces=["processed_csv_path"], required_input_files=["csv_path"], output_file="{work_dir}/4_processed_data/processed_data.csv", description="CSV 异常值清洗", ), StepSpec( - step_id="step5", method_name="step5_spectral_extraction", + step_id="step5", method_name="step5_extract_training_spectra", requires=["deglint_img_path", "processed_csv_path", "csv_path", "boundary_path", "glint_mask_path"], produces=["training_csv_path"], parameter_map={ @@ -193,7 +197,7 @@ class PipelineRunner: 用法: ctx = PipelineContext(img_path=..., work_dir=..., user_config=config) runner = PipelineRunner(pipeline_instance) - result_ctx = runner.run(ctx) # 预检通过后开始执行 + result_ctx = runner.run(ctx, config=config) # 预检通过后开始执行 print(result_ctx.error_summary) # [(step_id, error_msg), ...] """ @@ -205,14 +209,11 @@ class PipelineRunner: # 主入口 # ------------------------------------------------------------------ - def run(self, ctx: PipelineContext, skip_list: Optional[List[str]] = None) -> PipelineContext: - """全流程入口:智能补全 → 预检(软警告)→ 执行。 + def run(self, ctx: PipelineContext, config=None, skip_list: Optional[List[str]] = None) -> PipelineContext: + self.config = config or {} + skip_list = skip_list or [] + logger.info("开始运行完整流程 (Runner 调度模式)...") - Args: - ctx: PipelineContext - skip_list: 用户在 PreflightDialog 中选择忽略的 step_id 列表。 - 命中项设置 status="user_skipped",打印醒目日志。 - """ ctx.pipeline_start_time = time.time() error_summary: List[tuple[str, str]] = [] skip_set = set(skip_list) if skip_list else set() @@ -238,75 +239,148 @@ class PipelineRunner: # 断点续跑预扫描:ctx 已有产物则记录诊断日志 self._restore_outputs_from_ctx(ctx) - # ── ★ 依赖级联自动唤醒:在主循环开始前补齐所有前置缺口 ── - self._resolve_dependencies(ctx) + # 1. 暴力上下文注入:将 GUI config 中的所有参数强行塞入 ctx,防丢失 + for step_id, cfg in self.config.items(): + if isinstance(cfg, dict): + for k, v in cfg.items(): + if k != 'enabled' and v: + setattr(ctx, k, v) - for spec in self.steps: + # 2. 构建依赖提供者映射 (Provider Map) + provider_map = {} + for step in self.steps: + for prod in step.produces: + provider_map[prod] = step + + # 3. 强力依赖级联唤醒 (Auto-Wakeup Engine) + changed = True + woke_up_steps = [] + while changed: + changed = False + for step in self.steps: + if step.step_id in skip_set: + continue # 用户强踢的,绝不唤醒 + + step_cfg = self.config.setdefault(step.step_id, {}) + if not step_cfg.get('enabled', True): + continue + + for req in step.requires: + # 如果上下文缺这个参数 + if not (hasattr(ctx, req) and getattr(ctx, req)): + provider = provider_map.get(req) + if provider and provider.step_id not in skip_set: + prov_cfg = self.config.setdefault(provider.step_id, {}) + if not prov_cfg.get('enabled', True): + prov_cfg['enabled'] = True + changed = True + woke_up_steps.append(provider.step_id) + logger.info(f"[*] 自动唤醒: {provider.step_id} (为下游提供 {req})") + + if woke_up_steps: + logger.info(f"★ 依赖唤醒完成,共唤醒 {len(woke_up_steps)} 个次/步骤") + + # 4. 正式执行流水线 + for step in self.steps: # ── 软取消 ── if ctx.is_cancelled(): - ctx.append_log(f"[RUNNER] 收到取消信号,提前终止 @ {spec.step_id}") + ctx.append_log(f"[RUNNER] 收到取消信号,提前终止 @ {step.step_id}") break - # ── disabled 跳过(locked_steps 不受此约束)── - if not spec.enabled and spec.step_id not in ctx.locked_steps: - ctx.status[spec.step_id] = "skipped" - ctx.append_log(f"[RUNNER] {spec.step_id} 标记为 disabled,跳过") - continue - - # ── ★ 用户强制跳过(PreflightDialog 勾选) ── - if spec.step_id in skip_set: - ctx.status[spec.step_id] = "user_skipped" + if step.step_id in skip_set: + ctx.status[step.step_id] = "user_skipped" ctx.append_log( f"\n{'='*60}\n" - f" ⚠ 用户强制跳过: {spec.step_id}({spec.description})\n" + f" ⚠ 用户强制跳过: {step.step_id}({step.description})\n" f" 原因:用户在预检弹窗中勾选「忽略」,已确认跳过\n" f"{'='*60}\n" ) - self._notify_step(spec.step_id, "skipped", "用户强制跳过(预检弹窗)") + self._notify_step(step.step_id, "skipped", "用户强制跳过(预检弹窗)") continue - # ── 依赖缺失检查 ── - if spec.skip_when_missing: - missing = [k for k in spec.requires if not ctx.get(k)] - if missing: - # ── ★ 智能补全的步骤:work_dir 有产物,但 requires 仍缺失(罕见),报 warning 不跳过 - if spec.step_id in ctx.locked_steps: - ctx.append_log( - f"[RUNNER] ⚠ {spec.step_id} 已锁定但 requires 仍缺失 {missing}," - "尝试执行(可能因依赖前置步骤失败)" - ) - else: - ctx.status[spec.step_id] = "skipped" - reason = f"缺少必要的上下文参数,自动跳过: {missing}" - ctx.append_log(f"[RUNNER] {spec.step_id} {reason}") - self._notify_step(spec.step_id, "skipped", reason) - continue - - # ── ★ 断点续跑:产物文件已存在则跳过 ── - resolved_path = self._resolve_path(spec.output_file, ctx) - if resolved_path and os.path.exists(resolved_path): - ctx.status[spec.step_id] = "skipped" - reason = f"产物已存在,跳过: {resolved_path}" - ctx.append_log(f"[RUNNER] {spec.step_id} {reason}") - self._notify_step(spec.step_id, "skipped", reason) - self._restore_ctx_from_output(spec, resolved_path, ctx) + step_cfg = self.config.get(step.step_id, {}) + if not step_cfg.get('enabled', True): continue - # ── 执行(正常路径) ── + # 4.1 检查磁盘产物:如果已落盘,恢复上下文并跳过(拒绝静默跳过,必须打日志) + if step.output_file and os.path.exists(step.output_file): + for prod in step.produces: + if not (hasattr(ctx, prod) and getattr(ctx, prod)): + setattr(ctx, prod, step.output_file) + ctx.status[step.step_id] = "skipped" + ctx.append_log(f"[CACHE] 产物已存在,跳过运行并恢复上下文: {step.step_id}") + self._notify_step(step.step_id, "skipped", "产物已存在(断点续跑)") + continue + + # 4.2 依赖死线检查 + missing = [req for req in step.requires if not (hasattr(ctx, req) and getattr(ctx, req))] + if missing: + ctx.status[step.step_id] = "skipped" + reason = f"缺少必要的上下文参数,自动跳过: {missing}" + ctx.append_log(f"[RUNNER] 跳过 {step.step_id},仍缺少必要参数: {missing}") + self._notify_step(step.step_id, "skipped", reason) + continue + + # 4.3 真正执行 + ctx.append_log(f"[START] 正在执行步骤: {step.step_id}") + self._notify_step(step.step_id, "running", f"正在执行: {step.description}") try: - self._invoke(spec, ctx) + method = getattr(self.pipeline, step.method_name) + + sig = inspect.signature(method) + kwargs = {} + current_step_cfg = self.config.get(step.step_id, {}) + + for param_name in sig.parameters: + # 优先级 1:直接使用当前步骤专属配置中的值 + if param_name in current_step_cfg: + kwargs[param_name] = current_step_cfg[param_name] + continue + + # 优先级 1.5:【核心修复】硬隔离 output_file,防止被其他步骤的同名变量污染 + if param_name == 'output_file' and hasattr(step, 'output_file') and step.output_file: + work_dir = getattr(ctx, 'work_dir', '') + kwargs[param_name] = step.output_file.format(work_dir=work_dir) + continue + + # 优先级 2:处理跨步骤的映射逻辑 + ctx_key = param_name + if hasattr(step, 'parameter_map') and step.parameter_map: + for k, v in step.parameter_map.items(): + if v == param_name: + ctx_key = k + break + # 优先级 3:从全局大背包 ctx 中取(排在最后) + if hasattr(ctx, ctx_key): + kwargs[param_name] = getattr(ctx, ctx_key) + + # 使用解包后的关键字参数调用底层函数 + result = method(**kwargs) + + # 【产物接力 1】:如果底层函数返回了字典,直接合并到上下文 + if isinstance(result, dict): + for k, v in result.items(): + setattr(ctx, k, v) + + # 【产物接力 2】:强制通过 StepSpec 的 output_file 模板注入 + if hasattr(step, 'output_file') and step.output_file: + work_dir = getattr(ctx, 'work_dir', '') + actual_out_path = step.output_file.format(work_dir=work_dir) + for prod in step.produces: + if not hasattr(ctx, prod) or not getattr(ctx, prod): + setattr(ctx, prod, actual_out_path) + logger.info(f"[产物接力] 登记 {prod} = {actual_out_path}") except PipelineHalt: - # ★ PipelineHalt 不走 error_summary,触发立即 break - ctx.append_log(f"[RUNNER] PipelineHalt 硬终止 @ {spec.step_id}") - self._notify_step(spec.step_id, "error", "预检失败,硬终止") + ctx.status[step.step_id] = "error" + ctx.append_log(f"[RUNNER] PipelineHalt 硬终止 @ {step.step_id}") + self._notify_step(step.step_id, "error", "预检失败,硬终止") break - except Exception as exc: - ctx.status[spec.step_id] = "error" - error_summary.append((spec.step_id, str(exc))) - ctx.last_error = f"{spec.step_id}: {exc!r}" - ctx.append_log(f"[RUNNER] {spec.step_id} 异常: {exc!r}") - self._notify_step(spec.step_id, "error", str(exc)) - # ★ 任意 Exception 均立即 break,不再执行后续步骤 + except Exception as e: + ctx.status[step.step_id] = "error" + error_summary.append((step.step_id, str(e))) + ctx.last_error = f"{step.step_id}: {e!r}" + ctx.append_log(f"[ERROR] 步骤 {step.step_id} 执行崩溃: {str(e)}") + self._notify_step(step.step_id, "error", str(e)) break ctx.pipeline_end_time = time.time() @@ -397,80 +471,6 @@ class PipelineRunner: f"智能补全已自动开启 {len(newly_locked)} 个步骤:{newly_locked}" ) - # ------------------------------------------------------------------ - # ★ 依赖级联自动唤醒引擎 - # ------------------------------------------------------------------ - - def _resolve_dependencies(self, ctx: PipelineContext) -> None: - """依赖追溯循环:遍历所有 enabled 步骤,强制唤醒缺失前置。 - - 核心逻辑: - - 遍历当前 enabled=True 的步骤,检查其 requires - - 若所需 key 在 ctx 中不存在,则向上寻找 produces 该 key 的前置 Step - - 将该前置 Step 强制设为 enabled=True(加入 locked_steps) - - 递归执行,直到所有前置缺口都被强制补齐 - - 已存在的产物文件自动回填 ctx - """ - # 构建 produces→step_id 反查表(仅关注 enabled 或潜在的前置步骤) - produce_to_step: Dict[str, StepSpec] = {} - for spec in self.steps: - for key in spec.produces: - produce_to_step[key] = spec - - woke_up: List[str] = [] - changed = True - - while changed: - changed = False - for spec in self.steps: - if not spec.enabled: - continue - - for required_key in spec.requires: - # ctx 已有值 → 无需追溯 - if ctx.get(required_key): - continue - - # 磁盘文件是否存在(work_dir 产物已落盘但 ctx 未回填的情况) - resolved = self._resolve_output_for_key(required_key, ctx) - if resolved and os.path.exists(resolved): - ctx.set(required_key, resolved) - continue - - # 缺少且无磁盘产物 → 追溯 produces 者 - if required_key not in produce_to_step: - continue - - provider = produce_to_step[required_key] - if provider.enabled: - continue # 已开启但尚未执行(会在主循环中处理) - - # 强制唤醒 - provider.enabled = True - if provider.step_id not in ctx.locked_steps: - ctx.locked_steps.append(provider.step_id) - woke_up.append(provider.step_id) - ctx.append_log( - f"[INFO] 因下游依赖需求,自动唤醒并执行步骤: {provider.step_id}" - ) - - # 递归:检查新开启步骤自身的前置是否也缺失 - changed = True - - if woke_up: - detail = "、".join(woke_up) - ctx.append_log( - f"[RUNNER] ★ 依赖级联自动唤醒已完成,共开启 {len(woke_up)} 个步骤:{detail}" - ) - self._notify_step( - "全流程", "info", - f"依赖级联自动唤醒 {len(woke_up)} 个步骤:{woke_up}" - ) - # 扫描新开启步骤的 work_dir 产物,回填 ctx - for spec in self.steps: - if spec.step_id in woke_up: - self._scan_single_step_outputs(spec, ctx) - def _resolve_output_for_key( self, produce_key: str, ctx: PipelineContext ) -> Optional[str]: diff --git a/src/gui/core/preflight_dialog.py b/src/gui/core/preflight_dialog.py new file mode 100644 index 0000000..c13830c --- /dev/null +++ b/src/gui/core/preflight_dialog.py @@ -0,0 +1,431 @@ +# -*- coding: utf-8 -*- +""" +预检交互对话框:一次性全预检 + 用户交互式决策。 + +用户点击"运行"后,若存在缺失项: + - 列出每个缺失项(步骤名 + 原因) + - 每项提供"填写"(跳转面板)和"忽略"(加入 skip_list)选项 + - 底部三个操作按钮决定流程走向 +""" +import os +from dataclasses import dataclass +from typing import Dict, List, Optional, Set, Tuple + +from PyQt5.QtWidgets import ( + QDialog, QVBoxLayout, QHBoxLayout, QLabel, QPushButton, + QScrollArea, QWidget, QCheckBox, QGroupBox, QFrame, + QSizePolicy, QStyleFactory, +) +from PyQt5.QtCore import Qt +from PyQt5.QtGui import QFont, QColor, QPalette + +from src.core.pipeline.runner import PIPELINE_STEPS + + +@dataclass +class MissingItem: + """单个缺失项的结构化描述""" + step_id: str # step_id,如 "step1"、"step8_non_empirical_modeling" + step_name: str # 面板 tab 显示名称,如 "水域掩膜" + reason: str # 缺失原因,如 "缺少参考影像路径" + panel_tab_index: int # step_stack 中的 tab 索引(用于切换) + is_critical: bool = False # 是否为阻断性缺失(img_path 缺失 = True) + + +# ============================================================ +# PreflightDialog +# ============================================================ + +class PreflightDialog(QDialog): + """预检交互对话框。 + + 对每个 MissingItem,用户可选择: + - 勾选"忽略":将该 step_id 加入 skip_list,运行时跳过 + - 点击"填写":关闭弹窗,切换到对应面板 tab + + 对话框结果 (exec 返回值): + - QDialog.Accepted + self.result_data = ("fill", step_id) + → 填写待办:切换到目标面板,停止流程 + - QDialog.Accepted + self.result_data = ("skip", skip_list) + → 强制跳过:携带 skip_list 继续运行 + - QDialog.Rejected + → 取消运行:完全停止 + """ + + # step_id → (step_name, panel_tab_index) + STEP_TAB_MAP = { + "step1": ("水域掩膜", 0), + "step2": ("耀斑检测", 1), + "step3": ("耀斑去除", 2), + "step4": ("数据清洗", 3), + "step5": ("特征构建", 4), + "step8": ("水质指数", 5), + "step7": ("监督建模", 6), + "step8_non_empirical_modeling": ("回归建模", 7), + "step9": ("自定义回归建模", 8), + "step10": ("采样点布设", 9), + "step11_ml": ("监督预测", 10), + "step11": ("回归预测", 11), + "step12": ("自定义回归预测", 12), + "step14": ("专题图生成", 13), + } + + def __init__(self, missing_items: List[MissingItem], parent=None): + super().__init__(parent) + self.missing_items = missing_items + self.result_data: Optional[Tuple[str, any]] = None # ("fill", step_id) | ("skip", [step_id]) + self._skip_checkboxes: List[QCheckBox] = [] + self._fill_buttons: List[QPushButton] = [] + + self.setWindowTitle("⚠ 预检发现缺失项") + self.setMinimumSize(680, 420) + self.setModal(True) + self._setup_ui() + + # ------------------------------------------------------------------ + # UI 构建 + # ------------------------------------------------------------------ + + def _setup_ui(self): + main_layout = QVBoxLayout(self) + main_layout.setContentsMargins(20, 20, 20, 16) + main_layout.setSpacing(10) + + # ── 顶部提示 ── + header_label = QLabel( + f"检测到 {len(self.missing_items)} 个缺失项,请逐项处理后继续:" + ) + header_label.setStyleSheet("font-size: 14px; color: #e67e22; font-weight: bold;") + main_layout.addWidget(header_label) + + # ── 滚动区域(缺失项列表) ── + scroll = QScrollArea() + scroll.setWidgetResizable(True) + scroll.setFrameShape(QFrame.NoFrame) + scroll.setStyleSheet("background: transparent;") + + container = QWidget() + container_layout = QVBoxLayout(container) + container_layout.setContentsMargins(0, 0, 8, 0) + container_layout.setSpacing(8) + + for item in self.missing_items: + row = self._build_item_row(item) + container_layout.addWidget(row) + + container_layout.addStretch() + scroll.setWidget(container) + main_layout.addWidget(scroll, 1) + + # ── 底部操作按钮 ── + btn_layout = QHBoxLayout() + btn_layout.setSpacing(12) + + # 取消运行(左) + cancel_btn = QPushButton("取消运行") + cancel_btn.setCursor(Qt.PointingHandCursor) + cancel_btn.setMinimumHeight(38) + cancel_btn.setStyleSheet( + "QPushButton { background: #95a5a6; color: white; border-radius: 6px; " + "font-weight: bold; font-size: 13px; padding: 4px 16px; }" + "QPushButton:hover { background: #7f8c8d; }" + ) + cancel_btn.clicked.connect(self._on_cancel) + btn_layout.addWidget(cancel_btn) + + btn_layout.addStretch() + + # 强制跳过运行(中) + skip_btn = QPushButton("强制跳过运行") + skip_btn.setCursor(Qt.PointingHandCursor) + skip_btn.setMinimumHeight(38) + skip_btn.setStyleSheet( + "QPushButton { background: #3498db; color: white; border-radius: 6px; " + "font-weight: bold; font-size: 13px; padding: 4px 16px; }" + "QPushButton:hover { background: #2980b9; }" + ) + skip_btn.clicked.connect(self._on_force_skip) + btn_layout.addWidget(skip_btn) + + # 填写待办(右,primary) + fill_btn = QPushButton("填写待办") + fill_btn.setCursor(Qt.PointingHandCursor) + fill_btn.setMinimumHeight(38) + fill_btn.setDefault(True) + fill_btn.setAutoDefault(True) + fill_btn.setStyleSheet( + "QPushButton { background: #27ae60; color: white; border-radius: 6px; " + "font-weight: bold; font-size: 13px; padding: 4px 16px; }" + "QPushButton:hover { background: #1e8449; }" + ) + fill_btn.clicked.connect(self._on_fill_first) + btn_layout.addWidget(fill_btn) + + main_layout.addLayout(btn_layout) + + def _build_item_row(self, item: MissingItem) -> QWidget: + """构建单个缺失项行 widget。""" + frame = QFrame() + frame.setFrameShape(QFrame.StyledPanel) + frame.setStyleSheet( + "QFrame { background: #2c3e50; border-radius: 8px; padding: 10px; }" + "QFrame[critical=true] { border: 2px solid #e74c3c; }" + "QFrame[critical=false] { border: 1px solid #34495e; }" + ) + frame.setProperty("critical", item.is_critical) + + layout = QVBoxLayout(frame) + layout.setContentsMargins(12, 10, 12, 10) + layout.setSpacing(6) + + # ── 第一行:步骤标签 + 原因 ── + top = QHBoxLayout() + top.setSpacing(8) + + # 步骤名标签 + name_label = QLabel(f"📌 {item.step_name}") + name_label.setStyleSheet( + "font-size: 13px; font-weight: bold; color: #f39c12; background: #1a252f; " + "border-radius: 4px; padding: 4px 10px;" + ) + top.addWidget(name_label) + + # 阻断性标记 + if item.is_critical: + critical_label = QLabel("阻断") + critical_label.setStyleSheet( + "background: #e74c3c; color: white; border-radius: 4px; " + "font-size: 11px; font-weight: bold; padding: 3px 8px;" + ) + top.addWidget(critical_label) + + top.addStretch() + + # "填写"按钮 + fill_btn = QPushButton("填写") + fill_btn.setCursor(Qt.PointingHandCursor) + fill_btn.setFixedWidth(70) + fill_btn.setFixedHeight(28) + fill_btn.setStyleSheet( + "QPushButton { background: #27ae60; color: white; border-radius: 5px; " + "font-size: 12px; font-weight: bold; }" + "QPushButton:hover { background: #1e8449; }" + ) + fill_btn.clicked.connect(lambda *a, sid=item.step_id, idx=item.panel_tab_index: self._on_fill(sid, idx)) + self._fill_buttons.append(fill_btn) + top.addWidget(fill_btn) + + layout.addLayout(top) + + # ── 第二行:原因文本 ── + reason_label = QLabel(item.reason) + reason_label.setWordWrap(True) + reason_label.setStyleSheet( + "font-size: 12px; color: #bdc3c7; background: transparent; padding: 2px 4px;" + ) + reason_label.setTextInteractionFlags(Qt.TextSelectableByMouse) + layout.addWidget(reason_label) + + # ── 第三行:忽略复选框 ── + bottom = QHBoxLayout() + bottom.addStretch() + + skip_cb = QCheckBox("忽略此项(强制跳过)") + skip_cb.setCursor(Qt.PointingHandCursor) + skip_cb.setStyleSheet( + "QCheckBox { color: #95a5a6; font-size: 12px; spacing: 6px; }" + "QCheckBox::indicator { width: 16px; height: 16px; }" + ) + skip_cb.setChecked(False) + skip_cb.stateChanged.connect( + lambda state, cb=skip_cb: cb.setStyleSheet( + "QCheckBox { color: #27ae60; font-size: 12px; spacing: 6px; }" + "QCheckBox::indicator { width: 16px; height: 16px; }" + if state else + "QCheckBox { color: #95a5a6; font-size: 12px; spacing: 6px; }" + "QCheckBox::indicator { width: 16px; height: 16px; }" + ) + ) + self._skip_checkboxes.append((item.step_id, skip_cb)) + bottom.addWidget(skip_cb) + + layout.addLayout(bottom) + return frame + + # ------------------------------------------------------------------ + # 槽函数 + # ------------------------------------------------------------------ + + def _on_cancel(self): + """取消运行:完全停止。""" + self.result_data = None + self.reject() + + def _on_force_skip(self): + """强制跳过:收集所有被勾选"忽略"的 step_id,携带 skip_list 继续。""" + skip_list = [ + step_id for step_id, cb in self._skip_checkboxes if cb.isChecked() + ] + self.result_data = ("skip", skip_list) + self.accept() + + def _on_fill_first(self): + """填写待办:找到第一个未被勾选"忽略"的缺失项,切换到其面板。""" + for step_id, cb in self._skip_checkboxes: + if not cb.isChecked(): + item = self._find_item(step_id) + if item: + self.result_data = ("fill", step_id, item.panel_tab_index) + self.accept() + return + # 所有项都被勾选 → 等同于 force_skip + self._on_force_skip() + + def _on_fill(self, step_id: str, tab_index: int): + """填写:直接切换到指定面板。""" + self.result_data = ("fill", step_id, tab_index) + self.accept() + + # ------------------------------------------------------------------ + # 辅助 + # ------------------------------------------------------------------ + + def _find_item(self, step_id: str) -> Optional[MissingItem]: + for item in self.missing_items: + if item.step_id == step_id: + return item + return None + + def get_result(self) -> Optional[Tuple[str, any]]: + """供外部获取结果。""" + return self.result_data + + @staticmethod + def build_missing_items(config: dict) -> List[MissingItem]: + """DAG-aware 预检:从 config 构建缺失项列表。 + + 拓扑预判逻辑: + 1. 按 pipeline 顺序遍历所有 enabled=True 的步骤,收集其 produces 列表, + 构建「动态产物集合」dynamically_produced_keys。 + 2. 检查某个 required_input_file 时: + - 若磁盘已存在 → OK(用户已手动提供) + - 若 key 在 dynamically_produced_keys 中 → OK(前置步骤会生成) + - 否则 → MissingItem(真正缺失) + 3. 智能免检规则: + - formula_csv_path:底层的完全可选参数,任何情况下都免检。 + - step5 boundary_path:若 step1 enabled 或 config 中有 water_mask_path, + 则信任 panel/底层的自动推导机制,不拦截。 + - step14 boundary_shp_path:若 step1 enabled,信任 panel 的自动回填, + 不拦截。 + + 关键阻断项(is_critical=True):step1 img_path 缺失。 + """ + items: List[MissingItem] = [] + + step1_cfg = config.get('step1', {}) + step1_enabled = step1_cfg.get('enabled', False) + + # ── ★ 构建「动态产物集合」:按 pipeline 顺序收集所有 enabled 步骤的 produces ── + dynamically_produced_keys: Set[str] = set() + enabled_step_ids: Set[str] = set() + + for step_spec in PIPELINE_STEPS: + step_cfg = config.get(step_spec.step_id, {}) + if not step_cfg.get('enabled', True): + continue + enabled_step_ids.add(step_spec.step_id) + dynamically_produced_keys.update(step_spec.produces) + + # ── step1 img_path(阻断性)─────────────────────────────── + img_path = step1_cfg.get('img_path') + if not img_path: + items.append(MissingItem( + step_id="step1", step_name="水域掩膜", + reason="缺少参考影像路径 → 请在「阶段一」中填写「参考影像」", + panel_tab_index=0, is_critical=True + )) + elif not os.path.isfile(img_path): + items.append(MissingItem( + step_id="step1", step_name="水域掩膜", + reason=f"参考影像文件不存在:{img_path}", + panel_tab_index=0, is_critical=True + )) + + # ── step4 csv_path(纯外部输入,必须手动提供)─────────────── + step4_cfg = config.get('step4', {}) + step4_enabled = step4_cfg.get('enabled', True) + if step4_enabled: + csv_path = step4_cfg.get('csv_path') + if not csv_path: + items.append(MissingItem( + step_id="step4", step_name="数据清洗", + reason="请在「数据清洗」中填写「实测水质数据 CSV」", + panel_tab_index=3 + )) + elif not os.path.isfile(csv_path): + items.append(MissingItem( + step_id="step4", step_name="数据清洗", + reason=f"实测水质数据文件不存在:{csv_path}", + panel_tab_index=3 + )) + + # ── step12 formula_csv_path(绝对免检:底层完全可选)──────── + # formula_csv_path 在底层 CustomRegressionPredictor 中不传即可运行, + # 只影响日志输出,不阻断任何功能。此处不做任何检查。 + + # ── ★ DAG-aware 检查:遍历 enabled 步骤的 required_input_files ── + PURE_EXTERNAL_INPUT_KEYS: Set[str] = {'img_path', 'csv_path'} + _TAB_INDEX_MAP: Dict[str, int] = { + "step1": 0, "step2": 1, "step3": 2, "step4": 3, + "step5": 4, "step8": 5, "step7": 6, + "step8_non_empirical_modeling": 7, "step9": 8, + "step10": 9, "step11_ml": 10, "step11": 11, + "step12": 12, "step14": 13, + } + _STEP_NAME_MAP: Dict[str, str] = { + "step1": "水域掩膜", "step2": "耀斑检测", "step3": "耀斑去除", + "step4": "数据清洗", "step5": "特征构建", "step8": "水质指数", + "step7": "监督建模", "step8_non_empirical_modeling": "回归建模", + "step9": "自定义回归建模", "step10": "采样点布设", + "step11_ml": "监督预测", "step11": "回归预测", + "step12": "自定义回归预测", "step14": "专题图生成", + } + + for step_spec in PIPELINE_STEPS: + if step_spec.step_id not in enabled_step_ids: + continue + step_cfg = config.get(step_spec.step_id, {}) + tab_idx = _TAB_INDEX_MAP.get(step_spec.step_id, 0) + step_name = _STEP_NAME_MAP.get(step_spec.step_id, step_spec.step_id) + + for req_key in step_spec.required_input_files: + # ★★★ 高优先级硬编码白名单 ★★★ + # 当检测到需求为边界文件时,只要 step1 有填影像(代表有基础,底层能自动推导),直接放行 + if req_key in ('boundary_path', 'boundary_shp_path'): + step1_cfg = config.get('step1', {}) + if step1_cfg.get('img_path') or step1_cfg.get('enabled', True): + continue # 直接跳过,不判定为缺失 + if req_key in PURE_EXTERNAL_INPUT_KEYS: + continue + if req_key == 'formula_csv_path': + continue # ★ 底层完全可选,赦免 + if req_key == 'boundary_path' and step_spec.step_id == 'step5': + continue # ★ step1 执行则 panel/底层自动推导,赦免 + if req_key == 'boundary_shp_path' and step_spec.step_id == 'step14': + continue # ★ step1 执行则 panel 自动回填,赦免 + cfg_val = step_cfg.get(req_key) + if cfg_val and os.path.isfile(cfg_val): + continue + if cfg_val and os.path.isdir(cfg_val): + continue + if req_key in dynamically_produced_keys: + continue # ★ 前置步骤会生成,拓扑预判通过 + items.append(MissingItem( + step_id=step_spec.step_id, + step_name=step_name, + reason=f"缺少必需文件/目录 [{req_key}]", + panel_tab_index=tab_idx, + is_critical=(step_spec.step_id == "step1" and req_key == "img_path"), + )) + + return items \ No newline at end of file diff --git a/src/gui/core/worker_thread.py b/src/gui/core/worker_thread.py index efd90fa..3c20f36 100644 --- a/src/gui/core/worker_thread.py +++ b/src/gui/core/worker_thread.py @@ -2,8 +2,12 @@ """ 后台线程模块:Pipeline 执行线程与诊断逻辑。 """ +import os import traceback +from typing import Dict, List from PyQt5.QtCore import QThread, pyqtSignal +from src.core.pipeline.runner import PipelineRunner, PipelineHalt +from src.core.pipeline.context import PipelineContext # ============================================================================= @@ -189,12 +193,13 @@ class WorkerThread(QThread): step_completed = pyqtSignal(str, bool, str) # 步骤完成信号 (step_name, success, message) finished = pyqtSignal(bool, str) # 完成信号 (success, message) - def __init__(self, work_dir: str, config, mode='full', step_name=None): + def __init__(self, work_dir: str, config, mode='full', step_name=None, skip_list=None): super().__init__() self.work_dir = str(work_dir) self.config = config self.mode = mode # 'full' 或 'single_step' self.step_name = step_name # 单步执行时的步骤名称 + self.skip_list = skip_list if skip_list else [] # PreflightDialog 用户选择的跳过列表 self.pipeline = None self.is_running = True self.current_step = None @@ -219,6 +224,12 @@ class WorkerThread(QThread): self.step_completed.emit(step_name, True, f"跳过: {message}") progress = int((self.step_count / self.total_steps) * 100) self.progress_update.emit(progress, f"已跳过: {step_name}") + elif status == "user_skipped": + self.step_count += 1 + self.log_message.emit(f"[USER_SKIP] ⚠ 用户强制跳过: {step_name} — {message}", "warning") + self.step_completed.emit(step_name, True, f"用户强制跳过: {message}") + progress = int((self.step_count / self.total_steps) * 100) + self.progress_update.emit(progress, f"已跳过(用户): {step_name}") elif status == "error": self.log_message.emit(f"[ERROR] 错误: {step_name} - {message}", "error") self.step_completed.emit(step_name, False, message) @@ -247,8 +258,6 @@ class WorkerThread(QThread): mpl_prev = None try: from src.core.water_quality_inversion_pipeline_GUI import WaterQualityInversionPipeline - from src.core.pipeline.runner import PipelineRunner - from src.core.pipeline.context import PipelineContext self.pipeline = WaterQualityInversionPipeline(work_dir=self.work_dir) @@ -257,21 +266,23 @@ class WorkerThread(QThread): if hasattr(self.pipeline, 'set_callback'): self.pipeline.set_callback(self.pipeline_callback) + # ── ★ 预检已由 GUI 层 perform_preflight() 完成,此处不再重复预检 ── + # 构造上下文 (Ctx),将 config 整体注入 user_config ctx = PipelineContext( img_path=self.config.get('step1', {}).get('img_path'), water_mask_path=self.config.get('step1', {}).get('mask_path'), csv_path=self.config.get('step4', {}).get('csv_path'), boundary_path=self.config.get('step5', {}).get('boundary_path'), - boundary_shp_path=self.config.get('step9', {}).get('boundary_shp_path'), - formula_csv_path=self.config.get('step8_75', {}).get('formula_csv_path'), + boundary_shp_path=self.config.get('step14', {}).get('boundary_shp_path'), + formula_csv_path=self.config.get('step12', {}).get('formula_csv_path'), work_dir=self.work_dir, user_config=self.config ) # 启动新调度器 runner = PipelineRunner(self.pipeline) - result_ctx = runner.run(ctx) + result_ctx = runner.run(ctx, config=self.config, skip_list=self.skip_list) if result_ctx.last_error: raise RuntimeError(f"流水线执行失败: {result_ctx.last_error}") @@ -289,6 +300,11 @@ class WorkerThread(QThread): self.progress_update.emit(100, f"步骤 {self.step_name} 执行完成") self.finished.emit(True, f"步骤 {self.step_name} 独立运行成功!") + except PipelineHalt as exc: + # 预检失败 / 硬终止:透传清晰错误信息,不打印完整 traceback + error_msg = str(exc) + self.log_message.emit(f"[预检失败] {error_msg}", "error") + self.finished.emit(False, error_msg) except Exception as e: error_msg = f"执行失败: {str(e)}\n{traceback.format_exc()}" self.log_message.emit(error_msg, "error") @@ -309,15 +325,15 @@ class WorkerThread(QThread): 'step3': 'step3_remove_glint', 'step4': 'step4_process_csv', 'step5': 'step5_extract_training_spectra', - 'step5_5': 'step5_5_calculate_water_quality_indices', - 'step6': 'step6_train_models', - 'step6_5': 'step6_5_non_empirical_modeling', - 'step6_75': 'step6_75_custom_regression', - 'step7': 'step7_generate_sampling_points', - 'step8': 'step8_predict_water_quality', - 'step8_5': 'step8_5_predict_with_non_empirical_models', - 'step8_75': 'step8_75_predict_with_custom_regression', - 'step9': 'step9_generate_distribution_map' + 'step8': 'step8_water_quality_indices', + 'step7': 'step7_ml_modeling', + 'step8_non_empirical_modeling': 'step8_non_empirical_modeling', + 'step9': 'step9_custom_regression', + 'step10': 'step10_sampling', + 'step11_ml': 'step11_ml_prediction', + 'step11': 'step11_non_empirical_prediction', + 'step12': 'step12_custom_regression_prediction', + 'step14': 'step14_distribution_map' } if step_name not in step_method_map: @@ -326,7 +342,7 @@ class WorkerThread(QThread): method_name = step_method_map[step_name] step_config = dict(config.get(step_name, {})) - # 透传面板顶层传入的外部预训练模型(GUI step8_panel 通过 config['_external_model'] 传入) + # 透传面板顶层传入的外部预训练模型(GUI step11_prediction_panel 通过 config['_external_model'] 传入) # 非空才覆盖(遵循 feedback_never_overwrite_with_empty 原则) for key in ('_external_model', '_external_model_path', '_external_models_dict', '_external_model_dir'): @@ -340,15 +356,15 @@ class WorkerThread(QThread): step_config['skip_dependency_check'] = True - if step_name == 'step9': + if step_name == 'step14': step_config.pop('step9_batch_mode', None) step_config.pop('prediction_csv_dir', None) step_config.pop('recursive_csv_scan', None) - if step_name in ['step2', 'step3', 'step4', 'step5', 'step6', 'step7', 'step8', 'step8_5', 'step8_75']: + if step_name in ['step2', 'step3', 'step4', 'step5', 'step7', 'step10', 'step11_ml', 'step11', 'step12']: step_config.pop('output_path', None) - if step_name == 'step8_5' and 'models_dir' in step_config: + if step_name == 'step11' and 'models_dir' in step_config: step_config['non_empirical_models_dir'] = step_config.pop('models_dir') method = getattr(self.pipeline, method_name) @@ -356,6 +372,167 @@ class WorkerThread(QThread): return result + # ------------------------------------------------------------------ + # 静态预检方法(GUI run_full_pipeline 也调用同一逻辑) + # ------------------------------------------------------------------ + + @staticmethod + def check_paths(config: Dict, work_dir: str) -> None: + """全流程关键路径预检——遍历所有步骤,实名举报缺失项。 + + 通过 config[step_id] 判断各步骤的面板配置是否已填写, + 对非空路径进一步验证磁盘文件是否存在。 + 发现任一关键项缺失,立即抛出 PipelineHalt 并给出定位式错误信息。 + + Args: + config: 步骤配置字典(从 GUI get_current_config() 获取) + work_dir: 工作目录路径 + + Raises: + PipelineHalt: 任一步骤关键路径缺失或文件不存在时抛出 + """ + errors: List[str] = [] + + # ── 步骤1:参考影像(无则全流程无法启动) ── + step1_cfg = config.get('step1', {}) + img_path = step1_cfg.get('img_path') + if not img_path: + errors.append( + "步骤 1:缺少参考影像路径。\n" + " → 请在「流程步骤-阶段一」中填写「参考影像」字段。" + ) + elif not os.path.isfile(img_path): + errors.append( + f"步骤 1:参考影像文件不存在:\n {img_path}\n" + " → 请检查路径是否正确,或重新选择影像文件。" + ) + + # ── 步骤2:耀斑区域检测 ── + step2_cfg = config.get('step2', {}) + glint_mask_path = step2_cfg.get('glint_mask_path') + if glint_mask_path and not os.path.isfile(glint_mask_path): + errors.append( + f"步骤 2:耀斑掩膜文件不存在:\n {glint_mask_path}\n" + " → 请确认「耀斑区域检测」已成功运行,或重新配置路径。" + ) + + # ── 步骤3:耀斑去除(依赖 step2 产物) ── + step3_cfg = config.get('step3', {}) + deglint_path = step3_cfg.get('deglint_path') + if deglint_path and not os.path.isfile(deglint_path): + errors.append( + f"步骤 3:去耀斑影像文件不存在:\n {deglint_path}\n" + " → 请确认「耀斑去除」已成功运行,或重新配置路径。" + ) + + # ── 步骤4:实测水质数据 CSV ── + step4_cfg = config.get('step4', {}) + csv_path = step4_cfg.get('csv_path') + if csv_path and not os.path.isfile(csv_path): + errors.append( + f"步骤 4:实测水质数据文件不存在:\n {csv_path}\n" + " → 请检查 CSV 路径是否正确,或重新上传数据文件。" + ) + + # ── 步骤5:采样点平均光谱提取 ── + step5_cfg = config.get('step5', {}) + step5_csv = step5_cfg.get('csv_path') + boundary_path = step5_cfg.get('boundary_path') + if step5_csv and not os.path.isfile(step5_csv): + errors.append( + f"步骤 5:实测水质数据文件不存在:\n {step5_csv}\n" + " → 请检查「流程步骤-阶段五」中的 CSV 路径。" + ) + if boundary_path and not os.path.isfile(boundary_path): + errors.append( + f"步骤 5:边界矢量文件不存在:\n {boundary_path}\n" + " → 请确认「流程步骤-阶段五」中已填写有效的边界 shp 路径。" + ) + + # ── 步骤8(水质指数):训练光谱 CSV ── + step8_cfg = config.get('step8', {}) + training_csv = step8_cfg.get('training_csv_path') + if training_csv and not os.path.isfile(training_csv): + errors.append( + f"步骤 8(水质指数):训练光谱文件不存在:\n {training_csv}\n" + " → 请确认步骤 5 已成功运行并生成了训练光谱。" + ) + + # ── 步骤7(ML 建模) ── + step7_cfg = config.get('step7', {}) + step7_csv = step7_cfg.get('training_csv_path') + if step7_csv and not os.path.isfile(step7_csv): + errors.append( + f"步骤 7(ML 建模):训练光谱文件不存在:\n {step7_csv}\n" + " → 请确认步骤 5 已成功运行并生成了训练光谱。" + ) + + # ── 步骤11 ML 预测:密集采样点 CSV + 模型目录 ── + step11_ml_cfg = config.get('step11_ml', {}) + ml_csv = step11_ml_cfg.get('sampling_csv_path') + models_dir = step11_ml_cfg.get('models_dir') + if ml_csv and not os.path.isfile(ml_csv): + errors.append( + f"步骤 11 ML 预测:采样点 CSV 不存在:\n {ml_csv}\n" + " → 请确认「流程步骤-阶段七(采样点布设)」已成功运行。" + ) + if models_dir and not os.path.isdir(models_dir): + errors.append( + f"步骤 11 ML 预测:模型目录不存在:\n {models_dir}\n" + " → 请确认「流程步骤-阶段六(机器学习建模)」已成功运行。" + ) + + # ── 步骤11 回归预测:模型目录 ── + step11_cfg = config.get('step11', {}) + step11_csv = step11_cfg.get('sampling_csv_path') + step11_dir = step11_cfg.get('models_dir') + if step11_csv and not os.path.isfile(step11_csv): + errors.append( + f"步骤 11 回归预测:采样点 CSV 不存在:\n {step11_csv}\n" + " → 请确认「流程步骤-阶段七(采样点布设)」已成功运行。" + ) + if step11_dir and not os.path.isdir(step11_dir): + errors.append( + f"步骤 11 回归预测:模型目录不存在:\n {step11_dir}\n" + " → 请确认「流程步骤-阶段八(非经验建模)」已成功运行。" + ) + + # ── 步骤12 自定义回归预测:公式 CSV ── + step12_cfg = config.get('step12', {}) + formula_csv = step12_cfg.get('formula_csv_path') + if formula_csv and not os.path.isfile(formula_csv): + errors.append( + f"步骤 12(自定义回归预测):公式 CSV 文件不存在:\n {formula_csv}\n" + " → 请确认「流程步骤-阶段十二」中已填写有效的公式文件路径。" + ) + + # ── 步骤14 专题图:预测结果 CSV + 边界 shp ── + step14_cfg = config.get('step14', {}) + pred_csv = step14_cfg.get('prediction_csv_path') + boundary_shp = step14_cfg.get('boundary_shp_path') + if pred_csv and not os.path.isfile(pred_csv): + errors.append( + f"步骤 14(专题图):预测结果 CSV 不存在:\n {pred_csv}\n" + " → 请确认机器学习或回归预测步骤已成功运行。" + ) + if boundary_shp and not os.path.isfile(boundary_shp): + errors.append( + f"步骤 14(专题图):边界 shp 文件不存在:\n {boundary_shp}\n" + " → 请确认「流程步骤-阶段十四」中已填写有效的边界矢量文件路径。" + ) + + # ── 汇总报错:任一缺失立即抛出 PipelineHalt ── + if errors: + lines = [f" • {e}" for e in errors] + detail = "\n".join(lines) + raise PipelineHalt( + f"【预检失败】\n\n" + f"检测到 {len(errors)} 个缺失项,请逐一修正后重新运行:\n\n" + f"{detail}\n\n" + f"提示:已完成的步骤产物(如水域掩膜、去耀斑影像)会被自动回填;" + f"\n若文件不存在,请先运行对应步骤生成产物。" + ) + def stop(self): """停止执行""" self.is_running = False