fix(PipelineRunner): 接力棒断链修复 + 依赖级联自动唤醒引擎

This commit is contained in:
DXC
2026-06-09 09:07:59 +08:00
parent d22414bf7d
commit 371e7a2745

View File

@ -5,18 +5,37 @@ PipelineRunner基于 StepSpec 声明式调度 14 个 step。
设计要点: 设计要点:
- StepSpec 声明 requiresctx 字段名列表)+ producesctx 字段名列表) - StepSpec 声明 requiresctx 字段名列表)+ producesctx 字段名列表)
- 命名约定ctx 字段名 == panel key 名 == step 形参名(全链路无翻译) - 命名约定ctx 字段名 == panel key 名 == step 形参名(全链路无翻译)
- 保留 spec.parameter_map 字段骨架供极少数特例覆盖(默认空 dict - 步骤命名step_id 格式为 stepN 或 stepN_suffix无小数位method_name 与 step_id 对齐
- 调度顺序:按 PIPELINE_STEPS 列表顺序requires 缺则 skip - 调度顺序:按 PIPELINE_STEPS 列表顺序requires 缺则 skip
- 软取消:在每个 step 前检查 ctx.is_cancelled() - 软取消:在每个 step 前检查 ctx.is_cancelled()
- 断点续跑spec.output_file 已落盘则跳过执行
- 错误汇总:全流程结束后 error_summary 记录所有 step 的异常
- 预检run() 入口硬校验 step1 img_path其余依赖通过智能补全 + 软警告处理
- PipelineHalt外层 run() 不 catch触发循环 break实现硬终止
- STEP_MAP旧 step_id → 新 step_id 双向映射,供 GUI 配置兼容使用
- duck-typed pipelinerunner 只调 getattr(pipeline, method_name),不强依赖类层级 - duck-typed pipelinerunner 只调 getattr(pipeline, method_name),不强依赖类层级
""" """
from __future__ import annotations from __future__ import annotations
import os
import time import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Sequence from typing import Any, Dict, List, Optional, Sequence
from .context import PipelineContext from .context import PipelineContext, STEP_MAP_OLD_TO_NEW, STEP_MAP_NEW_TO_OLD, resolve_step_id
# ============================================================
# 终止异常(外层 run() 不 catch触发循环 break
# ============================================================
class PipelineHalt(Exception):
"""不可恢复的错误,在 run() 循环中抛出后直接 break不走 Exception 处理分支。
适用场景:
- GUI 层通过 _notify 弹窗拦截后主动抛出的硬终止信号
"""
pass
# ============================================================ # ============================================================
@ -28,108 +47,137 @@ class StepSpec:
"""单个 step 的元信息(声明式,避免硬编码)""" """单个 step 的元信息(声明式,避免硬编码)"""
step_id: str step_id: str
method_name: str method_name: str
requires: List[str] # PipelineContext 字段名列表 requires: List[str] # PipelineContext 字段名列表
produces: List[str] = field(default_factory=list) # 写入 ctx 的字段名列表 produces: List[str] = field(default_factory=list) # 写入 ctx 的字段名列表
enabled: bool = True enabled: bool = True
parameter_map: Dict[str, str] = field(default_factory=dict) parameter_map: Dict[str, str] = field(default_factory=dict)
# 当 requires 中任一字段为 None 时是否跳过;默认 True缺输入就 skip # 当 requires 中任一字段为 None 时是否跳过;默认 True缺输入就 skip
skip_when_missing: bool = True skip_when_missing: bool = True
# 备注(仅用于文档生成 / 调试输出) # 备注(仅用于文档生成 / 调试输出)
description: str = "" description: str = ""
# ★ 断点续跑:产物文件路径,支持 {work_dir} 占位符(运行时解析)
output_file: Optional[str] = None
# ★ 预检用:需要验证磁盘文件实际存在的 ctx key 列表
required_input_files: List[str] = field(default_factory=list)
# ============================================================ # ============================================================
# 14 个 step 的声明表(顺序即调度顺序) # 14 个 step 的声明表(顺序即调度顺序)
# 注:本表是"权威描述",与 WorkerThread.step_method_map / 旧 run_full_pipeline 保持一致 # step_id / method_name 均不含小数位,与前端显示对齐
# output_file / required_input_files 使用 {work_dir} 占位符,由 _resolve_path 展开
# ============================================================ # ============================================================
PIPELINE_STEPS: List[StepSpec] = [ PIPELINE_STEPS: List[StepSpec] = [
StepSpec( StepSpec(
step_id="step1", method_name="step1_generate_water_mask", step_id="step1", method_name="step1_water_mask",
requires=["img_path"], produces=["water_mask_path"], requires=["img_path"], produces=["water_mask_path"],
required_input_files=["img_path"],
output_file="{work_dir}/1_water_mask/water_mask.dat",
description="水域掩膜生成NDWI 或 SHP", description="水域掩膜生成NDWI 或 SHP",
), ),
StepSpec( StepSpec(
step_id="step2", method_name="step2_find_glint_area", step_id="step2", method_name="step2_glint_detection",
requires=["img_path", "water_mask_path"], produces=["glint_mask_path"], requires=["img_path", "water_mask_path"], produces=["glint_mask_path"],
required_input_files=["img_path", "water_mask_path"],
output_file="{work_dir}/2_glint/glint_mask.dat",
description="耀斑区域检测", description="耀斑区域检测",
), ),
StepSpec( StepSpec(
step_id="step3", method_name="step3_remove_glint", step_id="step3", method_name="step3_deglint",
requires=["img_path", "water_mask_path", "glint_mask_path"], requires=["img_path", "water_mask_path", "glint_mask_path"],
produces=["deglint_img_path"], produces=["deglint_img_path"],
required_input_files=["img_path", "water_mask_path", "glint_mask_path"],
output_file="{work_dir}/3_deglint/deglint.bsq",
description="耀斑去除", description="耀斑去除",
), ),
StepSpec( StepSpec(
step_id="step4", method_name="step4_process_csv", step_id="step4", method_name="step4_data_preparation",
requires=["csv_path"], produces=["processed_csv_path"], requires=["csv_path"], produces=["processed_csv_path"],
required_input_files=["csv_path"],
output_file="{work_dir}/4_processed_data/processed_data.csv",
description="CSV 异常值清洗", description="CSV 异常值清洗",
), ),
StepSpec( StepSpec(
step_id="step5", method_name="step5_extract_training_spectra", step_id="step5", method_name="step5_spectral_extraction",
requires=["deglint_img_path", "processed_csv_path", "csv_path", "boundary_path", "glint_mask_path"], requires=["deglint_img_path", "processed_csv_path", "csv_path", "boundary_path", "glint_mask_path"],
produces=["training_csv_path"], produces=["training_csv_path"],
# processed_csv_path(step4 产物) 才是 step5 真正需要的主路径,
# 通过 parameter_map 显式映射到形参 csv_path。
# raw csv_path 也保留在 requires 中以备 user_config 覆盖,
# 但用占位名 _raw_csv_ignored 注入,落到 step5 形参列表末尾的 **kwargs 兜底。
# 这样可以避免 L2 顺序注入中"后注入的 csv_path=None 覆盖前面的 processed_csv_path"的冲突。
parameter_map={ parameter_map={
"processed_csv_path": "csv_path", "processed_csv_path": "csv_path",
"csv_path": "_raw_csv_ignored", "csv_path": "_raw_csv_ignored",
}, },
skip_when_missing=False, skip_when_missing=False,
required_input_files=["deglint_img_path", "processed_csv_path", "boundary_path", "glint_mask_path"],
output_file="{work_dir}/5_training_spectra/training_spectra.csv",
description="实测样本点光谱提取", description="实测样本点光谱提取",
), ),
StepSpec( StepSpec(
step_id="step5_5", method_name="step5_5_calculate_water_quality_indices", step_id="step8", method_name="step8_water_quality_indices",
requires=["training_csv_path"], produces=["indices_path"], requires=["training_csv_path"], produces=["indices_path"],
required_input_files=["training_csv_path"],
output_file="{work_dir}/6_water_quality_indices/water_quality_indices.csv",
description="水质光谱指数计算optional", description="水质光谱指数计算optional",
), ),
StepSpec( StepSpec(
step_id="step6", method_name="step6_train_models", step_id="step7", method_name="step7_ml_modeling",
requires=["training_csv_path"], produces=["models_dir"], requires=["training_csv_path"], produces=["models_dir"],
required_input_files=["training_csv_path"],
output_file="{work_dir}/7_Supervised_Model_Training/best_models.pkl",
description="ML 建模GridSearchCV / AutoML", description="ML 建模GridSearchCV / AutoML",
), ),
StepSpec( StepSpec(
step_id="step6_5", method_name="step6_5_non_empirical_modeling", step_id="step8_non_empirical_modeling",
method_name="step8_non_empirical_modeling",
requires=["training_csv_path"], produces=["models_dir"], requires=["training_csv_path"], produces=["models_dir"],
parameter_map={"training_csv_path": "csv_path"}, parameter_map={"training_csv_path": "csv_path"},
required_input_files=["training_csv_path"],
output_file="{work_dir}/8_Regression_Modeling/non_empirical_models.pkl",
description="非经验统计回归", description="非经验统计回归",
), ),
StepSpec( StepSpec(
step_id="step6_75", method_name="step6_75_custom_regression", step_id="step9", method_name="step9_custom_regression",
requires=["indices_path"], produces=["models_dir"], requires=["indices_path"], produces=["models_dir"],
parameter_map={"indices_path": "csv_path"}, parameter_map={"indices_path": "csv_path"},
required_input_files=["indices_path"],
output_file="{work_dir}/9_Custom_Regression_Modeling/custom_regression_models.pkl",
description="自定义回归分析", description="自定义回归分析",
), ),
StepSpec( StepSpec(
step_id="step7", method_name="step7_generate_sampling_points", step_id="step10", method_name="step10_sampling",
requires=["deglint_img_path", "water_mask_path"], produces=["sampling_csv_path"], requires=["deglint_img_path", "water_mask_path"], produces=["sampling_csv_path"],
required_input_files=["deglint_img_path", "water_mask_path"],
output_file="{work_dir}/10_sampling/sampling_spectra.csv",
description="整景密集采样点生成 + 光谱提取", description="整景密集采样点生成 + 光谱提取",
), ),
StepSpec( StepSpec(
step_id="step8", method_name="step8_predict_water_quality", step_id="step11_ml", method_name="step11_ml_prediction",
requires=["sampling_csv_path", "models_dir"], produces=["prediction_csv_path"], requires=["sampling_csv_path", "models_dir"], produces=["prediction_csv_path"],
required_input_files=["sampling_csv_path", "models_dir"],
output_file="{work_dir}/11_12_13_predictions/prediction_results.csv",
description="ML 模型预测(采样点)", description="ML 模型预测(采样点)",
), ),
StepSpec( StepSpec(
step_id="step8_5", method_name="step8_5_predict_with_non_empirical_models", step_id="step11", method_name="step11_non_empirical_prediction",
requires=["sampling_csv_path", "models_dir"], produces=["prediction_dir"], requires=["sampling_csv_path", "models_dir"], produces=["prediction_dir"],
parameter_map={"models_dir": "non_empirical_models_dir"}, parameter_map={"models_dir": "non_empirical_models_dir"},
required_input_files=["sampling_csv_path", "models_dir"],
output_file="{work_dir}/11_12_13_predictions/non_empirical_predictions",
description="非经验模型预测", description="非经验模型预测",
), ),
StepSpec( StepSpec(
step_id="step8_75", method_name="step8_75_predict_with_custom_regression", step_id="step12", method_name="step12_custom_regression_prediction",
requires=["sampling_csv_path", "models_dir", "formula_csv_path"], requires=["sampling_csv_path", "models_dir", "formula_csv_path"],
produces=["prediction_dir"], produces=["prediction_dir"],
parameter_map={"models_dir": "custom_regression_dir"}, parameter_map={"models_dir": "custom_regression_dir"},
required_input_files=["sampling_csv_path", "models_dir", "formula_csv_path"],
output_file="{work_dir}/11_12_13_predictions/custom_regression_predictions",
description="自定义回归预测", description="自定义回归预测",
), ),
StepSpec( StepSpec(
step_id="step9", method_name="step9_generate_distribution_map", step_id="step14", method_name="step14_distribution_map",
requires=["prediction_csv_path", "boundary_shp_path"], requires=["prediction_csv_path", "boundary_shp_path"],
produces=["distribution_map_path"], produces=["distribution_map_path"],
required_input_files=["prediction_csv_path", "boundary_shp_path"],
output_file="{work_dir}/distribution_map.png",
description="克里金插值成图", description="克里金插值成图",
), ),
] ]
@ -140,47 +188,361 @@ PIPELINE_STEPS: List[StepSpec] = [
# ============================================================ # ============================================================
class PipelineRunner: class PipelineRunner:
"""按 StepSpec 调度 14 个 step 方法,支持软取消 + 路径 ctx 注入 """按 StepSpec 调度 14 个 step 方法,支持软取消 + 断点续跑 + 错误汇总
用法: 用法:
ctx = PipelineContext(img_path=..., work_dir=..., user_config=config)
runner = PipelineRunner(pipeline_instance) runner = PipelineRunner(pipeline_instance)
ctx = PipelineContext(img_path=..., ...) result_ctx = runner.run(ctx) # 预检通过后开始执行
result_ctx = runner.run(ctx) print(result_ctx.error_summary) # [(step_id, error_msg), ...]
""" """
def __init__(self, pipeline, steps: Optional[Sequence[StepSpec]] = None): def __init__(self, pipeline, steps: Optional[Sequence[StepSpec]] = None):
self.pipeline = pipeline self.pipeline = pipeline
self.steps: List[StepSpec] = list(steps) if steps else list(PIPELINE_STEPS) self.steps: List[StepSpec] = list(steps) if steps else list(PIPELINE_STEPS)
def run(self, ctx: PipelineContext) -> PipelineContext: # ------------------------------------------------------------------
"""主入口:按顺序执行 14 步。软取消时已完成的 step 保留结果。""" # 主入口
# ------------------------------------------------------------------
def run(self, ctx: PipelineContext, skip_list: Optional[List[str]] = None) -> PipelineContext:
"""全流程入口:智能补全 → 预检(软警告)→ 执行。
Args:
ctx: PipelineContext
skip_list: 用户在 PreflightDialog 中选择忽略的 step_id 列表。
命中项设置 status="user_skipped",打印醒目日志。
"""
ctx.pipeline_start_time = time.time() ctx.pipeline_start_time = time.time()
error_summary: List[tuple[str, str]] = []
skip_set = set(skip_list) if skip_list else set()
# ── ★ Step1 img_path 硬校验(缺失则立即终止整个流程) ──
if not ctx.get("img_path"):
msg = "【全流程预检失败】缺少参考影像路径 (img_path),流程无法启动。"
ctx.append_log(f"[RUNNER] {msg}")
self._notify_step("全流程", "error", msg)
ctx.last_error = msg
ctx.pipeline_end_time = time.time()
return ctx
# ── ★ 智能补全:扫描 work_dir 默认产物路径,回填 ctx ──
self._scan_workdir_outputs(ctx)
# ── ★ 自动补全缺失步骤work_dir 有产物则强制开启 + 回填路径 ──
self._auto_fill_missing_steps(ctx)
# ── 软预检警告(不再阻断,仅记录日志)──
self._preflight_warnings(ctx)
# 断点续跑预扫描ctx 已有产物则记录诊断日志
self._restore_outputs_from_ctx(ctx)
# ── ★ 依赖级联自动唤醒:在主循环开始前补齐所有前置缺口 ──
self._resolve_dependencies(ctx)
for spec in self.steps: for spec in self.steps:
# ── 软取消 ──
if ctx.is_cancelled(): if ctx.is_cancelled():
ctx.append_log(f"[RUNNER] 收到取消信号,提前终止 @ {spec.step_id}") ctx.append_log(f"[RUNNER] 收到取消信号,提前终止 @ {spec.step_id}")
break break
if not spec.enabled:
# ── disabled 跳过locked_steps 不受此约束)──
if not spec.enabled and spec.step_id not in ctx.locked_steps:
ctx.status[spec.step_id] = "skipped" ctx.status[spec.step_id] = "skipped"
ctx.append_log(f"[RUNNER] {spec.step_id} 标记为 disabled跳过") ctx.append_log(f"[RUNNER] {spec.step_id} 标记为 disabled跳过")
continue continue
# ── ★ 用户强制跳过PreflightDialog 勾选) ──
if spec.step_id in skip_set:
ctx.status[spec.step_id] = "user_skipped"
ctx.append_log(
f"\n{'='*60}\n"
f" ⚠ 用户强制跳过: {spec.step_id}{spec.description}\n"
f" 原因:用户在预检弹窗中勾选「忽略」,已确认跳过\n"
f"{'='*60}\n"
)
self._notify_step(spec.step_id, "skipped", "用户强制跳过(预检弹窗)")
continue
# ── 依赖缺失检查 ──
if spec.skip_when_missing: if spec.skip_when_missing:
missing = [k for k in spec.requires if not ctx.get(k)] missing = [k for k in spec.requires if not ctx.get(k)]
if missing: if missing:
ctx.status[spec.step_id] = "skipped" # ── ★ 智能补全的步骤work_dir 有产物,但 requires 仍缺失(罕见),报 warning 不跳过
reason = f"缺少必要的上下文参数,自动跳过: {missing}" if spec.step_id in ctx.locked_steps:
ctx.append_log(f"[RUNNER] {spec.step_id} {reason}") ctx.append_log(
if hasattr(self.pipeline, "_notify"): f"[RUNNER] ⚠ {spec.step_id} 已锁定但 requires 仍缺失 {missing}"
self.pipeline._notify(spec.description, "skipped", reason) "尝试执行(可能因依赖前置步骤失败)"
continue )
self._invoke(spec, ctx) else:
ctx.status[spec.step_id] = "skipped"
reason = f"缺少必要的上下文参数,自动跳过: {missing}"
ctx.append_log(f"[RUNNER] {spec.step_id} {reason}")
self._notify_step(spec.step_id, "skipped", reason)
continue
# ── ★ 断点续跑:产物文件已存在则跳过 ──
resolved_path = self._resolve_path(spec.output_file, ctx)
if resolved_path and os.path.exists(resolved_path):
ctx.status[spec.step_id] = "skipped"
reason = f"产物已存在,跳过: {resolved_path}"
ctx.append_log(f"[RUNNER] {spec.step_id} {reason}")
self._notify_step(spec.step_id, "skipped", reason)
self._restore_ctx_from_output(spec, resolved_path, ctx)
continue
# ── 执行(正常路径) ──
try:
self._invoke(spec, ctx)
except PipelineHalt:
# ★ PipelineHalt 不走 error_summary触发立即 break
ctx.append_log(f"[RUNNER] PipelineHalt 硬终止 @ {spec.step_id}")
self._notify_step(spec.step_id, "error", "预检失败,硬终止")
break
except Exception as exc:
ctx.status[spec.step_id] = "error"
error_summary.append((spec.step_id, str(exc)))
ctx.last_error = f"{spec.step_id}: {exc!r}"
ctx.append_log(f"[RUNNER] {spec.step_id} 异常: {exc!r}")
self._notify_step(spec.step_id, "error", str(exc))
# ★ 任意 Exception 均立即 break不再执行后续步骤
break
ctx.pipeline_end_time = time.time() ctx.pipeline_end_time = time.time()
ctx.error_summary = error_summary
return ctx return ctx
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# ★ 智能补全:工作目录产物扫描
# ------------------------------------------------------------------
def _scan_workdir_outputs(self, ctx: PipelineContext) -> None:
"""扫描 work_dir 下所有步骤的默认产物路径,若存在则回填 ctx。
利用 spec.output_file 的 {work_dir} 占位符,展开为实际绝对路径。
存在则写入对应的 ctx 字段produces供后续步骤直接使用。
已在 ctx 中有值的字段不会被覆盖。
"""
work_dir = ctx.get("work_dir") or ""
if not work_dir:
return
for spec in self.steps:
if not spec.produces:
continue
for produce_key in spec.produces:
if ctx.get(produce_key):
continue # 已有人工填写的值,不覆盖
resolved = self._resolve_path(spec.output_file, ctx)
if resolved and os.path.exists(resolved):
ctx.set(produce_key, resolved)
ctx.append_log(
f"[AUTO_FILL] 检测到已有产物,回填 {produce_key} = {resolved}"
)
# ------------------------------------------------------------------
# ★ 智能补全:强制开启被静默跳过的步骤
# ------------------------------------------------------------------
def _auto_fill_missing_steps(self, ctx: PipelineContext) -> None:
"""检查所有 disabled 步骤。
若某步骤的 output_file 已在 work_dir 落盘(断点续跑),
说明该步骤之前已完成但被用户在 GUI 中禁用了。
此时系统自动重开启该步骤forced=True并将其加入 locked_steps。
同时,将已落盘的产物路径回填到对应的 ctx 字段,
确保下游步骤能正常拿到输入。
阻断性缺失step1 img_path已在 run() 入口硬校验,此处不处理。
"""
newly_locked: List[str] = []
for spec in self.steps:
if spec.enabled:
continue # 用户主动开启的步骤不受影响
skip_set = getattr(ctx, '_skip_set', set())
if spec.step_id in skip_set:
continue # 用户在 PreflightDialog 中手动忽略的步骤不自动补全
resolved = self._resolve_path(spec.output_file, ctx)
if resolved and os.path.exists(resolved):
# ── 该步骤已有产物但被禁用 → 自动开启 ──
spec.enabled = True
ctx.locked_steps.append(spec.step_id)
newly_locked.append(spec.step_id)
# 回填所有产物字段到 ctx
for produce_key in spec.produces:
if not ctx.get(produce_key):
ctx.set(produce_key, resolved)
ctx.append_log(
f"[AUTO_FILL] 强制开启并回填 {spec.step_id} 产物 {produce_key} = {resolved}"
)
ctx.append_log(
f"\n{'='*60}\n"
f" ⚡ 智能补全:步骤 {spec.step_id}{spec.description}\n"
f" 原因:该步骤在 work_dir 中已有产物但被您在 GUI 中禁用了。\n"
f" 操作:系统已自动开启该步骤,产物路径已回填。\n"
f" 注意:运行期间该步骤已被锁定,您无法临时关闭。\n"
f"{'='*60}\n"
)
if newly_locked:
self._notify_step(
"全流程",
"info",
f"智能补全已自动开启 {len(newly_locked)} 个步骤:{newly_locked}"
)
# ------------------------------------------------------------------
# ★ 依赖级联自动唤醒引擎
# ------------------------------------------------------------------
def _resolve_dependencies(self, ctx: PipelineContext) -> None:
"""依赖追溯循环:遍历所有 enabled 步骤,强制唤醒缺失前置。
核心逻辑:
- 遍历当前 enabled=True 的步骤,检查其 requires
- 若所需 key 在 ctx 中不存在,则向上寻找 produces 该 key 的前置 Step
- 将该前置 Step 强制设为 enabled=True加入 locked_steps
- 递归执行,直到所有前置缺口都被强制补齐
- 已存在的产物文件自动回填 ctx
"""
# 构建 produces→step_id 反查表(仅关注 enabled 或潜在的前置步骤)
produce_to_step: Dict[str, StepSpec] = {}
for spec in self.steps:
for key in spec.produces:
produce_to_step[key] = spec
woke_up: List[str] = []
changed = True
while changed:
changed = False
for spec in self.steps:
if not spec.enabled:
continue
for required_key in spec.requires:
# ctx 已有值 → 无需追溯
if ctx.get(required_key):
continue
# 磁盘文件是否存在work_dir 产物已落盘但 ctx 未回填的情况)
resolved = self._resolve_output_for_key(required_key, ctx)
if resolved and os.path.exists(resolved):
ctx.set(required_key, resolved)
continue
# 缺少且无磁盘产物 → 追溯 produces 者
if required_key not in produce_to_step:
continue
provider = produce_to_step[required_key]
if provider.enabled:
continue # 已开启但尚未执行(会在主循环中处理)
# 强制唤醒
provider.enabled = True
if provider.step_id not in ctx.locked_steps:
ctx.locked_steps.append(provider.step_id)
woke_up.append(provider.step_id)
ctx.append_log(
f"[INFO] 因下游依赖需求,自动唤醒并执行步骤: {provider.step_id}"
)
# 递归:检查新开启步骤自身的前置是否也缺失
changed = True
if woke_up:
detail = "".join(woke_up)
ctx.append_log(
f"[RUNNER] ★ 依赖级联自动唤醒已完成,共开启 {len(woke_up)} 个步骤:{detail}"
)
self._notify_step(
"全流程", "info",
f"依赖级联自动唤醒 {len(woke_up)} 个步骤:{woke_up}"
)
# 扫描新开启步骤的 work_dir 产物,回填 ctx
for spec in self.steps:
if spec.step_id in woke_up:
self._scan_single_step_outputs(spec, ctx)
def _resolve_output_for_key(
self, produce_key: str, ctx: PipelineContext
) -> Optional[str]:
"""根据 produces key 查找对应步骤的 output_file 并展开路径。"""
for spec in self.steps:
if produce_key in spec.produces:
return self._resolve_path(spec.output_file, ctx)
return None
def _scan_single_step_outputs(
self, spec: StepSpec, ctx: PipelineContext
) -> None:
"""扫描单个步骤的 work_dir 产物,回填 ctx不覆盖已有值"""
if not spec.produces:
return
for produce_key in spec.produces:
if ctx.get(produce_key):
continue
resolved = self._resolve_path(spec.output_file, ctx)
if resolved and os.path.exists(resolved):
ctx.set(produce_key, resolved)
ctx.append_log(
f"[AUTO_FILL] 依赖唤醒后检测到产物,回填 {produce_key} = {resolved}"
)
# ------------------------------------------------------------------
# 软预检警告(不再阻断)
# ------------------------------------------------------------------
def _preflight_warnings(self, ctx: PipelineContext) -> None:
"""软预检警告:遍历所有步骤,检测可预见的运行时跳过。
所有缺失均以 warning 记录日志,不抛异常,不阻止执行。
GUI 层可通过回调函数 _notify_step 向用户展示警告列表。
"""
warnings: List[str] = []
for spec in self.steps:
if not spec.enabled:
continue
# ── Step4 csv_path 缺失警告 ──
if spec.step_id == "step4":
if not ctx.get("csv_path"):
warnings.append(
f"[{spec.step_id}] 缺少实测水质数据 (csv_path)"
"步骤 5-9 将被自动跳过"
)
# ── 磁盘文件缺失警告(已填充 ctx 但文件实际不存在)──
for ctx_key in spec.required_input_files:
value = ctx.get(ctx_key)
if not value:
continue
if not os.path.exists(value):
warnings.append(
f"[{spec.step_id}] 磁盘文件缺失(但 ctx 已回填): {ctx_key} = {value}"
)
if warnings:
detail = "\n".join(f" - {w}" for w in warnings)
ctx.append_log(
f"[RUNNER] 【软预检警告】(流程将继续执行,缺失项将被自动跳过)\n{detail}"
)
self._notify_step("全流程", "warning", f"预检警告:{len(warnings)}\n{detail}")
# ------------------------------------------------------------------
# 单步调用
# ------------------------------------------------------------------
def _invoke(self, spec: StepSpec, ctx: PipelineContext) -> None: def _invoke(self, spec: StepSpec, ctx: PipelineContext) -> None:
"""调一个 step 方法ctx 路径 → 形参;产出 → ctx 字段。""" """调一个 step 方法ctx 路径 → 形参;产出 → ctx 字段。"""
# DEBUG: 诊断"停在 step4"问题——每步打印 requires + ctx 实际数据
# 看到 requires=[] 但 actual=[None,...] 就说明 ctx 缺料step 会被 skip
ctx.append_log( ctx.append_log(
f"[DEBUG] Step {spec.step_id} requires: {spec.requires}, " f"[DEBUG] Step {spec.step_id} requires: {spec.requires}, "
f"actual ctx data: {[ctx.get(k) for k in spec.requires]}" f"actual ctx data: {[ctx.get(k) for k in spec.requires]}"
@ -191,17 +553,16 @@ class PipelineRunner:
ctx.status[spec.step_id] = "skipped" ctx.status[spec.step_id] = "skipped"
return return
# 1) 把 ctx 路径作为形参注入(默认约定:去 _path 后缀) # 1) 把 ctx 路径作为形参注入
kwargs: Dict[str, Any] = {} kwargs: Dict[str, Any] = {}
for ctx_key in spec.requires: for ctx_key in spec.requires:
param_name = spec.parameter_map.get(ctx_key, self._default_param_name(ctx_key)) param_name = spec.parameter_map.get(ctx_key, self._default_param_name(ctx_key))
kwargs[param_name] = ctx.get(ctx_key) kwargs[param_name] = ctx.get(ctx_key)
# 2) 允许用户在 ctx.user_config[step_id] 覆盖/补充 # 2) 允许用户在 ctx.user_config[step_id] 覆盖/补充(非空值才覆盖)
user_overrides = ctx.user_config.get(spec.step_id) or {} user_overrides = ctx.user_config.get(spec.step_id) or {}
if isinstance(user_overrides, dict): if isinstance(user_overrides, dict):
for k, v in user_overrides.items(): for k, v in user_overrides.items():
# ★ 关键防御:绝不用 GUI 的“空字符串”或 None 覆盖上游传来的有效路径
if v is not None and v != "": if v is not None and v != "":
kwargs[k] = v kwargs[k] = v
@ -210,51 +571,27 @@ class PipelineRunner:
f"[RUNNER] -> {spec.method_name}({list(kwargs.keys())})" f"[RUNNER] -> {spec.method_name}({list(kwargs.keys())})"
) )
ctx.status[spec.step_id] = "start" ctx.status[spec.step_id] = "start"
notify = getattr(self.pipeline, "_notify", None) self._notify_step(spec.step_id, "start", spec.method_name)
if callable(notify):
try:
notify(f"步骤{spec.step_id[-1]}", "start", spec.method_name)
except Exception:
pass
# 4) 执行 + 捕获异常(不让单步崩溃拖垮 runner # 4) 执行(外层 run() 统一捕获异常
t0 = time.time() t0 = time.time()
try: result = method(**kwargs)
result = method(**kwargs) ctx.status[spec.step_id] = "completed"
ctx.status[spec.step_id] = "completed" ctx.step_timings[spec.step_id] = time.time() - t0
ctx.step_timings[spec.step_id] = time.time() - t0
# 5) 产出收割 # 5) 产出收割
self._harvest(spec, result, ctx) self._harvest(spec, result, ctx)
self._notify_step(
if callable(notify): spec.step_id, "completed",
try: str(result)[:200] if result is not None else "",
notify( )
f"步骤{spec.step_id[-1]}",
"completed",
str(result)[:200] if result is not None else "",
)
except Exception:
pass
except Exception as exc:
ctx.status[spec.step_id] = "error"
ctx.last_error = f"{spec.step_id}: {exc!r}"
ctx.append_log(f"[RUNNER] {spec.step_id} 异常: {exc!r}")
if callable(notify):
try:
notify(f"步骤{spec.step_id[-1]}", "error", str(exc))
except Exception:
pass
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def _harvest(self, spec: StepSpec, result: Any, ctx: PipelineContext) -> None: # 产出收割
"""把 step 方法返回值灌入 ctx 的 produces 字段。 # ------------------------------------------------------------------
规则: def _harvest(self, spec: StepSpec, result: Any, ctx: PipelineContext) -> None:
- 若 result 是 dict 且 key 匹配 produce_keyctx.set(produce_key, result[key]) """把 step 方法返回值灌入 ctx 的 produces 字段。"""
- 若 result 非 dict 且 produces 非空:第一个 produces 字段接 result
- 若 produces 为空result 仅记录到 log不写 ctx
"""
if not spec.produces: if not spec.produces:
return return
if isinstance(result, dict): if isinstance(result, dict):
@ -265,10 +602,59 @@ class PipelineRunner:
ctx.set(spec.produces[0], result) ctx.set(spec.produces[0], result)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# 断点续跑辅助
# ------------------------------------------------------------------
def _resolve_path(
self, template: Optional[str], ctx: PipelineContext
) -> Optional[str]:
"""解析模板中的 {work_dir} 占位符,返回展开后的绝对路径或 None。"""
if not template:
return None
work_dir = ctx.get("work_dir") or ""
try:
return template.format(work_dir=work_dir)
except (KeyError, ValueError):
return template
def _restore_outputs_from_ctx(self, ctx: PipelineContext) -> None:
"""诊断日志:记录 ctx 中已有的非 None 产物。"""
for spec in self.steps:
if not (spec.enabled and spec.produces):
continue
for key in spec.produces:
val = ctx.get(key)
if val:
ctx.append_log(
f"[RUNNER] 断点续跑检测: {spec.step_id} 已有 {key} = {val}"
)
def _restore_ctx_from_output(
self, spec: StepSpec, resolved_path: str, ctx: PipelineContext
) -> None:
"""断点跳过时:将已存在的 output_file 写回 ctx 所有 produces 字段,供下游使用。
接力棒断链修复:遍历 spec.produces 逐一注册,不遗漏任何下游可能依赖的 key。
"""
if not spec.produces:
return
for produce_key in spec.produces:
ctx.set(produce_key, resolved_path)
# ------------------------------------------------------------------
# 工具
# ------------------------------------------------------------------
@staticmethod @staticmethod
def _default_param_name(ctx_key: str) -> str: def _default_param_name(ctx_key: str) -> str:
""" """默认原样返回 ctx 键名作为形参名。特殊缩写由 parameter_map 显式处理。"""
废弃有毒的去 _path 后缀逻辑。
默认原样返回 ctx 键名作为形参名。遇到特殊缩写时,由各个 step 的 parameter_map 显式处理。
"""
return ctx_key return ctx_key
def _notify_step(self, step_id: str, status: str, message: str) -> None:
"""通过 pipeline.callback 通知 GUI 当前步骤状态。"""
notify = getattr(self.pipeline, "_notify", None)
if callable(notify):
try:
notify(step_id, status, message)
except Exception:
pass