From d0eb458392185350540064e3cad46e6ee57dfe12 Mon Sep 17 00:00:00 2001 From: DXC Date: Sat, 9 May 2026 17:30:49 +0800 Subject: [PATCH] =?UTF-8?q?refactor(step4):=20=E5=89=A5=E7=A6=BB=20Steps?= =?UTF-8?q?=20=E5=B1=82=20-=20step1~step3=20=E4=B8=9A=E5=8A=A1=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E4=B8=8B=E6=B2=89=E5=88=B0=E7=8B=AC=E7=AB=8B=E6=A8=A1?= =?UTF-8?q?=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/core/steps/__init__.py | 12 + src/core/steps/glint_detection_step.py | 113 ++++ src/core/steps/glint_removal_step.py | 314 +++++++++ src/core/steps/water_mask_step.py | 148 +++++ .../water_quality_inversion_pipeline_GUI.py | 615 +++--------------- 5 files changed, 674 insertions(+), 528 deletions(-) create mode 100644 src/core/steps/__init__.py create mode 100644 src/core/steps/glint_detection_step.py create mode 100644 src/core/steps/glint_removal_step.py create mode 100644 src/core/steps/water_mask_step.py diff --git a/src/core/steps/__init__.py b/src/core/steps/__init__.py new file mode 100644 index 0000000..3d529d4 --- /dev/null +++ b/src/core/steps/__init__.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +"""业务步骤层模块""" + +from src.core.steps.water_mask_step import WaterMaskStep +from src.core.steps.glint_detection_step import GlintDetectionStep +from src.core.steps.glint_removal_step import GlintRemovalStep + +__all__ = [ + "WaterMaskStep", + "GlintDetectionStep", + "GlintRemovalStep", +] diff --git a/src/core/steps/glint_detection_step.py b/src/core/steps/glint_detection_step.py new file mode 100644 index 0000000..9bd85d6 --- /dev/null +++ b/src/core/steps/glint_detection_step.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- +""" +步骤2: 耀斑区域检测 + +支持多种检测方法: otsu, zscore, percentile, iqr, adaptive, multi_band +""" + +import time +from pathlib import Path +from typing import Optional, List, Union + + +class GlintDetectionStep: + """耀斑区域检测步骤""" + + @staticmethod + def run( + img_path: str, + glint_wave: float = 750.0, + method: str = "otsu", + z_threshold: float = 2.5, + percentile: float = 95.0, + iqr_multiplier: float = 1.5, + window_size: int = 15, + multi_band_waves: Optional[List[float]] = None, + sub_method: str = "zscore", + weights: Optional[List[float]] = None, + max_area: Optional[int] = None, + buffer_size: Optional[int] = None, + water_mask_path: Optional[str] = None, + glint_dir: Union[str, Path] = "./2_glint", + callback: Optional[callable] = None, + ) -> str: + """ + 执行耀斑区域检测 + + Args: + img_path: 输入影像文件路径 + glint_wave: 用于耀斑检测的波段波长(nm) + method: 检测方法 ('otsu' | 'zscore' | 'percentile' | 'iqr' | 'adaptive' | 'multi_band') + z_threshold: Z-score 方法阈值(默认 2.5) + percentile: 百分位数阈值(默认 95.0) + iqr_multiplier: IQR 倍数(默认 1.5) + window_size: 自适应阈值窗口大小(默认 15) + multi_band_waves: 多波段方法的波长列表,如 [750, 800, 850] + sub_method: 多波段方法的子方法(默认 'zscore') + weights: 多波段方法的权重列表(None 表示等权重) + max_area: 最大连通域面积阈值(像素),超过则过滤 + buffer_size: 岸边缓冲区大小(像素),用于去除岸边附近错误掩膜 + water_mask_path: 水域掩膜文件路径(dat 格式优先) + glint_dir: 工作目录 + callback: 回调函数 + + Returns: + 耀斑掩膜文件路径 (.dat) + """ + from src.utils.find_severe_glint_area import find_severe_glint_area + + glint_dir = Path(glint_dir) + glint_dir.mkdir(parents=True, exist_ok=True) + + def notify(status, msg=""): + if callback: + callback("步骤2", status, msg) + + print("\n" + "=" * 80) + print("步骤2: 找到耀斑区域") + print("=" * 80) + + step_start_time = time.time() + + # 确定水体掩膜路径 + if water_mask_path is not None and Path(water_mask_path).exists(): + final_water_mask_path = water_mask_path + else: + final_water_mask_path = None + + output_path = str(glint_dir / "severe_glint_area.dat") + + # 跳过已存在的文件 + if Path(output_path).exists(): + print(f"检测到已存在的耀斑掩膜文件,直接使用: {output_path}") + notify("skipped", f"耀斑掩膜已设置: {output_path}") + return output_path + + # 构建检测参数字典 + kwargs = { + "method": method, + "z_threshold": z_threshold, + "percentile": percentile, + "iqr_multiplier": iqr_multiplier, + "window_size": window_size, + } + if method == "multi_band": + if multi_band_waves is not None: + kwargs["multi_band_waves"] = multi_band_waves + if sub_method is not None: + kwargs["sub_method"] = sub_method + if weights is not None: + kwargs["weights"] = weights + if max_area is not None: + kwargs["max_area"] = max_area + if buffer_size is not None: + kwargs["buffer_size"] = buffer_size + + glint_mask_path = find_severe_glint_area( + img_path, final_water_mask_path, glint_wave, output_path, **kwargs + ) + + print(f"耀斑掩膜已生成: {glint_mask_path}") + print(f"使用检测方法: {method}") + notify("completed", f"耀斑掩膜已生成: {glint_mask_path}") + return glint_mask_path diff --git a/src/core/steps/glint_removal_step.py b/src/core/steps/glint_removal_step.py new file mode 100644 index 0000000..a61fc4d --- /dev/null +++ b/src/core/steps/glint_removal_step.py @@ -0,0 +1,314 @@ +# -*- coding: utf-8 -*- +""" +步骤3: 去除耀斑 + +支持多种方法: subtract_nir, regression_slope, oxygen_absorption, kutser, goodman, hedley, sugar + +每种方法都会: +1. 准备水域掩膜(支持 shp 自动转 dat) +2. 调用对应的算法类执行处理 +3. 复制 hdr 文件到输出影像 +""" + +import os +import time +from pathlib import Path +from typing import Optional, List, Union, Callable + +import numpy as np + + +class GlintRemovalStep: + """去除耀斑步骤""" + + @staticmethod + def run( + img_path: str, + method: str = "subtract_nir", + start_wave: Optional[float] = None, + end_wave: Optional[float] = None, + json_path: Optional[str] = None, + left_shoulder_wave: Optional[float] = None, + valley_wave: Optional[float] = None, + right_shoulder_wave: Optional[float] = None, + water_mask: Optional[Union[str, np.ndarray]] = None, + interpolated_img_path: Optional[str] = None, + interpolate_zeros: bool = False, + interpolation_method: str = "nearest", + enabled: bool = True, + # Kutser 参数 + kutser_shp_path: Optional[str] = None, + oxy_band: int = 38, + lower_oxy: int = 36, + upper_oxy: int = 49, + nir_band: int = 47, + # Goodman 参数 + nir_lower: int = 25, + nir_upper: int = 37, + goodman_A: float = 0.000019, + goodman_B: float = 0.1, + # Hedley 参数 + hedley_shp_path: Optional[str] = None, + hedley_nir_band: int = 47, + # SUGAR 参数 + sugar_bounds: Optional[List[tuple]] = None, + sugar_sigma: float = 1.0, + sugar_estimate_background: bool = True, + sugar_glint_mask_method: str = "cdf", + sugar_iter: Optional[int] = 3, + sugar_termination_thresh: float = 20.0, + # 内部工具函数 + _get_image_geo_info=None, + _load_image_as_array=None, + _save_bands_as_image=None, + _copy_hdr_info=None, + _prepare_water_mask_for_algorithm=None, + _interpolate_zero_pixels_batch=None, + deglint_dir: Union[str, Path] = "./3_deglint", + water_mask_dir: Union[str, Path] = "./1_water_mask", + callback: Optional[Callable] = None, + ) -> str: + """ + 执行去除耀斑处理 + + Args: + img_path: 输入影像文件路径 + method: 去耀斑方法 + ...(其余参数同主类 step3_remove_glint) + + Returns: + 去除耀斑后的影像文件路径 + """ + from src.core.glint_removal.Kutser import Kutser + from src.core.glint_removal.Goodman import Goodman + from src.core.glint_removal.Hedley import Hedley + from src.core.glint_removal.SUGAR import SUGAR, correction_iterative + from src.core.utils.gdal_helper import ( + get_image_geo_info as _default_get_geo, + load_image_as_array as _default_load, + save_bands_as_image as _default_save_bands, + copy_hdr_info as _default_copy_hdr, + ) + from src.core.utils.mask_converter import ( + prepare_water_mask_for_algorithm as _default_prepare, + ) + + # 使用提供的函数或默认函数 + if _get_image_geo_info is None: + _get_image_geo_info = _default_get_geo + if _load_image_as_array is None: + _load_image_as_array = _default_load + if _save_bands_as_image is None: + _save_bands_as_image = _default_save_bands + if _copy_hdr_info is None: + _copy_hdr_info = _default_copy_hdr + if _prepare_water_mask_for_algorithm is None: + _prepare_water_mask_for_algorithm = _default_prepare + + deglint_dir = Path(deglint_dir) + deglint_dir.mkdir(parents=True, exist_ok=True) + + def notify(status, msg=""): + if callback: + callback("步骤3", status, msg) + + print("\n" + "=" * 80) + print("步骤3: 去除耀斑") + print("=" * 80) + + step_start_time = time.time() + + # 方法名标准化 + raw_method = str(method).lower() + if "kutser" in raw_method: + method = "kutser" + elif "goodman" in raw_method: + method = "goodman" + elif "hedley" in raw_method: + method = "hedley" + elif "sugar" in raw_method: + method = "sugar" + + # 如果未启用,直接返回原始影像 + if not enabled: + print("已设置跳过去除耀斑(enabled=False),将直接使用原始影像。") + notify("skipped", "跳过去耀斑,使用原始影像") + return img_path + + # ---- 确定水域掩膜 ---- + final_water_mask = water_mask + if final_water_mask is not None and str(final_water_mask).lower().endswith(".shp"): + # shp 自动替换为 dat + dat_mask = str(Path(water_mask_dir) / "water_mask_from_shp.dat") + if Path(dat_mask).exists(): + print(f"检测到输入掩膜为 .shp,自动替换为栅格掩膜: {dat_mask}") + final_water_mask = dat_mask + + if final_water_mask is None: + dat_mask_default = str(Path(water_mask_dir) / "water_mask_from_shp.dat") + if Path(dat_mask_default).exists(): + final_water_mask = dat_mask_default + print(f"使用步骤1生成的水域掩膜: {final_water_mask}") + + # ---- 步骤3.1: 0值像素插值 ---- + if interpolate_zeros: + print("\n" + "-" * 80) + print("步骤3.1: 对0值像素进行插值") + print("-" * 80) + interp_start_time = time.time() + + if _interpolate_zero_pixels_batch is None: + from src.core.algorithms.interpolation.interpolator import ( + interpolate_zero_pixels_batch as _interp_batch, + ) + _interpolate_zero_pixels_batch = _interp_batch + + interp_result, _ = _interpolate_zero_pixels_batch( + img_path=img_path, + interpolation_method=interpolation_method, + output_path=None, + water_mask=final_water_mask, + deglint_dir=str(deglint_dir), + callback_progress=lambda msg: print(f" {msg}"), + ) + img_path = interp_result + interp_end_time = time.time() + print(f"插值完成,使用插值后的影像: {img_path}") + + # ---- 获取影像信息 ---- + geotransform, projection, width, height, n_bands = _get_image_geo_info(img_path) + print(f"影像尺寸: {width} x {height} x {n_bands}") + + mask_for_algorithm = _prepare_water_mask_for_algorithm( + final_water_mask, (height, width), geotransform, projection, img_path + ) + + # ==================== Kutser ==================== + if method == "kutser": + print(f"使用方法: Kutser (氧吸收波段={oxy_band}, NIR波段={nir_band})") + output_path = str(deglint_dir / "deglint_kutser.bsq") + + if Path(output_path).exists(): + print(f"检测到已存在的去耀斑影像文件,直接使用: {output_path}") + notify("skipped", f"去耀斑影像已设置: {output_path}") + return output_path + + kutser = Kutser( + img_path, + shp_path=None, + oxy_band=oxy_band, + lower_oxy=lower_oxy, + upper_oxy=upper_oxy, + NIR_band=nir_band, + water_mask=mask_for_algorithm, + output_path=output_path, + ) + kutser.get_corrected_bands() + + if Path(output_path).exists(): + _copy_hdr_info(img_path, output_path) + notify("completed", f"去耀斑影像已生成: {output_path}") + return output_path + raise RuntimeError(f"Kutser算法未生成输出文件: {output_path}") + + # ==================== Goodman ==================== + elif method == "goodman": + print(f"使用方法: Goodman (NIR波段范围: {nir_lower}-{nir_upper})") + output_path = str(deglint_dir / "deglint_goodman.bsq") + + if Path(output_path).exists(): + print(f"检测到已存在的去耀斑影像文件,直接使用: {output_path}") + notify("skipped", f"去耀斑影像已设置: {output_path}") + return output_path + + goodman = Goodman( + img_path, + NIR_lower=nir_lower, + NIR_upper=nir_upper, + A=goodman_A, + B=goodman_B, + water_mask=mask_for_algorithm, + output_path=output_path, + ) + corrected_bands = goodman.get_corrected_bands() + + if not Path(output_path).exists(): + _save_bands_as_image(corrected_bands, output_path, geotransform, projection) + _copy_hdr_info(img_path, output_path) + else: + _copy_hdr_info(img_path, output_path) + del corrected_bands + + notify("completed", f"去耀斑影像已生成: {output_path}") + return output_path + + # ==================== Hedley ==================== + elif method == "hedley": + print(f"使用方法: Hedley (NIR波段={hedley_nir_band})") + output_path = str(deglint_dir / "deglint_hedley.bsq") + + if Path(output_path).exists(): + print(f"检测到已存在的去耀斑影像文件,直接使用: {output_path}") + notify("skipped", f"去耀斑影像已设置: {output_path}") + return output_path + + hedley = Hedley( + img_path, + shp_path=None, + NIR_band=hedley_nir_band, + water_mask=mask_for_algorithm, + output_path=output_path, + ) + hedley.get_corrected_bands() + + if Path(output_path).exists(): + _copy_hdr_info(img_path, output_path) + notify("completed", f"去耀斑影像已生成: {output_path}") + return output_path + raise RuntimeError(f"Hedley算法未生成输出文件: {output_path}") + + # ==================== SUGAR ==================== + elif method == "sugar": + # 方法名标准化 + glint_method_raw = str(sugar_glint_mask_method).lower() + if "cdf" in glint_method_raw or "累积" in glint_method_raw: + sugar_glint_mask_method_fixed = "cdf" + elif "otsu" in glint_method_raw or "大津" in glint_method_raw: + sugar_glint_mask_method_fixed = "otsu" + else: + sugar_glint_mask_method_fixed = "cdf" + + print( + f"使用方法: SUGAR (迭代次数={sugar_iter}, 掩膜方法={sugar_glint_mask_method_fixed})" + ) + output_path = str(deglint_dir / "deglint_sugar.bsq") + + if Path(output_path).exists(): + print(f"检测到已存在的去耀斑影像文件,直接使用: {output_path}") + notify("skipped", f"去耀斑影像已设置: {output_path}") + return output_path + + if sugar_bounds is None: + sugar_bounds = [(1, 2)] + + correction_iterative( + img_path, + iter=sugar_iter, + bounds=sugar_bounds, + estimate_background=sugar_estimate_background, + glint_mask_method=sugar_glint_mask_method_fixed, + termination_thresh=sugar_termination_thresh, + water_mask=mask_for_algorithm, + output_path=output_path, + ) + + if Path(output_path).exists(): + _copy_hdr_info(img_path, output_path) + notify("completed", f"去耀斑影像已生成: {output_path}") + return output_path + raise RuntimeError(f"SUGAR算法未生成输出文件: {output_path}") + + else: + raise ValueError( + f"不支持的方法: {method}。支持的方法: kutser, goodman, hedley, sugar" + ) diff --git a/src/core/steps/water_mask_step.py b/src/core/steps/water_mask_step.py new file mode 100644 index 0000000..ae59e34 --- /dev/null +++ b/src/core/steps/water_mask_step.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +""" +步骤1: 水域掩膜生成 + +支持三种方式: +1. 基于 shp 文件栅格化 +2. 使用现有栅格格式掩膜文件 (.dat/.tif) +3. 基于 NDWI 从影像自动生成水体掩膜 +""" + +import os +import time +from pathlib import Path +from typing import Optional, List, Callable, Union +import numpy as np + + +class WaterMaskStep: + """水域掩膜生成步骤""" + + @staticmethod + def run( + mask_path: Optional[str] = None, + img_path: Optional[str] = None, + ndwi_threshold: float = 0.4, + use_ndwi: bool = False, + generate_png: bool = True, + output_path: Optional[str] = None, + water_mask_dir: Union[str, Path] = "./1_water_mask", + callback: Optional[Callable] = None, + ) -> str: + """ + 执行水域掩膜生成 + + Args: + mask_path: 水体掩膜文件路径,支持 .shp(需 img_path)或 .dat/.tif(直接使用) + img_path: 输入影像文件路径(当 mask_path 为 shp 或 use_ndwi=True 时必须提供) + ndwi_threshold: NDWI 阈值(use_ndwi=True 时使用) + use_ndwi: 是否使用 NDWI 方法从影像生成水体掩膜 + generate_png: 是否生成 PNG 预览图(默认 True) + output_path: 指定输出掩膜文件的保存路径(可选) + water_mask_dir: 工作目录 + callback: 回调函数,签名为 callback(step, status, message) + + Returns: + dat 格式的水域掩膜文件路径 + """ + from src.utils.extract_water_area import rasterize_shp, ndwi + from src.core.utils.preview_generator import ( + generate_image_preview, + generate_water_mask_overlay, + ) + + water_mask_dir = Path(water_mask_dir) + water_mask_dir.mkdir(parents=True, exist_ok=True) + + def notify(status, msg=""): + if callback: + callback("步骤1", status, msg) + + print("\n" + "=" * 80) + print("步骤1: 生成或设置水域mask") + print("=" * 80) + + step_start_time = time.time() + + # 生成影像预览图 + if generate_png and img_path is not None and Path(img_path).exists(): + preview_path = str(water_mask_dir / "hsi_preview.png") + generate_image_preview( + img_path=img_path, + output_path=preview_path, + title="影像预览: RGB波段(基于波长)" + ) + + # ---- NDWI 方法 ---- + if use_ndwi: + if img_path is None: + raise ValueError("当 use_ndwi=True 时,必须提供 img_path 参数") + if not Path(img_path).exists(): + raise ValueError(f"影像文件不存在: {img_path}") + + print(f"使用NDWI方法从影像生成水体掩膜,阈值={ndwi_threshold}...") + + ndwi_output_path = output_path or str(water_mask_dir / "water_mask_from_ndwi.dat") + os.makedirs(Path(ndwi_output_path).parent, exist_ok=True) + + if Path(ndwi_output_path).exists(): + print(f"检测到已存在的NDWI掩膜文件,直接使用: {ndwi_output_path}") + notify("skipped", f"水域掩膜已设置: {ndwi_output_path}") + return ndwi_output_path + + ndwi(img_path, ndwi_threshold, ndwi_output_path) + + if generate_png: + overlay_path = water_mask_dir / "water_mask_overlay.png" + generate_water_mask_overlay( + img_path=img_path, mask_path=ndwi_output_path, output_path=str(overlay_path) + ) + + notify("completed", f"NDWI水体掩膜已生成: {ndwi_output_path}") + return ndwi_output_path + + # ---- 必须提供 mask_path ---- + if mask_path is None: + raise ValueError("必须提供 mask_path 参数或设置 use_ndwi=True") + if not Path(mask_path).exists(): + raise ValueError(f"文件不存在: {mask_path}") + + file_ext = Path(mask_path).suffix.lower() + + # ---- SHP 栅格化 ---- + if file_ext == ".shp": + if img_path is None: + raise ValueError("当 mask_path 为 shp 格式时,必须提供 img_path 参数") + + print(f"检测到shp格式的水体掩膜,正在转换为dat格式...") + + shp_output_path = output_path or str(water_mask_dir / "water_mask_from_shp.dat") + os.makedirs(Path(shp_output_path).parent, exist_ok=True) + + if Path(shp_output_path).exists(): + print(f"检测到已存在的栅格化掩膜文件,直接使用: {shp_output_path}") + notify("skipped", f"水域掩膜已设置: {shp_output_path}") + if generate_png: + overlay_path = water_mask_dir / "water_mask_overlay.png" + if not overlay_path.exists(): + generate_water_mask_overlay(img_path, shp_output_path, str(overlay_path)) + return shp_output_path + + safe_mask_path = os.path.abspath(mask_path).replace("\\", "/") + rasterize_shp(safe_mask_path, shp_output_path, img_path) + + if generate_png: + overlay_path = water_mask_dir / "water_mask_overlay.png" + generate_water_mask_overlay(img_path, shp_output_path, str(overlay_path)) + + notify("completed", f"dat格式水域掩膜已生成: {shp_output_path}") + return shp_output_path + + # ---- 栅格格式直接使用 ---- + print(f"检测到栅格格式的水体掩膜,直接使用: {mask_path}") + if generate_png and img_path is not None and Path(img_path).exists(): + overlay_path = water_mask_dir / "water_mask_overlay.png" + generate_water_mask_overlay(img_path, mask_path, str(overlay_path)) + + notify("completed", f"水域掩膜已设置: {mask_path}") + return mask_path diff --git a/src/core/water_quality_inversion_pipeline_GUI.py b/src/core/water_quality_inversion_pipeline_GUI.py index abee6da..67c9d45 100644 --- a/src/core/water_quality_inversion_pipeline_GUI.py +++ b/src/core/water_quality_inversion_pipeline_GUI.py @@ -71,6 +71,10 @@ from src.core.utils.preview_generator import ( ) # 导入算法层模块 from src.core.algorithms.interpolation.interpolator import interpolate_zero_pixels_batch as _interpolate_zero_pixels_batch +# 导入业务步骤模块 +from src.core.steps.water_mask_step import WaterMaskStep +from src.core.steps.glint_detection_step import GlintDetectionStep +from src.core.steps.glint_removal_step import GlintRemovalStep # 导入新的耀斑去除算法 from src.core.glint_removal.Kutser import Kutser from src.core.glint_removal.Goodman import Goodman @@ -260,164 +264,29 @@ class WaterQualityInversionPipeline: skip_dependency_check: bool = False, generate_png: bool = True, output_path: Optional[str] = None) -> str: - """ - 步骤1: 生成或设置水域mask - - 支持三种方式生成水域掩膜: - 1. 基于shp文件栅格化 - 2. 使用现有的栅格格式掩膜文件 - 3. 基于NDWI从影像自动生成水体掩膜 - - 当提供img_path时,会自动生成PNG预览图,基于波长选择RGB波段: - - 红波段: ~650nm - - 绿波段: ~550nm - - 蓝波段: ~460nm - - Args: - mask_path: 水体掩膜文件路径,支持: - - shp格式文件(.shp):需要提供img_path用于栅格化 - - dat格式文件(.dat/.tif等栅格格式):直接使用,不需要img_path - - None:当use_ndwi=True时,从影像生成NDWI掩膜 - img_path: 输入影像文件路径(当mask_path为shp格式或use_ndwi=True时必须提供) - ndwi_threshold: NDWI阈值(当use_ndwi=True时使用) - use_ndwi: 是否使用NDWI方法从影像生成水体掩膜 - generate_png: 是否生成输入影像的PNG预览图(默认True) - output_path: 指定输出掩膜文件的保存路径(可选)。如果提供,掩膜将保存到此路径; - 如果为None,则使用默认路径(self.water_mask_dir) - - Returns: - dat格式的水域掩膜文件路径 - """ - print("\n" + "="*80) - print("步骤1: 生成或设置水域mask") - print("="*80) - + """步骤1: 生成或设置水域mask(Facade)""" step_start_time = time.time() try: - # 如果提供了img_path且开启生成PNG功能,生成影像预览图 - if generate_png and img_path is not None and Path(img_path).exists(): - self._generate_image_preview(img_path) - - if use_ndwi: - # 使用NDWI方法从影像生成水体掩膜 - if img_path is None: - raise ValueError("当use_ndwi=True时,必须提供img_path参数用于生成NDWI掩膜") - if not Path(img_path).exists(): - raise ValueError(f"影像文件不存在: {img_path}") - - print(f"使用NDWI方法从影像生成水体掩膜,阈值={ndwi_threshold}...") - - # 使用用户指定的输出路径,或使用默认路径 - if output_path: - ndwi_output_path = output_path - # 确保输出目录存在 - os.makedirs(Path(output_path).parent, exist_ok=True) - else: - ndwi_output_path = str(self.water_mask_dir / "water_mask_from_ndwi.dat") - - # 检查文件是否已存在,避免重复生成 - if Path(ndwi_output_path).exists(): - print(f"检测到已存在的NDWI掩膜文件,直接使用: {ndwi_output_path}") - self.water_mask_path = ndwi_output_path - step_end_time = time.time() - self._record_step_time("步骤1: 生成水域mask", step_start_time, step_end_time, status="skipped") - print(f"水域掩膜已设置: {self.water_mask_path}") - - # 生成水域掩膜叠加图(如果不存在) - overlay_path = self.water_mask_dir / "water_mask_overlay.png" - if generate_png and img_path is not None and Path(img_path).exists() and not overlay_path.exists(): - self._generate_water_mask_overlay(img_path, self.water_mask_path) - - return self.water_mask_path - - # 执行NDWI水体提取 - from src.utils.extract_water_area import ndwi - ndwi(img_path, ndwi_threshold, ndwi_output_path) - self.water_mask_path = ndwi_output_path - step_end_time = time.time() - self._record_step_time("步骤1: 生成水域mask", step_start_time, step_end_time) - print(f"已生成NDWI水体掩膜: {self.water_mask_path}") - - # 生成水域掩膜叠加图 - if generate_png: - self._generate_water_mask_overlay(img_path, self.water_mask_path) - - return self.water_mask_path - - elif mask_path is None: - raise ValueError("必须提供mask_path参数或设置use_ndwi=True") - - if not Path(mask_path).exists(): - raise ValueError(f"文件不存在: {mask_path}") - - # 检查文件扩展名,判断是shp文件还是dat文件 - file_ext = Path(mask_path).suffix.lower() - - if file_ext == '.shp': - # 如果是shp文件,需要栅格化为dat - if img_path is None: - raise ValueError("当mask_path为shp格式时,必须提供img_path参数用于栅格化") - - print(f"检测到shp格式的水体掩膜,正在转换为dat格式...") - - # 使用用户指定的输出路径,或使用默认路径 - if output_path: - shp_output_path = output_path - # 确保输出目录存在 - os.makedirs(Path(output_path).parent, exist_ok=True) - else: - shp_output_path = str(self.water_mask_dir / "water_mask_from_shp.dat") - - # 检查文件是否已存在,避免重复栅格化 - if Path(shp_output_path).exists(): - print(f"检测到已存在的栅格化掩膜文件,直接使用: {shp_output_path}") - self.water_mask_path = shp_output_path - step_end_time = time.time() - self._record_step_time("步骤1: 生成水域mask", step_start_time, step_end_time, status="skipped") - print(f"水域掩膜已设置: {self.water_mask_path}") - - # 生成水域掩膜叠加图(如果不存在) - overlay_path = self.water_mask_dir / "water_mask_overlay.png" - if generate_png and img_path is not None and Path(img_path).exists() and not overlay_path.exists(): - self._generate_water_mask_overlay(img_path, self.water_mask_path) - - return self.water_mask_path - - # 执行栅格化 - from src.utils.extract_water_area import rasterize_shp - safe_mask_path = os.path.abspath(mask_path).replace('\\', '/') - rasterize_shp(safe_mask_path, shp_output_path, img_path) - self.water_mask_path = shp_output_path - step_end_time = time.time() - self._record_step_time("步骤1: 生成水域mask", step_start_time, step_end_time) - print(f"已生成dat格式的水域掩膜: {self.water_mask_path}") - - # 生成水域掩膜叠加图 - if generate_png: - self._generate_water_mask_overlay(img_path, self.water_mask_path) - - return self.water_mask_path - - else: - # 如果是dat或其他栅格格式,直接使用 - print(f"检测到栅格格式的水体掩膜,直接使用: {mask_path}") - self.water_mask_path = mask_path - step_end_time = time.time() - self._record_step_time("步骤1: 生成水域mask", step_start_time, step_end_time) - print(f"水域掩膜已设置: {self.water_mask_path} (dat格式)") - - # 生成水域掩膜叠加图 - if generate_png and img_path is not None and Path(img_path).exists(): - self._generate_water_mask_overlay(img_path, self.water_mask_path) - - return self.water_mask_path - + result = WaterMaskStep.run( + mask_path=mask_path, + img_path=img_path, + ndwi_threshold=ndwi_threshold, + use_ndwi=use_ndwi, + generate_png=generate_png, + output_path=output_path, + water_mask_dir=str(self.water_mask_dir), + callback=self._notify, + ) + self.water_mask_path = result + step_end_time = time.time() + self._record_step_time("步骤1: 生成水域mask", step_start_time, step_end_time) + return result except Exception as e: step_end_time = time.time() - self._record_step_time("步骤1: 生成水域mask", step_start_time, step_end_time, + self._record_step_time("步骤1: 生成水域mask", step_start_time, step_end_time, status="failed", error=str(e)) raise - + def _generate_image_preview(self, img_path: str, bands: Optional[List[int]] = None) -> str: """生成影像预览图,转发至工具模块""" output_path = str(self.water_mask_dir / f"hsi_preview.png") @@ -520,106 +389,36 @@ class WaterQualityInversionPipeline: buffer_size: Optional[int] = None, water_mask_path: Optional[str] = None, skip_dependency_check: bool = False) -> str: - """ - 步骤2: 找到耀斑区域 - - Args: - img_path: 输入影像文件路径 - glint_wave: 用于提取耀斑严重区域的波段波长(单波段方法使用) - method: 检测方法,可选: - - 'otsu': Otsu阈值分割(默认) - - 'zscore': Z-score统计方法 - - 'percentile': 百分位数阈值方法 - - 'iqr': IQR异常值检测 - - 'adaptive': 自适应阈值方法 - - 'multi_band': 多波段融合方法 - z_threshold: Z-score方法的阈值(默认2.5) - percentile: 百分位数阈值(默认95.0) - iqr_multiplier: IQR倍数(默认1.5) - window_size: 自适应阈值窗口大小(默认15) - multi_band_waves: 多波段方法的波长列表,如[750, 800, 850] - sub_method: 多波段方法的子方法('zscore', 'percentile', 'otsu'),默认'zscore' - weights: 多波段方法的权重列表,如果为None则使用等权重 - max_area: 最大连通域面积阈值(像素数),超过此面积的连通域将被过滤掉, - 用于去除岸边、浅水、水华等大面积区域(默认None,表示不过滤) - buffer_size: 岸边缓冲区大小(像素数),用于去除岸边附近的错误耀斑掩膜 - (默认None,表示不进行岸边缓冲区去除;设置为正整数时启用) - - Returns: - 耀斑掩膜文件路径 - """ - print("\n" + "="*80) - print("步骤2: 找到耀斑区域") - print("="*80) - + """步骤2: 找到耀斑区域(Facade)""" step_start_time = time.time() try: - # 使用dat格式的水体掩膜 - if water_mask_path is not None: - # 优先使用传入的参数 - final_water_mask_path = water_mask_path - elif self.water_mask_path is not None: - # 其次使用步骤1生成的水体掩膜 - final_water_mask_path = self.water_mask_path - else: - # 如果没有水体掩膜,根据skip_dependency_check决定行为 - if skip_dependency_check: - print("警告: 未找到水体掩膜,将对全图进行耀斑检测") - final_water_mask_path = None - else: - raise ValueError("请先执行步骤1: 生成水域mask,或提供water_mask_path参数,或设置skip_dependency_check=True") - - output_path = str(self.glint_dir / "severe_glint_area.dat") - - # 检查文件是否已存在 - if Path(output_path).exists(): - print(f"检测到已存在的耀斑掩膜文件,直接使用: {output_path}") - self.glint_mask_path = output_path - step_end_time = time.time() - self._record_step_time("步骤2: 找到耀斑区域", step_start_time, step_end_time, status="skipped") - print(f"耀斑掩膜已设置: {self.glint_mask_path}") - return self.glint_mask_path - - # 构建参数字典 - kwargs = { - 'method': method, - 'z_threshold': z_threshold, - 'percentile': percentile, - 'iqr_multiplier': iqr_multiplier, - 'window_size': window_size, - } - - # 如果是多波段方法,添加相关参数 - if method == 'multi_band': - if multi_band_waves is not None: - kwargs['multi_band_waves'] = multi_band_waves - if sub_method is not None: - kwargs['sub_method'] = sub_method - if weights is not None: - kwargs['weights'] = weights - - # 添加连通域面积过滤和岸边缓冲区参数 - if max_area is not None: - kwargs['max_area'] = max_area - if buffer_size is not None: - kwargs['buffer_size'] = buffer_size - - # 传递dat格式的水体掩膜文件路径 - self.glint_mask_path = find_severe_glint_area( - img_path, final_water_mask_path, glint_wave, output_path, **kwargs + result = GlintDetectionStep.run( + img_path=img_path, + glint_wave=glint_wave, + method=method, + z_threshold=z_threshold, + percentile=percentile, + iqr_multiplier=iqr_multiplier, + window_size=window_size, + multi_band_waves=multi_band_waves, + sub_method=sub_method, + weights=weights, + max_area=max_area, + buffer_size=buffer_size, + water_mask_path=water_mask_path, + glint_dir=str(self.glint_dir), + callback=self._notify, ) - + self.glint_mask_path = result step_end_time = time.time() self._record_step_time("步骤2: 找到耀斑区域", step_start_time, step_end_time) - print(f"耀斑掩膜已生成: {self.glint_mask_path}") - print(f"使用检测方法: {method}") - return self.glint_mask_path + return result except Exception as e: step_end_time = time.time() - self._record_step_time("步骤2: 找到耀斑区域", step_start_time, step_end_time, + self._record_step_time("步骤2: 找到耀斑区域", step_start_time, step_end_time, status="failed", error=str(e)) raise - + def _get_image_geo_info(self, img_path: str) -> tuple: """获取影像地理信息,转发至工具模块""" return _get_image_geo_info(img_path) @@ -708,28 +507,21 @@ class WaterQualityInversionPipeline: left_shoulder_wave: Optional[float] = None, valley_wave: Optional[float] = None, right_shoulder_wave: Optional[float] = None, - # 水域掩膜参数 water_mask: Optional[Union[str, np.ndarray]] = None, - # 0值像素插值参数 interpolate_zeros: bool = False, interpolation_method: str = 'nearest', - # 是否执行去除耀斑 enabled: bool = True, - # Kutser方法参数 kutser_shp_path: Optional[str] = None, oxy_band: int = 38, lower_oxy: int = 36, upper_oxy: int = 49, nir_band: int = 47, - # Goodman方法参数 nir_lower: int = 25, nir_upper: int = 37, goodman_A: float = 0.000019, goodman_B: float = 0.1, - # Hedley方法参数 hedley_shp_path: Optional[str] = None, hedley_nir_band: int = 47, - # SUGAR方法参数 sugar_bounds: Optional[List[tuple]] = None, sugar_sigma: float = 1.0, sugar_estimate_background: bool = True, @@ -737,292 +529,59 @@ class WaterQualityInversionPipeline: sugar_iter: Optional[int] = 3, sugar_termination_thresh: float = 20.0, skip_dependency_check: bool = False) -> str: - """ - 步骤3: 去除耀斑 - - Args: - img_path: 输入影像文件路径 - method: 去耀斑方法,支持: - - "subtract_nir": 减去NIR方法 - - "regression_slope": 回归斜率方法 - - "oxygen_absorption": 氧吸收谷方法 - - "kutser": Kutser方法(基于氧吸收特征) - - "goodman": Goodman方法 - - "hedley": Hedley方法(基于NIR相关性) - - "sugar": SUGAR方法(迭代去耀斑) - start_wave: 起始波长(subtract_nir和regression_slope方法需要) - end_wave: 结束波长(subtract_nir和regression_slope方法需要) - json_path: ROI JSON文件路径(regression_slope方法需要) - left_shoulder_wave: 左肩波长(oxygen_absorption方法需要) - valley_wave: 谷值波长(oxygen_absorption方法需要) - right_shoulder_wave: 右肩波长(oxygen_absorption方法需要) - water_mask: 水域掩膜,可以是: - - None: 自动使用步骤1生成的水域掩膜(如果存在) - - numpy数组: 直接使用数组作为掩膜 - - 文件路径: 栅格文件路径(.dat/.tif)或shapefile路径(.shp) - 如果为None且步骤1未生成掩膜,则处理全图 - interpolate_zeros: 是否对0值像素进行插值(默认False) - interpolation_method: 插值方法,支持: - - 'nearest': 邻近插值(最快) - - 'bilinear': 双线性插值 - - 'spline': 样条插值(RBF) - - 'kriging': 克里金插值(最慢但最准确) - # Kutser方法参数 - kutser_shp_path: 深水区域shp文件路径(可选,已废弃,请使用water_mask) - oxy_band: 氧吸收波段索引(默认38,对应760.6nm) - lower_oxy: 氧吸收下波段索引(默认36,对应742.39nm) - upper_oxy: 氧吸收上波段索引(默认49,对应860.48nm) - nir_band: NIR波段索引(默认47,对应842.36nm) - # Goodman方法参数 - nir_lower: NIR下波段索引(默认25,对应641.93nm) - nir_upper: NIR上波段索引(默认37,对应751.49nm) - goodman_A: Goodman参数A(默认0.000019) - goodman_B: Goodman参数B(默认0.1) - # Hedley方法参数 - hedley_shp_path: 深水区域shp文件路径(可选,已废弃,请使用water_mask) - hedley_nir_band: NIR波段索引(默认47,对应842.36nm) - # SUGAR方法参数 - sugar_bounds: 优化边界列表,如[(1,2)](默认None,使用[(1,2)]) - sugar_sigma: LoG平滑sigma(默认1.0) - sugar_estimate_background: 是否估计背景光谱(默认True) - sugar_glint_mask_method: 耀斑掩膜方法,"cdf"或"otsu"(默认"cdf") - sugar_iter: 迭代次数,None表示自动终止(默认3) - sugar_termination_thresh: 终止阈值(默认20.0) - - Returns: - 去除耀斑后的影像文件路径 - """ - print("\n" + "="*80) - print("步骤3: 去除耀斑") - print("="*80) - + """步骤3: 去除耀斑(Facade)""" step_start_time = time.time() try: - # 兼容中文和各种格式 - raw_method = str(method).lower() - if 'kutser' in raw_method: - method = 'kutser' - elif 'goodman' in raw_method: - method = 'goodman' - elif 'hedley' in raw_method: - method = 'hedley' - elif 'sugar' in raw_method: - method = 'sugar' - # 其余方法(subtract_nir, regression_slope, oxygen_absorption)保持原值 - - # 如果未启用,直接跳过处理并把原始影像路径作为后续流程输入 - if not enabled: - print("已设置跳过去除耀斑(enabled=False),将直接使用原始影像。") - self.deglint_img_path = img_path - step_end_time = time.time() - self._record_step_time("步骤3: 去除耀斑", step_start_time, step_end_time, status="skipped") - return self.deglint_img_path - - # 确定使用的水域掩膜 - final_water_mask = water_mask - - # 【新增智能转换逻辑】:如果 GUI 传入的是 .shp,且第一步已经生成了 .dat 掩膜,强制替换为栅格 .dat - if final_water_mask is not None and str(final_water_mask).lower().endswith('.shp'): - if self.water_mask_path is not None and Path(self.water_mask_path).exists(): - print(f"检测到输入掩膜为 .shp 矢量文件,自动替换为步骤1生成的栅格掩膜: {self.water_mask_path}") - final_water_mask = self.water_mask_path - - # 优先级处理 - if final_water_mask is None: - if self.water_mask_path is not None: - final_water_mask = self.water_mask_path - print(f"使用步骤1生成的水域掩膜: {final_water_mask}") - else: - print("未提供水域掩膜,将处理全图") - final_water_mask = None - - # 步骤3.1: 对0值像素进行插值(如果启用) - if interpolate_zeros: - print("\n" + "-"*80) - print("步骤3.1: 对0值像素进行插值") - print("-"*80) - interp_start_time = time.time() - try: - # 准备水域掩膜用于插值 - interp_water_mask = final_water_mask - if interp_water_mask is None and self.water_mask_path: - interp_water_mask = self.water_mask_path - - # 执行插值 - interpolated_img = self._interpolate_zero_pixels( - img_path=img_path, - interpolation_method=interpolation_method, - water_mask=interp_water_mask - ) - # 使用插值后的影像作为后续处理的输入 - img_path = interpolated_img - interp_end_time = time.time() - self._record_step_time("步骤3.1: 0值像素插值", interp_start_time, interp_end_time) - print(f"插值完成,使用插值后的影像: {img_path}") - except Exception as e: - print(f"警告: 0值像素插值失败: {e},将使用原始影像继续处理") - interp_end_time = time.time() - self._record_step_time("步骤3.1: 0值像素插值", interp_start_time, interp_end_time, - status="failed", error=str(e)) - - if method == "kutser": - print(f"使用方法: Kutser (氧吸收波段={oxy_band}, NIR波段={nir_band})") - - output_path = str(self.deglint_dir / "deglint_kutser.bsq") - bsq_path = output_path if output_path.endswith('.bsq') else output_path.replace('.dat', '.bsq').replace( - '.tif', '.bsq') - if Path(bsq_path).exists() or Path(output_path).exists(): - existing_path = bsq_path if Path(bsq_path).exists() else output_path - print(f"检测到已存在的去耀斑影像文件,直接使用: {existing_path}") - self.deglint_img_path = existing_path - self._record_step_time("步骤3: 去除耀斑", step_start_time, time.time(), status="skipped") - print(f"去耀斑影像已设置: {self.deglint_img_path}") - return self.deglint_img_path - - geotransform, projection, width, height, n_bands = self._get_image_geo_info(img_path) - print(f"影像尺寸: {width} x {height} x {n_bands}") - - mask_for_algorithm = self._prepare_water_mask_for_algorithm( - final_water_mask, (height, width), geotransform, projection, img_path - ) - - kutser = Kutser(img_path, shp_path=None, oxy_band=oxy_band, - lower_oxy=lower_oxy, upper_oxy=upper_oxy, - NIR_band=nir_band, water_mask=mask_for_algorithm, - output_path=output_path) - kutser.get_corrected_bands() - - if Path(bsq_path).exists() or Path(output_path).exists(): - self.deglint_img_path = bsq_path if Path(bsq_path).exists() else output_path - self._copy_hdr_info(img_path, self.deglint_img_path) - else: - raise RuntimeError(f"Kutser算法未生成输出文件: {bsq_path}") - - elif method == "goodman": - print(f"使用方法: Goodman (NIR波段范围: {nir_lower}-{nir_upper})") - output_path = str(self.deglint_dir / "deglint_goodman.bsq") - bsq_path = output_path if output_path.endswith('.bsq') else output_path.replace('.dat', '.bsq').replace( - '.tif', '.bsq') - if Path(bsq_path).exists() or Path(output_path).exists(): - existing_path = bsq_path if Path(bsq_path).exists() else output_path - print(f"检测到已存在的去耀斑影像文件,直接使用: {existing_path}") - self.deglint_img_path = existing_path - self._record_step_time("步骤3: 去除耀斑", step_start_time, time.time(), status="skipped") - return self.deglint_img_path - - geotransform, projection, width, height, n_bands = self._get_image_geo_info(img_path) - mask_for_algorithm = self._prepare_water_mask_for_algorithm( - final_water_mask, (height, width), geotransform, projection, img_path - ) - - image_array, geotransform, projection = self._load_image_as_array(img_path) - goodman = Goodman(img_path, NIR_lower=nir_lower, NIR_upper=nir_upper, - A=goodman_A, B=goodman_B, water_mask=mask_for_algorithm, - output_path=output_path) - corrected_bands = goodman.get_corrected_bands() - - if not Path(bsq_path).exists() and not Path(output_path).exists(): - self._save_bands_as_image(corrected_bands, output_path, geotransform, projection) - self.deglint_img_path = output_path - self._copy_hdr_info(img_path, output_path) - else: - self.deglint_img_path = bsq_path if Path(bsq_path).exists() else output_path - self._copy_hdr_info(img_path, self.deglint_img_path) - del corrected_bands - - elif method == "hedley": - print(f"使用方法: Hedley (NIR波段={hedley_nir_band})") - output_path = str(self.deglint_dir / "deglint_hedley.bsq") - bsq_path = output_path if output_path.endswith('.bsq') else output_path.replace('.dat', '.bsq').replace( - '.tif', '.bsq') - if Path(bsq_path).exists() or Path(output_path).exists(): - existing_path = bsq_path if Path(bsq_path).exists() else output_path - print(f"检测到已存在的去耀斑影像文件,直接使用: {existing_path}") - self.deglint_img_path = existing_path - self._record_step_time("步骤3: 去除耀斑", step_start_time, time.time(), status="skipped") - return self.deglint_img_path - - geotransform, projection, width, height, n_bands = self._get_image_geo_info(img_path) - mask_for_algorithm = self._prepare_water_mask_for_algorithm( - final_water_mask, (height, width), geotransform, projection, img_path - ) - - hedley = Hedley(img_path, shp_path=None, NIR_band=hedley_nir_band, - water_mask=mask_for_algorithm, output_path=output_path) - hedley.get_corrected_bands() - - if Path(bsq_path).exists() or Path(output_path).exists(): - self.deglint_img_path = bsq_path if Path(bsq_path).exists() else output_path - self._copy_hdr_info(img_path, self.deglint_img_path) - else: - raise RuntimeError(f"Hedley算法未生成输出文件: {bsq_path}") - - elif method == "sugar": - sugar_glint_mask_method_raw = str(sugar_glint_mask_method).lower() - if 'cdf' in sugar_glint_mask_method_raw or '累积' in sugar_glint_mask_method: - sugar_glint_mask_method_fixed = 'cdf' - elif 'otsu' in sugar_glint_mask_method_raw or '大津' in sugar_glint_mask_method: - sugar_glint_mask_method_fixed = 'otsu' - else: - sugar_glint_mask_method_fixed = 'cdf' - - print(f"使用方法: SUGAR (迭代次数={sugar_iter}, 掩膜方法={sugar_glint_mask_method_fixed})") - output_path = str(self.deglint_dir / "deglint_sugar.bsq") - - if Path(output_path).exists(): - print(f"检测到已存在的去耀斑影像文件,直接使用: {output_path}") - self.deglint_img_path = output_path - self._record_step_time("步骤3: 去除耀斑", step_start_time, time.time(), status="skipped") - return self.deglint_img_path - - geotransform, projection, width, height, n_bands = self._get_image_geo_info(img_path) - - # 修复BUG:必须传入 (height, width) - mask_for_algorithm = self._prepare_water_mask_for_algorithm( - final_water_mask, (height, width), geotransform, projection, img_path - ) - - if sugar_bounds is None: - sugar_bounds = [(1, 2)] - - if sugar_iter is None: - correction_iterative( - img_path, iter=None, bounds=sugar_bounds, - estimate_background=sugar_estimate_background, - glint_mask_method=sugar_glint_mask_method_fixed, - termination_thresh=sugar_termination_thresh, - water_mask=mask_for_algorithm, - output_path=output_path - ) - else: - correction_iterative( - img_path, iter=sugar_iter, bounds=sugar_bounds, - estimate_background=sugar_estimate_background, - glint_mask_method=sugar_glint_mask_method_fixed, - water_mask=mask_for_algorithm, - output_path=output_path - ) - - bsq_path = output_path if output_path.endswith('.bsq') else output_path.replace('.dat', '.bsq').replace( - '.tif', '.bsq') - if Path(bsq_path).exists() or Path(output_path).exists(): - self.deglint_img_path = bsq_path if Path(bsq_path).exists() else output_path - self._copy_hdr_info(img_path, self.deglint_img_path) - else: - raise RuntimeError(f"SUGAR算法未生成输出文件: {bsq_path}") - else: - raise ValueError(f"不支持的方法: {method}。支持的方法: kutser, goodman, hedley, sugar") - + result = GlintRemovalStep.run( + img_path=img_path, + method=method, + start_wave=start_wave, + end_wave=end_wave, + json_path=json_path, + left_shoulder_wave=left_shoulder_wave, + valley_wave=valley_wave, + right_shoulder_wave=right_shoulder_wave, + water_mask=water_mask, + interpolate_zeros=interpolate_zeros, + interpolation_method=interpolation_method, + enabled=enabled, + kutser_shp_path=kutser_shp_path, + oxy_band=oxy_band, + lower_oxy=lower_oxy, + upper_oxy=upper_oxy, + nir_band=nir_band, + nir_lower=nir_lower, + nir_upper=nir_upper, + goodman_A=goodman_A, + goodman_B=goodman_B, + hedley_shp_path=hedley_shp_path, + hedley_nir_band=hedley_nir_band, + sugar_bounds=sugar_bounds, + sugar_sigma=sugar_sigma, + sugar_estimate_background=sugar_estimate_background, + sugar_glint_mask_method=sugar_glint_mask_method, + sugar_iter=sugar_iter, + sugar_termination_thresh=sugar_termination_thresh, + _get_image_geo_info=self._get_image_geo_info, + _load_image_as_array=self._load_image_as_array, + _save_bands_as_image=self._save_bands_as_image, + _copy_hdr_info=self._copy_hdr_info, + _prepare_water_mask_for_algorithm=self._prepare_water_mask_for_algorithm, + _interpolate_zero_pixels_batch=_interpolate_zero_pixels_batch, + deglint_dir=str(self.deglint_dir), + water_mask_dir=str(self.water_mask_dir), + callback=self._notify, + ) + self.deglint_img_path = result step_end_time = time.time() self._record_step_time("步骤3: 去除耀斑", step_start_time, step_end_time) - print(f"去耀斑影像已生成: {self.deglint_img_path}") - return self.deglint_img_path + return result except Exception as e: step_end_time = time.time() self._record_step_time("步骤3: 去除耀斑", step_start_time, step_end_time, - status="failed", error=str(e)) + status="failed", error=str(e)) raise - + def step4_process_csv(self, csv_path: str, skip_dependency_check: bool = False) -> str: """ 步骤4: 对csv文件进行处理,筛选剔除异常值