diff --git a/src/core/pipeline/runner.py b/src/core/pipeline/runner.py index 0f2371e..4999b71 100644 --- a/src/core/pipeline/runner.py +++ b/src/core/pipeline/runner.py @@ -84,11 +84,13 @@ PIPELINE_STEPS: List[StepSpec] = [ StepSpec( step_id="step6_5", method_name="step6_5_non_empirical_modeling", requires=["training_csv_path"], produces=["models_dir"], + parameter_map={"training_csv_path": "csv_path"}, description="非经验统计回归", ), StepSpec( step_id="step6_75", method_name="step6_75_custom_regression", - requires=["training_csv_path"], produces=["models_dir"], + requires=["indices_path"], produces=["models_dir"], + parameter_map={"indices_path": "csv_path"}, description="自定义回归分析", ), StepSpec( @@ -104,12 +106,14 @@ PIPELINE_STEPS: List[StepSpec] = [ StepSpec( step_id="step8_5", method_name="step8_5_predict_with_non_empirical_models", requires=["sampling_csv_path", "models_dir"], produces=["prediction_dir"], + parameter_map={"models_dir": "non_empirical_models_dir"}, description="非经验模型预测", ), StepSpec( step_id="step8_75", method_name="step8_75_predict_with_custom_regression", requires=["sampling_csv_path", "models_dir", "formula_csv_path"], produces=["prediction_dir"], + parameter_map={"models_dir": "custom_regression_dir"}, description="自定义回归预测", ), StepSpec( @@ -185,7 +189,10 @@ class PipelineRunner: # 2) 允许用户在 ctx.user_config[step_id] 覆盖/补充 user_overrides = ctx.user_config.get(spec.step_id) or {} if isinstance(user_overrides, dict): - kwargs.update(user_overrides) + for k, v in user_overrides.items(): + # ★ 关键防御:绝不用 GUI 的“空字符串”或 None 覆盖上游传来的有效路径 + if v is not None and v != "": + kwargs[k] = v # 3) 状态置 start ctx.append_log( diff --git a/src/core/water_quality_inversion_pipeline_GUI.py b/src/core/water_quality_inversion_pipeline_GUI.py index 89a0758..307f6da 100644 --- a/src/core/water_quality_inversion_pipeline_GUI.py +++ b/src/core/water_quality_inversion_pipeline_GUI.py @@ -267,7 +267,7 @@ class WaterQualityInversionPipeline: use_ndwi: bool = False, skip_dependency_check: bool = False, generate_png: bool = True, - output_path: Optional[str] = None) -> str: + output_path: Optional[str] = None, **kwargs) -> str: """步骤1: 生成或设置水域mask(Facade)""" step_start_time = time.time() try: @@ -392,7 +392,7 @@ class WaterQualityInversionPipeline: max_area: Optional[int] = None, buffer_size: Optional[int] = None, water_mask_path: Optional[str] = None, - skip_dependency_check: bool = False) -> str: + skip_dependency_check: bool = False, **kwargs) -> str: """步骤2: 找到耀斑区域(Facade)""" step_start_time = time.time() try: @@ -533,7 +533,7 @@ class WaterQualityInversionPipeline: sugar_iter: Optional[int] = 3, sugar_termination_thresh: float = 20.0, output_path: Optional[str] = None, - skip_dependency_check: bool = False) -> str: + skip_dependency_check: bool = False, **kwargs) -> str: """步骤3: 去除耀斑(Facade)""" step_start_time = time.time() try: @@ -588,7 +588,7 @@ class WaterQualityInversionPipeline: status="failed", error=str(e)) raise - def step4_process_csv(self, csv_path: str, skip_dependency_check: bool = False) -> str: + def step4_process_csv(self, csv_path: str, skip_dependency_check: bool = False, **kwargs) -> str: """ 步骤4: 对csv文件进行处理,筛选剔除异常值 @@ -615,7 +615,7 @@ class WaterQualityInversionPipeline: csv_path: Optional[str] = None, boundary_path: Optional[str] = None, glint_mask_path: Optional[str] = None, - skip_dependency_check: bool = False) -> str: + skip_dependency_check: bool = False, **kwargs) -> str: """ 步骤5: 根据csv文件的采样点坐标,在去除耀斑的文件中统计采样点的平均光谱 @@ -666,7 +666,7 @@ class WaterQualityInversionPipeline: formula_names: Optional[List[str]] = None, output_file: Optional[str] = None, enabled: bool = True, - skip_dependency_check: bool = False) -> str: + skip_dependency_check: bool = False, **kwargs) -> str: """ 步骤5.5: 根据训练光谱计算水质光谱指数 @@ -710,7 +710,7 @@ class WaterQualityInversionPipeline: split_methods: List[str] = None, cv_folds: int = 5, training_csv_path: Optional[str] = None, - skip_dependency_check: bool = False) -> str: + skip_dependency_check: bool = False, **kwargs) -> str: """ 步骤6: 使用采样点的平均光谱和对应的实测值建立机器学习模型,保存模型权重 @@ -753,7 +753,7 @@ class WaterQualityInversionPipeline: chunk_size: int = 1000, water_mask_path: Optional[str] = None, glint_mask_path: Optional[str] = None, - skip_dependency_check: bool = False) -> str: + skip_dependency_check: bool = False, **kwargs) -> str: """ 步骤7: 生成根据水域掩膜内且耀斑掩膜外的采样点,统计采样点的平均光谱 @@ -795,7 +795,7 @@ class WaterQualityInversionPipeline: models_dir: Optional[str] = None, metric: str = 'test_r2', prediction_column: str = 'prediction', - skip_dependency_check: bool = False) -> Dict[str, str]: + skip_dependency_check: bool = False, **kwargs) -> Dict[str, str]: """ 步骤8: 将训练好的最佳机器学习模型应用到采样点的平均光谱上,预测水质参数 @@ -835,7 +835,7 @@ class WaterQualityInversionPipeline: diffusion_n_neighbors: int = 15, cmap: Optional[str] = None, expand_ratio: float = 0.05, - skip_dependency_check: bool = False) -> str: + skip_dependency_check: bool = False, **kwargs) -> str: """ 步骤9: 根据采样点的坐标和反演的实测参数,以及水域掩膜,通过插值的方法,得到水质参数的可视化分布图 @@ -1764,7 +1764,7 @@ class WaterQualityInversionPipeline: window: int = 5, output_dir: Optional[str] = None, enabled: bool = True, - skip_dependency_check: bool = False) -> Dict[str, str]: + skip_dependency_check: bool = False, **kwargs) -> Dict[str, str]: """ 步骤6.5: 非经验统计回归模型训练 @@ -1812,7 +1812,7 @@ class WaterQualityInversionPipeline: methods: Union[str, List[str]] = 'all', output_dir: Optional[str] = None, enabled: bool = True, - skip_dependency_check: bool = False) -> str: + skip_dependency_check: bool = False, **kwargs) -> str: """ 步骤6.75: 使用自定义回归方法分析指标与目标参数之间的关系 """ @@ -1991,7 +1991,7 @@ class WaterQualityInversionPipeline: metric: str = 'Average Accuracy(%)', prediction_column: str = 'prediction', enabled: bool = True, - skip_dependency_check: bool = False) -> Dict[str, str]: + skip_dependency_check: bool = False, **kwargs) -> Dict[str, str]: """ 步骤8.5: 使用非经验统计回归模型进行参数预测 @@ -2028,7 +2028,7 @@ class WaterQualityInversionPipeline: output_dir: Optional[str] = None, filename_prefix: str = "custom_regression_prediction", enabled: bool = True, - skip_dependency_check: bool = False) -> Dict[str, str]: + skip_dependency_check: bool = False, **kwargs) -> Dict[str, str]: """ 步骤8.75: 使用自定义回归模型进行参数预测 diff --git a/src/gui/core/worker_thread.py b/src/gui/core/worker_thread.py index fc7659b..85558d2 100644 --- a/src/gui/core/worker_thread.py +++ b/src/gui/core/worker_thread.py @@ -247,16 +247,34 @@ class WorkerThread(QThread): mpl_prev = None try: from src.core.water_quality_inversion_pipeline_GUI import WaterQualityInversionPipeline + from src.core.pipeline.runner import PipelineRunner + from src.core.pipeline.context import PipelineContext + self.pipeline = WaterQualityInversionPipeline(work_dir=self.work_dir) if self.mode == 'full': - self.log_message.emit("开始运行完整流程...", "info") - self.step_count = 0 - + self.log_message.emit("开始运行完整流程 (Runner 调度模式)...", "info") if hasattr(self.pipeline, 'set_callback'): self.pipeline.set_callback(self.pipeline_callback) - self.pipeline.run_full_pipeline(self.config) + # 构造上下文 (Ctx),将 config 整体注入 user_config + ctx = PipelineContext( + img_path=self.config.get('step1', {}).get('img_path'), + water_mask_path=self.config.get('step1', {}).get('mask_path'), + csv_path=self.config.get('step4', {}).get('csv_path'), + boundary_path=self.config.get('step5', {}).get('boundary_path'), + boundary_shp_path=self.config.get('step9', {}).get('boundary_shp_path'), + formula_csv_path=self.config.get('step8_75', {}).get('formula_csv_path'), + work_dir=self.work_dir, + user_config=self.config + ) + + # 启动新调度器 + runner = PipelineRunner(self.pipeline) + result_ctx = runner.run(ctx) + + if result_ctx.last_error: + raise RuntimeError(f"流水线执行失败: {result_ctx.last_error}") self.progress_update.emit(100, "流程执行完成") self.finished.emit(True, "完整流程执行成功!")