From 8c7c9959859d06756aef866d38909b92f063baed Mon Sep 17 00:00:00 2001 From: DXC Date: Sun, 10 May 2026 15:11:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=AD=A5=E9=AA=A43=E5=8E=BB?= =?UTF-8?q?=E8=80=80=E6=96=91=E8=B7=AF=E5=BE=84=E6=96=AD=E5=B1=82=20+=20UI?= =?UTF-8?q?=E9=BB=98=E8=AE=A4=E8=B7=AF=E5=BE=84=E6=A0=87=E5=87=86=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/core/steps/glint_removal_step.py | 154 ++++++++++++------ .../water_quality_inversion_pipeline_GUI.py | 2 + src/gui/panels/step3_panel.py | 4 +- 3 files changed, 111 insertions(+), 49 deletions(-) diff --git a/src/core/steps/glint_removal_step.py b/src/core/steps/glint_removal_step.py index a61fc4d..7e9cb49 100644 --- a/src/core/steps/glint_removal_step.py +++ b/src/core/steps/glint_removal_step.py @@ -18,6 +18,35 @@ from typing import Optional, List, Union, Callable import numpy as np +def _safe_rename(src_bsq: str, src_hdr: str, dest_bsq: str, dest_hdr: str) -> str: + """将底层硬编码生成的 .bsq + .hdr 文件对重命名到用户指定的 output_path + + 若 dest 路径已存在(可能是之前残留文件),先删除再移动,确保返回路径始终指向用户指定的文件。 + + Returns: + dest_bsq 路径 + """ + src_bsq_p = Path(src_bsq) + src_hdr_p = Path(src_hdr) + dest_bsq_p = Path(dest_bsq) + dest_hdr_p = Path(dest_hdr) + + if str(src_bsq_p.resolve()) == str(dest_bsq_p.resolve()): + return dest_bsq + + if dest_bsq_p.exists(): + os.remove(dest_bsq_p) + if dest_hdr_p.exists(): + os.remove(dest_hdr_p) + + if src_bsq_p.exists(): + os.replace(src_bsq_p, dest_bsq_p) + if src_hdr_p.exists(): + os.replace(src_hdr_p, dest_hdr_p) + + return dest_bsq + + class GlintRemovalStep: """去除耀斑步骤""" @@ -67,6 +96,7 @@ class GlintRemovalStep: deglint_dir: Union[str, Path] = "./3_deglint", water_mask_dir: Union[str, Path] = "./1_water_mask", callback: Optional[Callable] = None, + output_path: Optional[str] = None, ) -> str: """ 执行去除耀斑处理 @@ -186,12 +216,20 @@ class GlintRemovalStep: # ==================== Kutser ==================== if method == "kutser": print(f"使用方法: Kutser (氧吸收波段={oxy_band}, NIR波段={nir_band})") - output_path = str(deglint_dir / "deglint_kutser.bsq") + hardcoded_bsq = str(deglint_dir / "deglint_kutser.bsq") + hardcoded_hdr = hardcoded_bsq.replace(".bsq", ".hdr") + # 将用户指定的 output_path 标准化为 .bsq 路径 + if output_path: + final_bsq = output_path.replace('.dat', '.bsq').replace('.tif', '.bsq') + final_hdr = final_bsq.replace(".bsq", ".hdr") + else: + final_bsq = hardcoded_bsq + final_hdr = hardcoded_hdr - if Path(output_path).exists(): - print(f"检测到已存在的去耀斑影像文件,直接使用: {output_path}") - notify("skipped", f"去耀斑影像已设置: {output_path}") - return output_path + if Path(hardcoded_bsq).exists(): + print(f"检测到已存在的去耀斑影像文件,直接使用: {hardcoded_bsq}") + notify("skipped", f"去耀斑影像已设置: {hardcoded_bsq}") + return _safe_rename(hardcoded_bsq, hardcoded_hdr, final_bsq, final_hdr) kutser = Kutser( img_path, @@ -201,25 +239,33 @@ class GlintRemovalStep: upper_oxy=upper_oxy, NIR_band=nir_band, water_mask=mask_for_algorithm, - output_path=output_path, + output_path=hardcoded_bsq, ) kutser.get_corrected_bands() - if Path(output_path).exists(): - _copy_hdr_info(img_path, output_path) - notify("completed", f"去耀斑影像已生成: {output_path}") - return output_path - raise RuntimeError(f"Kutser算法未生成输出文件: {output_path}") + if Path(hardcoded_bsq).exists(): + _copy_hdr_info(img_path, hardcoded_bsq) + final = _safe_rename(hardcoded_bsq, hardcoded_hdr, final_bsq, final_hdr) + notify("completed", f"去耀斑影像已生成: {final}") + return final + raise RuntimeError(f"Kutser算法未生成输出文件: {hardcoded_bsq}") # ==================== Goodman ==================== elif method == "goodman": print(f"使用方法: Goodman (NIR波段范围: {nir_lower}-{nir_upper})") - output_path = str(deglint_dir / "deglint_goodman.bsq") + hardcoded_bsq = str(deglint_dir / "deglint_goodman.bsq") + hardcoded_hdr = hardcoded_bsq.replace(".bsq", ".hdr") + if output_path: + final_bsq = output_path.replace('.dat', '.bsq').replace('.tif', '.bsq') + final_hdr = final_bsq.replace(".bsq", ".hdr") + else: + final_bsq = hardcoded_bsq + final_hdr = hardcoded_hdr - if Path(output_path).exists(): - print(f"检测到已存在的去耀斑影像文件,直接使用: {output_path}") - notify("skipped", f"去耀斑影像已设置: {output_path}") - return output_path + if Path(hardcoded_bsq).exists(): + print(f"检测到已存在的去耀斑影像文件,直接使用: {hardcoded_bsq}") + notify("skipped", f"去耀斑影像已设置: {hardcoded_bsq}") + return _safe_rename(hardcoded_bsq, hardcoded_hdr, final_bsq, final_hdr) goodman = Goodman( img_path, @@ -228,48 +274,54 @@ class GlintRemovalStep: A=goodman_A, B=goodman_B, water_mask=mask_for_algorithm, - output_path=output_path, + output_path=hardcoded_bsq, ) corrected_bands = goodman.get_corrected_bands() - if not Path(output_path).exists(): - _save_bands_as_image(corrected_bands, output_path, geotransform, projection) - _copy_hdr_info(img_path, output_path) - else: - _copy_hdr_info(img_path, output_path) + if not Path(hardcoded_bsq).exists(): + _save_bands_as_image(corrected_bands, hardcoded_bsq, geotransform, projection) + _copy_hdr_info(img_path, hardcoded_bsq) del corrected_bands - notify("completed", f"去耀斑影像已生成: {output_path}") - return output_path + final = _safe_rename(hardcoded_bsq, hardcoded_hdr, final_bsq, final_hdr) + notify("completed", f"去耀斑影像已生成: {final}") + return final # ==================== Hedley ==================== elif method == "hedley": print(f"使用方法: Hedley (NIR波段={hedley_nir_band})") - output_path = str(deglint_dir / "deglint_hedley.bsq") + hardcoded_bsq = str(deglint_dir / "deglint_hedley.bsq") + hardcoded_hdr = hardcoded_bsq.replace(".bsq", ".hdr") + if output_path: + final_bsq = output_path.replace('.dat', '.bsq').replace('.tif', '.bsq') + final_hdr = final_bsq.replace(".bsq", ".hdr") + else: + final_bsq = hardcoded_bsq + final_hdr = hardcoded_hdr - if Path(output_path).exists(): - print(f"检测到已存在的去耀斑影像文件,直接使用: {output_path}") - notify("skipped", f"去耀斑影像已设置: {output_path}") - return output_path + if Path(hardcoded_bsq).exists(): + print(f"检测到已存在的去耀斑影像文件,直接使用: {hardcoded_bsq}") + notify("skipped", f"去耀斑影像已设置: {hardcoded_bsq}") + return _safe_rename(hardcoded_bsq, hardcoded_hdr, final_bsq, final_hdr) hedley = Hedley( img_path, shp_path=None, NIR_band=hedley_nir_band, water_mask=mask_for_algorithm, - output_path=output_path, + output_path=hardcoded_bsq, ) hedley.get_corrected_bands() - if Path(output_path).exists(): - _copy_hdr_info(img_path, output_path) - notify("completed", f"去耀斑影像已生成: {output_path}") - return output_path - raise RuntimeError(f"Hedley算法未生成输出文件: {output_path}") + if Path(hardcoded_bsq).exists(): + _copy_hdr_info(img_path, hardcoded_bsq) + final = _safe_rename(hardcoded_bsq, hardcoded_hdr, final_bsq, final_hdr) + notify("completed", f"去耀斑影像已生成: {final}") + return final + raise RuntimeError(f"Hedley算法未生成输出文件: {hardcoded_bsq}") # ==================== SUGAR ==================== elif method == "sugar": - # 方法名标准化 glint_method_raw = str(sugar_glint_mask_method).lower() if "cdf" in glint_method_raw or "累积" in glint_method_raw: sugar_glint_mask_method_fixed = "cdf" @@ -281,12 +333,19 @@ class GlintRemovalStep: print( f"使用方法: SUGAR (迭代次数={sugar_iter}, 掩膜方法={sugar_glint_mask_method_fixed})" ) - output_path = str(deglint_dir / "deglint_sugar.bsq") + hardcoded_bsq = str(deglint_dir / "deglint_sugar.bsq") + hardcoded_hdr = hardcoded_bsq.replace(".bsq", ".hdr") + if output_path: + final_bsq = output_path.replace('.dat', '.bsq').replace('.tif', '.bsq') + final_hdr = final_bsq.replace(".bsq", ".hdr") + else: + final_bsq = hardcoded_bsq + final_hdr = hardcoded_hdr - if Path(output_path).exists(): - print(f"检测到已存在的去耀斑影像文件,直接使用: {output_path}") - notify("skipped", f"去耀斑影像已设置: {output_path}") - return output_path + if Path(hardcoded_bsq).exists(): + print(f"检测到已存在的去耀斑影像文件,直接使用: {hardcoded_bsq}") + notify("skipped", f"去耀斑影像已设置: {hardcoded_bsq}") + return _safe_rename(hardcoded_bsq, hardcoded_hdr, final_bsq, final_hdr) if sugar_bounds is None: sugar_bounds = [(1, 2)] @@ -299,14 +358,15 @@ class GlintRemovalStep: glint_mask_method=sugar_glint_mask_method_fixed, termination_thresh=sugar_termination_thresh, water_mask=mask_for_algorithm, - output_path=output_path, + output_path=hardcoded_bsq, ) - if Path(output_path).exists(): - _copy_hdr_info(img_path, output_path) - notify("completed", f"去耀斑影像已生成: {output_path}") - return output_path - raise RuntimeError(f"SUGAR算法未生成输出文件: {output_path}") + if Path(hardcoded_bsq).exists(): + _copy_hdr_info(img_path, hardcoded_bsq) + final = _safe_rename(hardcoded_bsq, hardcoded_hdr, final_bsq, final_hdr) + notify("completed", f"去耀斑影像已生成: {final}") + return final + raise RuntimeError(f"SUGAR算法未生成输出文件: {hardcoded_bsq}") else: raise ValueError( diff --git a/src/core/water_quality_inversion_pipeline_GUI.py b/src/core/water_quality_inversion_pipeline_GUI.py index d0c2c5f..1419c57 100644 --- a/src/core/water_quality_inversion_pipeline_GUI.py +++ b/src/core/water_quality_inversion_pipeline_GUI.py @@ -532,6 +532,7 @@ class WaterQualityInversionPipeline: sugar_glint_mask_method: str = "cdf", sugar_iter: Optional[int] = 3, sugar_termination_thresh: float = 20.0, + output_path: Optional[str] = None, skip_dependency_check: bool = False) -> str: """步骤3: 去除耀斑(Facade)""" step_start_time = time.time() @@ -575,6 +576,7 @@ class WaterQualityInversionPipeline: deglint_dir=str(self.deglint_dir), water_mask_dir=str(self.water_mask_dir), callback=self._notify, + output_path=output_path, ) self.deglint_img_path = result step_end_time = time.time() diff --git a/src/gui/panels/step3_panel.py b/src/gui/panels/step3_panel.py index 5311df5..c1eb23b 100644 --- a/src/gui/panels/step3_panel.py +++ b/src/gui/panels/step3_panel.py @@ -209,7 +209,7 @@ class Step3Panel(QWidget): "输出影像:", "Image Files (*.bsq *.dat *.tif);;All Files (*.*)" ) - self.output_file.line_edit.setPlaceholderText("deglint_image.dat") + self.output_file.line_edit.setPlaceholderText("deglint_image.bsq") layout.addWidget(self.output_file) # 启用步骤 @@ -301,7 +301,7 @@ class Step3Panel(QWidget): if self.work_dir: output_dir = os.path.join(self.work_dir, "3_deglint") os.makedirs(output_dir, exist_ok=True) - default_output_path = os.path.join(output_dir, "deglint_image.dat").replace('\\', '/') + default_output_path = os.path.join(output_dir, "deglint_image.bsq").replace('\\', '/') self.output_file.set_path(default_output_path) else: self.output_file.set_path("")