feat(pipeline): 一键式运行 - 调度引擎核心 + 预检/免检系统 + 线程桥接

This commit is contained in:
DXC
2026-06-09 11:29:11 +08:00
parent 624a5bdcd4
commit aefc9d5aac
3 changed files with 766 additions and 158 deletions

View File

@ -17,6 +17,8 @@ PipelineRunner基于 StepSpec 声明式调度 14 个 step。
"""
from __future__ import annotations
import inspect
import logging
import os
import time
from dataclasses import dataclass, field
@ -24,6 +26,8 @@ from typing import Any, Dict, List, Optional, Sequence
from .context import PipelineContext, STEP_MAP_OLD_TO_NEW, STEP_MAP_NEW_TO_OLD, resolve_step_id
logger = logging.getLogger(__name__)
# ============================================================
# 终止异常(外层 run() 不 catch触发循环 break
@ -69,21 +73,21 @@ class StepSpec:
PIPELINE_STEPS: List[StepSpec] = [
StepSpec(
step_id="step1", method_name="step1_water_mask",
step_id="step1", method_name="step1_generate_water_mask",
requires=["img_path"], produces=["water_mask_path"],
required_input_files=["img_path"],
output_file="{work_dir}/1_water_mask/water_mask.dat",
description="水域掩膜生成NDWI 或 SHP",
),
StepSpec(
step_id="step2", method_name="step2_glint_detection",
step_id="step2", method_name="step2_find_glint_area",
requires=["img_path", "water_mask_path"], produces=["glint_mask_path"],
required_input_files=["img_path", "water_mask_path"],
output_file="{work_dir}/2_glint/glint_mask.dat",
description="耀斑区域检测",
),
StepSpec(
step_id="step3", method_name="step3_deglint",
step_id="step3", method_name="step3_remove_glint",
requires=["img_path", "water_mask_path", "glint_mask_path"],
produces=["deglint_img_path"],
required_input_files=["img_path", "water_mask_path", "glint_mask_path"],
@ -91,14 +95,14 @@ PIPELINE_STEPS: List[StepSpec] = [
description="耀斑去除",
),
StepSpec(
step_id="step4", method_name="step4_data_preparation",
step_id="step4", method_name="step4_process_csv",
requires=["csv_path"], produces=["processed_csv_path"],
required_input_files=["csv_path"],
output_file="{work_dir}/4_processed_data/processed_data.csv",
description="CSV 异常值清洗",
),
StepSpec(
step_id="step5", method_name="step5_spectral_extraction",
step_id="step5", method_name="step5_extract_training_spectra",
requires=["deglint_img_path", "processed_csv_path", "csv_path", "boundary_path", "glint_mask_path"],
produces=["training_csv_path"],
parameter_map={
@ -193,7 +197,7 @@ class PipelineRunner:
用法:
ctx = PipelineContext(img_path=..., work_dir=..., user_config=config)
runner = PipelineRunner(pipeline_instance)
result_ctx = runner.run(ctx) # 预检通过后开始执行
result_ctx = runner.run(ctx, config=config) # 预检通过后开始执行
print(result_ctx.error_summary) # [(step_id, error_msg), ...]
"""
@ -205,14 +209,11 @@ class PipelineRunner:
# 主入口
# ------------------------------------------------------------------
def run(self, ctx: PipelineContext, skip_list: Optional[List[str]] = None) -> PipelineContext:
"""全流程入口:智能补全 → 预检(软警告)→ 执行。
def run(self, ctx: PipelineContext, config=None, skip_list: Optional[List[str]] = None) -> PipelineContext:
self.config = config or {}
skip_list = skip_list or []
logger.info("开始运行完整流程 (Runner 调度模式)...")
Args:
ctx: PipelineContext
skip_list: 用户在 PreflightDialog 中选择忽略的 step_id 列表。
命中项设置 status="user_skipped",打印醒目日志。
"""
ctx.pipeline_start_time = time.time()
error_summary: List[tuple[str, str]] = []
skip_set = set(skip_list) if skip_list else set()
@ -238,75 +239,148 @@ class PipelineRunner:
# 断点续跑预扫描ctx 已有产物则记录诊断日志
self._restore_outputs_from_ctx(ctx)
# ── ★ 依赖级联自动唤醒:在主循环开始前补齐所有前置缺口 ──
self._resolve_dependencies(ctx)
# 1. 暴力上下文注入:将 GUI config 中的所有参数强行塞入 ctx防丢失
for step_id, cfg in self.config.items():
if isinstance(cfg, dict):
for k, v in cfg.items():
if k != 'enabled' and v:
setattr(ctx, k, v)
for spec in self.steps:
# 2. 构建依赖提供者映射 (Provider Map)
provider_map = {}
for step in self.steps:
for prod in step.produces:
provider_map[prod] = step
# 3. 强力依赖级联唤醒 (Auto-Wakeup Engine)
changed = True
woke_up_steps = []
while changed:
changed = False
for step in self.steps:
if step.step_id in skip_set:
continue # 用户强踢的,绝不唤醒
step_cfg = self.config.setdefault(step.step_id, {})
if not step_cfg.get('enabled', True):
continue
for req in step.requires:
# 如果上下文缺这个参数
if not (hasattr(ctx, req) and getattr(ctx, req)):
provider = provider_map.get(req)
if provider and provider.step_id not in skip_set:
prov_cfg = self.config.setdefault(provider.step_id, {})
if not prov_cfg.get('enabled', True):
prov_cfg['enabled'] = True
changed = True
woke_up_steps.append(provider.step_id)
logger.info(f"[*] 自动唤醒: {provider.step_id} (为下游提供 {req})")
if woke_up_steps:
logger.info(f"★ 依赖唤醒完成,共唤醒 {len(woke_up_steps)} 个次/步骤")
# 4. 正式执行流水线
for step in self.steps:
# ── 软取消 ──
if ctx.is_cancelled():
ctx.append_log(f"[RUNNER] 收到取消信号,提前终止 @ {spec.step_id}")
ctx.append_log(f"[RUNNER] 收到取消信号,提前终止 @ {step.step_id}")
break
# ── disabled 跳过locked_steps 不受此约束)──
if not spec.enabled and spec.step_id not in ctx.locked_steps:
ctx.status[spec.step_id] = "skipped"
ctx.append_log(f"[RUNNER] {spec.step_id} 标记为 disabled跳过")
continue
# ── ★ 用户强制跳过PreflightDialog 勾选) ──
if spec.step_id in skip_set:
ctx.status[spec.step_id] = "user_skipped"
if step.step_id in skip_set:
ctx.status[step.step_id] = "user_skipped"
ctx.append_log(
f"\n{'='*60}\n"
f" ⚠ 用户强制跳过: {spec.step_id}{spec.description}\n"
f" ⚠ 用户强制跳过: {step.step_id}{step.description}\n"
f" 原因:用户在预检弹窗中勾选「忽略」,已确认跳过\n"
f"{'='*60}\n"
)
self._notify_step(spec.step_id, "skipped", "用户强制跳过(预检弹窗)")
self._notify_step(step.step_id, "skipped", "用户强制跳过(预检弹窗)")
continue
# ── 依赖缺失检查 ──
if spec.skip_when_missing:
missing = [k for k in spec.requires if not ctx.get(k)]
if missing:
# ── ★ 智能补全的步骤work_dir 有产物,但 requires 仍缺失(罕见),报 warning 不跳过
if spec.step_id in ctx.locked_steps:
ctx.append_log(
f"[RUNNER] ⚠ {spec.step_id} 已锁定但 requires 仍缺失 {missing}"
"尝试执行(可能因依赖前置步骤失败)"
)
else:
ctx.status[spec.step_id] = "skipped"
reason = f"缺少必要的上下文参数,自动跳过: {missing}"
ctx.append_log(f"[RUNNER] {spec.step_id} {reason}")
self._notify_step(spec.step_id, "skipped", reason)
continue
# ── ★ 断点续跑:产物文件已存在则跳过 ──
resolved_path = self._resolve_path(spec.output_file, ctx)
if resolved_path and os.path.exists(resolved_path):
ctx.status[spec.step_id] = "skipped"
reason = f"产物已存在,跳过: {resolved_path}"
ctx.append_log(f"[RUNNER] {spec.step_id} {reason}")
self._notify_step(spec.step_id, "skipped", reason)
self._restore_ctx_from_output(spec, resolved_path, ctx)
step_cfg = self.config.get(step.step_id, {})
if not step_cfg.get('enabled', True):
continue
# ── 执行(正常路径) ──
# 4.1 检查磁盘产物:如果已落盘,恢复上下文并跳过(拒绝静默跳过,必须打日志)
if step.output_file and os.path.exists(step.output_file):
for prod in step.produces:
if not (hasattr(ctx, prod) and getattr(ctx, prod)):
setattr(ctx, prod, step.output_file)
ctx.status[step.step_id] = "skipped"
ctx.append_log(f"[CACHE] 产物已存在,跳过运行并恢复上下文: {step.step_id}")
self._notify_step(step.step_id, "skipped", "产物已存在(断点续跑)")
continue
# 4.2 依赖死线检查
missing = [req for req in step.requires if not (hasattr(ctx, req) and getattr(ctx, req))]
if missing:
ctx.status[step.step_id] = "skipped"
reason = f"缺少必要的上下文参数,自动跳过: {missing}"
ctx.append_log(f"[RUNNER] 跳过 {step.step_id},仍缺少必要参数: {missing}")
self._notify_step(step.step_id, "skipped", reason)
continue
# 4.3 真正执行
ctx.append_log(f"[START] 正在执行步骤: {step.step_id}")
self._notify_step(step.step_id, "running", f"正在执行: {step.description}")
try:
self._invoke(spec, ctx)
method = getattr(self.pipeline, step.method_name)
sig = inspect.signature(method)
kwargs = {}
current_step_cfg = self.config.get(step.step_id, {})
for param_name in sig.parameters:
# 优先级 1直接使用当前步骤专属配置中的值
if param_name in current_step_cfg:
kwargs[param_name] = current_step_cfg[param_name]
continue
# 优先级 1.5:【核心修复】硬隔离 output_file防止被其他步骤的同名变量污染
if param_name == 'output_file' and hasattr(step, 'output_file') and step.output_file:
work_dir = getattr(ctx, 'work_dir', '')
kwargs[param_name] = step.output_file.format(work_dir=work_dir)
continue
# 优先级 2处理跨步骤的映射逻辑
ctx_key = param_name
if hasattr(step, 'parameter_map') and step.parameter_map:
for k, v in step.parameter_map.items():
if v == param_name:
ctx_key = k
break
# 优先级 3从全局大背包 ctx 中取(排在最后)
if hasattr(ctx, ctx_key):
kwargs[param_name] = getattr(ctx, ctx_key)
# 使用解包后的关键字参数调用底层函数
result = method(**kwargs)
# 【产物接力 1】如果底层函数返回了字典直接合并到上下文
if isinstance(result, dict):
for k, v in result.items():
setattr(ctx, k, v)
# 【产物接力 2】强制通过 StepSpec 的 output_file 模板注入
if hasattr(step, 'output_file') and step.output_file:
work_dir = getattr(ctx, 'work_dir', '')
actual_out_path = step.output_file.format(work_dir=work_dir)
for prod in step.produces:
if not hasattr(ctx, prod) or not getattr(ctx, prod):
setattr(ctx, prod, actual_out_path)
logger.info(f"[产物接力] 登记 {prod} = {actual_out_path}")
except PipelineHalt:
# ★ PipelineHalt 不走 error_summary触发立即 break
ctx.append_log(f"[RUNNER] PipelineHalt 硬终止 @ {spec.step_id}")
self._notify_step(spec.step_id, "error", "预检失败,硬终止")
ctx.status[step.step_id] = "error"
ctx.append_log(f"[RUNNER] PipelineHalt 硬终止 @ {step.step_id}")
self._notify_step(step.step_id, "error", "预检失败,硬终止")
break
except Exception as exc:
ctx.status[spec.step_id] = "error"
error_summary.append((spec.step_id, str(exc)))
ctx.last_error = f"{spec.step_id}: {exc!r}"
ctx.append_log(f"[RUNNER] {spec.step_id} 异常: {exc!r}")
self._notify_step(spec.step_id, "error", str(exc))
# ★ 任意 Exception 均立即 break不再执行后续步骤
except Exception as e:
ctx.status[step.step_id] = "error"
error_summary.append((step.step_id, str(e)))
ctx.last_error = f"{step.step_id}: {e!r}"
ctx.append_log(f"[ERROR] 步骤 {step.step_id} 执行崩溃: {str(e)}")
self._notify_step(step.step_id, "error", str(e))
break
ctx.pipeline_end_time = time.time()
@ -397,80 +471,6 @@ class PipelineRunner:
f"智能补全已自动开启 {len(newly_locked)} 个步骤:{newly_locked}"
)
# ------------------------------------------------------------------
# ★ 依赖级联自动唤醒引擎
# ------------------------------------------------------------------
def _resolve_dependencies(self, ctx: PipelineContext) -> None:
"""依赖追溯循环:遍历所有 enabled 步骤,强制唤醒缺失前置。
核心逻辑:
- 遍历当前 enabled=True 的步骤,检查其 requires
- 若所需 key 在 ctx 中不存在,则向上寻找 produces 该 key 的前置 Step
- 将该前置 Step 强制设为 enabled=True加入 locked_steps
- 递归执行,直到所有前置缺口都被强制补齐
- 已存在的产物文件自动回填 ctx
"""
# 构建 produces→step_id 反查表(仅关注 enabled 或潜在的前置步骤)
produce_to_step: Dict[str, StepSpec] = {}
for spec in self.steps:
for key in spec.produces:
produce_to_step[key] = spec
woke_up: List[str] = []
changed = True
while changed:
changed = False
for spec in self.steps:
if not spec.enabled:
continue
for required_key in spec.requires:
# ctx 已有值 → 无需追溯
if ctx.get(required_key):
continue
# 磁盘文件是否存在work_dir 产物已落盘但 ctx 未回填的情况)
resolved = self._resolve_output_for_key(required_key, ctx)
if resolved and os.path.exists(resolved):
ctx.set(required_key, resolved)
continue
# 缺少且无磁盘产物 → 追溯 produces 者
if required_key not in produce_to_step:
continue
provider = produce_to_step[required_key]
if provider.enabled:
continue # 已开启但尚未执行(会在主循环中处理)
# 强制唤醒
provider.enabled = True
if provider.step_id not in ctx.locked_steps:
ctx.locked_steps.append(provider.step_id)
woke_up.append(provider.step_id)
ctx.append_log(
f"[INFO] 因下游依赖需求,自动唤醒并执行步骤: {provider.step_id}"
)
# 递归:检查新开启步骤自身的前置是否也缺失
changed = True
if woke_up:
detail = "".join(woke_up)
ctx.append_log(
f"[RUNNER] ★ 依赖级联自动唤醒已完成,共开启 {len(woke_up)} 个步骤:{detail}"
)
self._notify_step(
"全流程", "info",
f"依赖级联自动唤醒 {len(woke_up)} 个步骤:{woke_up}"
)
# 扫描新开启步骤的 work_dir 产物,回填 ctx
for spec in self.steps:
if spec.step_id in woke_up:
self._scan_single_step_outputs(spec, ctx)
def _resolve_output_for_key(
self, produce_key: str, ctx: PipelineContext
) -> Optional[str]: