fix(runner): 14 Facade kwargs 兜底 + 4 spec parameter_map 修正 + step6_75 路由切到 indices

- 14 个 stepX_... Facade 形参表末尾加 **kwargs,杜绝 Runner 注入未声明 key 时的 TypeError(典型:step3 收到 glint_mask_path)

- runner._invoke user_overrides 合并加 v is not None and v != '' 过滤,避免 GUI 面板空值覆盖 ctx 中已写入的有效路径

- PIPELINE_STEPS 加 4 个 parameter_map 修正 ctx 字段名→形参名错位:step6_5/6_75: training_csv_path→csv_path;step8_5: models_dir→non_empirical_models_dir;step8_75: models_dir→custom_regression_dir

- step6_75 路由从 training_csv_path 切到 indices_path(requires + parameter_map 同步);配合 skip_when_missing,未跑 step5_5 时自动 skip

- worker_thread.py: mode='full' 切到 PipelineRunner + PipelineContext 调度
This commit is contained in:
DXC
2026-06-04 09:15:04 +08:00
parent 343e316799
commit 64aa5b8f40
3 changed files with 45 additions and 20 deletions

View File

@ -84,11 +84,13 @@ PIPELINE_STEPS: List[StepSpec] = [
StepSpec(
step_id="step6_5", method_name="step6_5_non_empirical_modeling",
requires=["training_csv_path"], produces=["models_dir"],
parameter_map={"training_csv_path": "csv_path"},
description="非经验统计回归",
),
StepSpec(
step_id="step6_75", method_name="step6_75_custom_regression",
requires=["training_csv_path"], produces=["models_dir"],
requires=["indices_path"], produces=["models_dir"],
parameter_map={"indices_path": "csv_path"},
description="自定义回归分析",
),
StepSpec(
@ -104,12 +106,14 @@ PIPELINE_STEPS: List[StepSpec] = [
StepSpec(
step_id="step8_5", method_name="step8_5_predict_with_non_empirical_models",
requires=["sampling_csv_path", "models_dir"], produces=["prediction_dir"],
parameter_map={"models_dir": "non_empirical_models_dir"},
description="非经验模型预测",
),
StepSpec(
step_id="step8_75", method_name="step8_75_predict_with_custom_regression",
requires=["sampling_csv_path", "models_dir", "formula_csv_path"],
produces=["prediction_dir"],
parameter_map={"models_dir": "custom_regression_dir"},
description="自定义回归预测",
),
StepSpec(
@ -185,7 +189,10 @@ class PipelineRunner:
# 2) 允许用户在 ctx.user_config[step_id] 覆盖/补充
user_overrides = ctx.user_config.get(spec.step_id) or {}
if isinstance(user_overrides, dict):
kwargs.update(user_overrides)
for k, v in user_overrides.items():
# ★ 关键防御:绝不用 GUI 的“空字符串”或 None 覆盖上游传来的有效路径
if v is not None and v != "":
kwargs[k] = v
# 3) 状态置 start
ctx.append_log(

View File

@ -267,7 +267,7 @@ class WaterQualityInversionPipeline:
use_ndwi: bool = False,
skip_dependency_check: bool = False,
generate_png: bool = True,
output_path: Optional[str] = None) -> str:
output_path: Optional[str] = None, **kwargs) -> str:
"""步骤1: 生成或设置水域maskFacade"""
step_start_time = time.time()
try:
@ -392,7 +392,7 @@ class WaterQualityInversionPipeline:
max_area: Optional[int] = None,
buffer_size: Optional[int] = None,
water_mask_path: Optional[str] = None,
skip_dependency_check: bool = False) -> str:
skip_dependency_check: bool = False, **kwargs) -> str:
"""步骤2: 找到耀斑区域Facade"""
step_start_time = time.time()
try:
@ -533,7 +533,7 @@ class WaterQualityInversionPipeline:
sugar_iter: Optional[int] = 3,
sugar_termination_thresh: float = 20.0,
output_path: Optional[str] = None,
skip_dependency_check: bool = False) -> str:
skip_dependency_check: bool = False, **kwargs) -> str:
"""步骤3: 去除耀斑Facade"""
step_start_time = time.time()
try:
@ -588,7 +588,7 @@ class WaterQualityInversionPipeline:
status="failed", error=str(e))
raise
def step4_process_csv(self, csv_path: str, skip_dependency_check: bool = False) -> str:
def step4_process_csv(self, csv_path: str, skip_dependency_check: bool = False, **kwargs) -> str:
"""
步骤4: 对csv文件进行处理筛选剔除异常值
@ -615,7 +615,7 @@ class WaterQualityInversionPipeline:
csv_path: Optional[str] = None,
boundary_path: Optional[str] = None,
glint_mask_path: Optional[str] = None,
skip_dependency_check: bool = False) -> str:
skip_dependency_check: bool = False, **kwargs) -> str:
"""
步骤5: 根据csv文件的采样点坐标在去除耀斑的文件中统计采样点的平均光谱
@ -666,7 +666,7 @@ class WaterQualityInversionPipeline:
formula_names: Optional[List[str]] = None,
output_file: Optional[str] = None,
enabled: bool = True,
skip_dependency_check: bool = False) -> str:
skip_dependency_check: bool = False, **kwargs) -> str:
"""
步骤5.5: 根据训练光谱计算水质光谱指数
@ -710,7 +710,7 @@ class WaterQualityInversionPipeline:
split_methods: List[str] = None,
cv_folds: int = 5,
training_csv_path: Optional[str] = None,
skip_dependency_check: bool = False) -> str:
skip_dependency_check: bool = False, **kwargs) -> str:
"""
步骤6: 使用采样点的平均光谱和对应的实测值建立机器学习模型,保存模型权重
@ -753,7 +753,7 @@ class WaterQualityInversionPipeline:
chunk_size: int = 1000,
water_mask_path: Optional[str] = None,
glint_mask_path: Optional[str] = None,
skip_dependency_check: bool = False) -> str:
skip_dependency_check: bool = False, **kwargs) -> str:
"""
步骤7: 生成根据水域掩膜内且耀斑掩膜外的采样点,统计采样点的平均光谱
@ -795,7 +795,7 @@ class WaterQualityInversionPipeline:
models_dir: Optional[str] = None,
metric: str = 'test_r2',
prediction_column: str = 'prediction',
skip_dependency_check: bool = False) -> Dict[str, str]:
skip_dependency_check: bool = False, **kwargs) -> Dict[str, str]:
"""
步骤8: 将训练好的最佳机器学习模型应用到采样点的平均光谱上,预测水质参数
@ -835,7 +835,7 @@ class WaterQualityInversionPipeline:
diffusion_n_neighbors: int = 15,
cmap: Optional[str] = None,
expand_ratio: float = 0.05,
skip_dependency_check: bool = False) -> str:
skip_dependency_check: bool = False, **kwargs) -> str:
"""
步骤9: 根据采样点的坐标和反演的实测参数,以及水域掩膜,通过插值的方法,得到水质参数的可视化分布图
@ -1764,7 +1764,7 @@ class WaterQualityInversionPipeline:
window: int = 5,
output_dir: Optional[str] = None,
enabled: bool = True,
skip_dependency_check: bool = False) -> Dict[str, str]:
skip_dependency_check: bool = False, **kwargs) -> Dict[str, str]:
"""
步骤6.5: 非经验统计回归模型训练
@ -1812,7 +1812,7 @@ class WaterQualityInversionPipeline:
methods: Union[str, List[str]] = 'all',
output_dir: Optional[str] = None,
enabled: bool = True,
skip_dependency_check: bool = False) -> str:
skip_dependency_check: bool = False, **kwargs) -> str:
"""
步骤6.75: 使用自定义回归方法分析指标与目标参数之间的关系
"""
@ -1991,7 +1991,7 @@ class WaterQualityInversionPipeline:
metric: str = 'Average Accuracy(%)',
prediction_column: str = 'prediction',
enabled: bool = True,
skip_dependency_check: bool = False) -> Dict[str, str]:
skip_dependency_check: bool = False, **kwargs) -> Dict[str, str]:
"""
步骤8.5: 使用非经验统计回归模型进行参数预测
@ -2028,7 +2028,7 @@ class WaterQualityInversionPipeline:
output_dir: Optional[str] = None,
filename_prefix: str = "custom_regression_prediction",
enabled: bool = True,
skip_dependency_check: bool = False) -> Dict[str, str]:
skip_dependency_check: bool = False, **kwargs) -> Dict[str, str]:
"""
步骤8.75: 使用自定义回归模型进行参数预测

View File

@ -247,16 +247,34 @@ class WorkerThread(QThread):
mpl_prev = None
try:
from src.core.water_quality_inversion_pipeline_GUI import WaterQualityInversionPipeline
from src.core.pipeline.runner import PipelineRunner
from src.core.pipeline.context import PipelineContext
self.pipeline = WaterQualityInversionPipeline(work_dir=self.work_dir)
if self.mode == 'full':
self.log_message.emit("开始运行完整流程...", "info")
self.step_count = 0
self.log_message.emit("开始运行完整流程 (Runner 调度模式)...", "info")
if hasattr(self.pipeline, 'set_callback'):
self.pipeline.set_callback(self.pipeline_callback)
self.pipeline.run_full_pipeline(self.config)
# 构造上下文 (Ctx),将 config 整体注入 user_config
ctx = PipelineContext(
img_path=self.config.get('step1', {}).get('img_path'),
water_mask_path=self.config.get('step1', {}).get('mask_path'),
csv_path=self.config.get('step4', {}).get('csv_path'),
boundary_path=self.config.get('step5', {}).get('boundary_path'),
boundary_shp_path=self.config.get('step9', {}).get('boundary_shp_path'),
formula_csv_path=self.config.get('step8_75', {}).get('formula_csv_path'),
work_dir=self.work_dir,
user_config=self.config
)
# 启动新调度器
runner = PipelineRunner(self.pipeline)
result_ctx = runner.run(ctx)
if result_ctx.last_error:
raise RuntimeError(f"流水线执行失败: {result_ctx.last_error}")
self.progress_update.emit(100, "流程执行完成")
self.finished.emit(True, "完整流程执行成功!")