From 2671c0837ad8b59779e24ad92a7dc6ccfe4ad8ad Mon Sep 17 00:00:00 2001 From: DXC Date: Wed, 10 Jun 2026 17:13:37 +0800 Subject: [PATCH] =?UTF-8?q?feat(step8):=20=E6=96=B0=E5=A2=9E=20Step8=20?= =?UTF-8?q?=E6=B0=B4=E8=89=B2=E6=8C=87=E6=95=B0=E5=8F=8D=E6=BC=94=20GUI=20?= =?UTF-8?q?=E9=9D=A2=E6=9D=BF=20step8=5Fwaterindex=5Fpanel?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/gui/panels/step6_panel.py | 423 +++++++++++++++++ src/gui/panels/step8_qaa_panel.py | 315 +++++++++++++ src/gui/panels/step8_waterindex_panel.py | 569 +++++++++++++++++++++++ 3 files changed, 1307 insertions(+) create mode 100644 src/gui/panels/step6_panel.py create mode 100644 src/gui/panels/step8_qaa_panel.py create mode 100644 src/gui/panels/step8_waterindex_panel.py diff --git a/src/gui/panels/step6_panel.py b/src/gui/panels/step6_panel.py new file mode 100644 index 0000000..6afde8d --- /dev/null +++ b/src/gui/panels/step6_panel.py @@ -0,0 +1,423 @@ +import os +import sys +import pandas as pd +import numpy as np +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +from PyQt5.QtWidgets import ( + QWidget, QVBoxLayout, QGroupBox, QGridLayout, + QHBoxLayout, QLabel, QCheckBox, QPushButton, QMessageBox, + QScrollArea, QListWidget, QListWidgetItem, QAbstractItemView, + QRadioButton, QButtonGroup +) +from PyQt5.QtCore import Qt +from PyQt5.QtGui import QColor, QBrush, QFont + +from src.gui.components.custom_widgets import FileSelectWidget +from src.gui.styles import ModernStylesheet + + +def get_resource_path(relative_path: str) -> str: + """适配开发与 PyInstaller 环境的路径获取逻辑。""" + if hasattr(sys, '_MEIPASS'): + internal = os.path.join(sys._MEIPASS, '_internal', relative_path) + if os.path.exists(internal): + return internal + return os.path.join(sys._MEIPASS, relative_path) + + exe_dir = os.path.dirname(sys.executable) + internal = os.path.join(exe_dir, '_internal', relative_path) + if os.path.exists(internal): + return internal + + base_dir = Path(__file__).resolve().parent.parent / "model" + return str(base_dir / os.path.basename(relative_path)) + + +class Step6Panel(QWidget): + COLOR_RATIO = QColor(255, 255, 255) + COLOR_CONCENTRATION = QColor(220, 240, 255) + COLOR_HEADER = QColor(245, 245, 245) + + def __init__(self, parent=None): + super().__init__(parent) + self.index_checkboxes: Dict[str, QListWidgetItem] = {} + self.work_dir: Optional[str] = None + self.builtin_formula_path = get_resource_path("waterindex.csv") + self._formula_type_map: Dict[str, str] = {} + self._formula_color_map: Dict[str, QColor] = {} + self._formula_coef_map: Dict[str, List[float]] = {} + + self.init_ui() + self._auto_load_formulas() + + def init_ui(self): + main_layout = QVBoxLayout() + main_layout.setContentsMargins(20, 20, 20, 20) + main_layout.setSpacing(10) + + # 1. 公式配置源 (只读) + path_group = QGroupBox("公式配置源 (内置)") + path_layout = QVBoxLayout() + self.formula_csv_widget = FileSelectWidget("内置CSV路径:", "CSV Files (*.csv)") + self.formula_csv_widget.set_path(self.builtin_formula_path) + self.formula_csv_widget.set_read_only(True) + self.formula_csv_widget.line_edit.setStyleSheet("background-color: #f0f0f0; color: #666;") + path_layout.addWidget(self.formula_csv_widget) + path_group.setLayout(path_layout) + main_layout.addWidget(path_group) + + # 2. 训练数据输入 + input_group = QGroupBox("输入样本数据") + input_layout = QVBoxLayout() + self.training_data_widget = FileSelectWidget("特征提取CSV:", "CSV Files (*.csv)") + input_layout.addWidget(self.training_data_widget) + input_group.setLayout(input_layout) + main_layout.addWidget(input_group) + + # 3. 公式选择区 (分组 ListWidget) + self.formula_group = QGroupBox("待计算水质指数勾选") + formula_outer_layout = QVBoxLayout() + + btn_layout = QHBoxLayout() + self.select_all_btn = QPushButton("全选") + self.deselect_all_btn = QPushButton("清空") + self.select_ratio_btn = QPushButton("仅选比值型") + self.select_conc_btn = QPushButton("仅选浓度型") + self.select_all_btn.clicked.connect(self.select_all_formulas) + self.deselect_all_btn.clicked.connect(self.deselect_all_formulas) + self.select_ratio_btn.clicked.connect(self._select_ratio_only) + self.select_conc_btn.clicked.connect(self._select_conc_only) + btn_layout.addWidget(self.select_all_btn) + btn_layout.addWidget(self.deselect_all_btn) + btn_layout.addWidget(self.select_ratio_btn) + btn_layout.addWidget(self.select_conc_btn) + btn_layout.addStretch() + + self.refresh_button = QPushButton("重新加载") + self.refresh_button.clicked.connect(lambda: self.refresh_formulas(silent=False)) + btn_layout.addWidget(self.refresh_button) + + formula_outer_layout.addLayout(btn_layout) + + scroll = QScrollArea() + scroll.setWidgetResizable(True) + scroll.setMinimumHeight(280) + self.scroll_content = QWidget() + self.formula_layout = QVBoxLayout(self.scroll_content) + self.formula_layout.setContentsMargins(4, 4, 4, 4) + self.formula_layout.setSpacing(2) + self.formula_layout.setAlignment(Qt.AlignTop) + + self.formula_list = QListWidget() + self.formula_list.setSelectionMode(QAbstractItemView.MultiSelection) + self.formula_list.itemChanged.connect(self._on_item_changed) + self.formula_layout.addWidget(self.formula_list) + + scroll.setWidget(self.scroll_content) + formula_outer_layout.addWidget(scroll) + + self.formula_group.setLayout(formula_outer_layout) + main_layout.addWidget(self.formula_group) + + # 4. 输出选项 + output_group = QGroupBox("输出模式") + output_layout = QVBoxLayout() + + mode_layout = QHBoxLayout() + self.mode_group = QButtonGroup() + self.radio_both = QRadioButton("两者皆出") + self.radio_wide = QRadioButton("仅宽表") + self.radio_single = QRadioButton("仅单文件") + self.mode_group.addButton(self.radio_both, 0) + self.mode_group.addButton(self.radio_wide, 1) + self.mode_group.addButton(self.radio_single, 2) + self.radio_both.setChecked(True) + mode_layout.addWidget(self.radio_both) + mode_layout.addWidget(self.radio_wide) + mode_layout.addWidget(self.radio_single) + mode_layout.addStretch() + output_layout.addLayout(mode_layout) + + self.enable_checkbox = QCheckBox("启用计算流程") + self.enable_checkbox.setChecked(True) + output_layout.addWidget(self.enable_checkbox) + + output_group.setLayout(output_layout) + main_layout.addWidget(output_group) + + # 5. 运行按钮 + self.run_button = QPushButton("立即执行计算") + self.run_button.setStyleSheet(ModernStylesheet.get_button_stylesheet('success')) + self.run_button.setMinimumHeight(40) + self.run_button.clicked.connect(self.run_step) + main_layout.addWidget(self.run_button) + + self.setLayout(main_layout) + + def _on_item_changed(self, item: QListWidgetItem): + if item.checkState() == Qt.Checked: + bg_color = self.COLOR_RATIO + for name, ref_item in self.index_checkboxes.items(): + if ref_item is item: + bg_color = self._formula_color_map.get(name, self.COLOR_RATIO) + break + item.setBackground(QBrush(bg_color)) + else: + item.setBackground(QBrush(self.COLOR_RATIO)) + + def _auto_load_formulas(self): + if os.path.exists(self.builtin_formula_path): + self.refresh_formulas(silent=True) + else: + print(f"DEBUG: 自动加载失败,路径不存在: {self.builtin_formula_path}") + + def refresh_formulas(self, silent=False): + path = self.builtin_formula_path + if not os.path.exists(path): + if not silent: + QMessageBox.warning(self, "错误", f"找不到内置公式文件:\n{path}") + return + + try: + df = None + for enc in ('utf-8', 'gbk', 'utf-8-sig'): + try: + df = pd.read_csv(path, encoding=enc) + if 'Formula_Name' in df.columns: + break + except Exception: + continue + + if df is None or 'Formula_Name' not in df.columns: + if not silent: + QMessageBox.critical(self, "错误", "CSV缺少 'Formula_Name' 列") + return + + self._formula_type_map.clear() + self._formula_coef_map.clear() + for _, row in df.iterrows(): + name = str(row['Formula_Name']).strip() + if not name: + continue + ftype = str(row.get('Formula_Type', 'ratio')).strip().lower() + self._formula_type_map[name] = ftype + + # Parse Coefficient for concentration formulas + coef_str = str(row.get('Coefficient', '')).strip() + if coef_str: + try: + coeffs = [float(c.strip()) for c in coef_str.split(',') if c.strip()] + self._formula_coef_map[name] = coeffs + except Exception: + self._formula_coef_map[name] = [] + else: + self._formula_coef_map[name] = [] + + self.formula_list.clear() + self.index_checkboxes.clear() + + self._formula_color_map.clear() + for name, ftype in self._formula_type_map.items(): + item = QListWidgetItem(name, self.formula_list) + item.setCheckState(Qt.Checked) + if ftype == 'concentration': + bg_color = QColor(220, 240, 255) + else: + bg_color = self.COLOR_RATIO + self._formula_color_map[name] = bg_color + item.setBackground(QBrush(bg_color)) + self.index_checkboxes[name] = item + + self.formula_list.adjustSize() + print(f"✅ 加载 {len(self.index_checkboxes)} 个公式") + + except Exception as e: + if not silent: + QMessageBox.critical(self, "加载失败", f"原因: {str(e)}") + + def _select_ratio_only(self): + for name, item in self.index_checkboxes.items(): + ftype = self._formula_type_map.get(name, 'ratio') + item.setCheckState(Qt.Checked if ftype == 'ratio' else Qt.Unchecked) + + def _select_conc_only(self): + for name, item in self.index_checkboxes.items(): + ftype = self._formula_type_map.get(name, 'ratio') + item.setCheckState(Qt.Checked if ftype == 'concentration' else Qt.Unchecked) + + def select_all_formulas(self): + for item in self.index_checkboxes.values(): + item.setCheckState(Qt.Checked) + + def deselect_all_formulas(self): + for item in self.index_checkboxes.values(): + item.setCheckState(Qt.Unchecked) + + def get_config(self) -> Dict: + selected = [ + name for name, item in self.index_checkboxes.items() + if item.checkState() == Qt.Checked + ] + # Build coefficient dict for selected formulas + formula_coefficients = { + name: self._formula_coef_map.get(name, []) + for name in selected + } + return { + 'training_csv_path': self.training_data_widget.get_path(), + 'formula_csv_file': self.builtin_formula_path, + 'formula_names': selected, + 'formula_coefficients': formula_coefficients, + 'enabled': self.enable_checkbox.isChecked(), + 'output_mode': self.mode_group.checkedId(), + } + + def set_config(self, config: Dict): + if 'training_csv_path' in config: + self.training_data_widget.set_path(config['training_csv_path']) + if 'formula_names' in config: + sel = set(config['formula_names']) + for name, item in self.index_checkboxes.items(): + item.setCheckState(Qt.Checked if name in sel else Qt.Unchecked) + self.enable_checkbox.setChecked(config.get('enabled', True)) + if 'output_mode' in config: + btn = self.mode_group.button(config['output_mode']) + if btn: + btn.setChecked(True) + + def update_from_config(self, work_dir=None, pipeline=None): + if work_dir: + self.work_dir = work_dir + main = self.window() + if hasattr(main, 'step5_panel'): + p5 = main.step5_panel.output_file.get_path() + if p5: + if not os.path.isabs(p5): + p5 = os.path.join(self.work_dir or '', p5) + p5 = p5.replace('\\', '/') + self.training_data_widget.set_path(p5) + + def _get_work_dir(self) -> Optional[str]: + if self.work_dir: + return self.work_dir + main = self.window() + if hasattr(main, 'work_dir') and main.work_dir: + return main.work_dir + return None + + def _get_coord_cols(self, df: pd.DataFrame) -> Tuple[str, str]: + coord_candidates = ['lon', 'lng', 'longitude', '经度', 'x', 'lon_utm', 'utm_x', 'pixel_x'] + lat_candidates = ['lat', 'latitude', '纬度', 'y', 'lat_utm', 'utm_y', 'pixel_y'] + + x_col, y_col = None, None + for col in df.columns: + cl = col.lower() + if x_col is None and any(c in cl for c in coord_candidates): + x_col = col + if y_col is None and any(c in cl for c in lat_candidates): + y_col = col + + if x_col is None and len(df.columns) >= 2: + x_col = df.columns[0] + if y_col is None and len(df.columns) >= 2: + y_col = df.columns[1] + + return x_col or 'x_coord', y_col or 'y_coord' + + def run_step(self): + config = self.get_config() + + if not config['enabled']: + QMessageBox.information(self, "提示", "已禁用计算流程(启用计算流程未勾选)") + return + + training_path = config['training_csv_path'] + if not training_path or not os.path.exists(training_path): + QMessageBox.warning(self, "提示", "请先选择输入特征提取CSV文件") + return + + formula_names = config['formula_names'] + if not formula_names: + QMessageBox.warning(self, "提示", "请至少勾选一个公式") + return + + output_mode = config['output_mode'] + + try: + from src.core.steps.data_preparation_step import DataPreparationStep + + spec_df = pd.read_csv(training_path) + x_col, y_col = self._get_coord_cols(spec_df) + + # 构建 formula_csv_path(使用内置 waterindex.csv) + formula_csv_path = self.builtin_formula_path + if not formula_csv_path or not os.path.exists(formula_csv_path): + # 尝试从 src/gui/model/ 目录找 + possible_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'gui', 'model', 'waterindex.csv') + if os.path.exists(possible_path): + formula_csv_path = possible_path + + work_dir = self._get_work_dir() + + # 调用 DataPreparationStep 的静态方法计算水质指数(宽表输出) + indices_csv_path = DataPreparationStep.calculate_water_quality_indices( + training_csv_path=training_path, + formula_csv_file=formula_csv_path, + formula_names=formula_names, + output_file=None, # 不在此处指定输出,由下面的双轨输出逻辑接管 + enabled=True, + output_dir=work_dir if work_dir else os.getcwd(), + ) + + # 读取计算结果(宽表) + if indices_csv_path and os.path.exists(indices_csv_path): + output_df = pd.read_csv(indices_csv_path) + else: + output_df = spec_df # fallback + + track_a_path = None + track_b_dir = None + + if output_mode in (0, 1): + track_a_dir = os.path.join(work_dir, "6_water_quality_indices") if work_dir else "6_water_quality_indices" + os.makedirs(track_a_dir, exist_ok=True) + track_a_path = os.path.join(track_a_dir, "training_spectra_indices.csv") + + if output_mode in (0, 2): + track_b_dir = os.path.join(work_dir, "11_12_13_predictions", "Traditional_Indices") if work_dir else "11_12_13_predictions/Traditional_Indices" + os.makedirs(track_b_dir, exist_ok=True) + + saved = [] + if output_mode in (0, 1): + output_df.to_csv(track_a_path, index=False, float_format='%.6f') + saved.append(f"宽表: {track_a_path}") + + if output_mode in (0, 2): + coord_x = spec_df[x_col].values if x_col in spec_df.columns else np.arange(len(spec_df)) + coord_y = spec_df[y_col].values if y_col in spec_df.columns else np.zeros(len(spec_df)) + + for formula_name in formula_names: + if formula_name not in output_df.columns: + continue + single_df = pd.DataFrame({ + 'x_coord': coord_x, + 'y_coord': coord_y, + 'value': output_df[formula_name].values, + }) + safe_name = formula_name.replace('/', '_').replace(' ', '_') + out_path = os.path.join(track_b_dir, f"{safe_name}_prediction.csv") + single_df.to_csv(out_path, index=False, float_format='%.6f') + saved.append(f"单文件目录: {track_b_dir}") + + QMessageBox.information( + self, "计算完成", + f"已保存 {len(saved)} 个输出目标:\n" + "\n".join(saved) + ) + + except ImportError as e: + QMessageBox.critical(self, "依赖错误", f"无法导入模块:\n{e}") + except Exception as e: + import traceback + QMessageBox.critical(self, "计算失败", f"原因: {str(e)}\n{traceback.format_exc()}") \ No newline at end of file diff --git a/src/gui/panels/step8_qaa_panel.py b/src/gui/panels/step8_qaa_panel.py new file mode 100644 index 0000000..2a9ae37 --- /dev/null +++ b/src/gui/panels/step8_qaa_panel.py @@ -0,0 +1,315 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Step8 面板 - QAA 物理反演(非经验模型) +""" + +import os + +from PyQt5.QtWidgets import ( + QWidget, QVBoxLayout, QGroupBox, QFormLayout, QGridLayout, + QHBoxLayout, QLabel, QLineEdit, QComboBox, QCheckBox, + QPushButton, QFileDialog, QMessageBox, +) +from PyQt5.QtGui import QFont +from PyQt5.QtCore import Qt + +from src.gui.components.custom_widgets import FileSelectWidget +from src.gui.styles import ModernStylesheet +from src.utils.water_owt_config import ( + get_all_lake_names, + get_lake_config, + get_lambda_0, + get_default_lake, +) +from src.core.algorithms.qaa import QAABaselineSolver + + +class Step8QAAPanel(QWidget): + """步骤8:QAA 物理反演(非经验模型)""" + def __init__(self, parent=None): + super().__init__(parent) + self.init_ui() + + def init_ui(self): + layout = QVBoxLayout() + + title = QLabel("步骤8:QAA 物理反演(非经验模型)") + title.setFont(QFont("Arial", 12, QFont.Bold)) + layout.addWidget(title) + + # 光谱 CSV 文件输入 + self.spectrum_csv_file = FileSelectWidget( + "光谱 CSV 文件:", + "CSV Files (*.csv);;All Files (*.*)" + ) + self.spectrum_csv_file.line_edit.setPlaceholderText( + "选择实测光谱或采样点光谱 CSV(含波长列)" + ) + layout.addWidget(self.spectrum_csv_file) + + # 水域类型选择 + lake_group = QGroupBox("水域类型配置") + lake_layout = QFormLayout() + + self.lake_combo = QComboBox() + lake_names = get_all_lake_names() + self.lake_combo.addItems(lake_names) + default_lake = get_default_lake() + if default_lake in lake_names: + self.lake_combo.setCurrentText(default_lake) + self.lake_combo.currentTextChanged.connect(self._on_lake_changed) + lake_layout.addRow("水域选择:", self.lake_combo) + + # 参考波长显示 + self.lambda_0_label = QLabel() + self.lambda_0_label.setStyleSheet( + f"color: {ModernStylesheet.COLORS['accent']}; " + f"font-weight: bold;" + ) + lake_layout.addRow("参考波长 λ₀:", self.lambda_0_label) + + # 算法提示 + self.hint_label = QLabel() + self.hint_label.setWordWrap(True) + self.hint_label.setStyleSheet( + f"color: {ModernStylesheet.COLORS['text_secondary']}; " + "font-size: 12px;" + ) + lake_layout.addRow("算法提示:", self.hint_label) + + lake_group.setLayout(lake_layout) + layout.addWidget(lake_group) + + # 输出路径 + self.output_path = FileSelectWidget( + "输出文件:", + "CSV Files (*.csv);;All Files (*.*)", + mode="save" + ) + self.output_path.line_edit.setPlaceholderText( + "自动生成到 8_QAA_Inversion,或手动指定..." + ) + self.output_path.browse_btn.clicked.disconnect() + self.output_path.browse_btn.clicked.connect(self.browse_output_path) + layout.addWidget(self.output_path) + + # 启用步骤 + self.enable_checkbox = QCheckBox("启用此步骤") + self.enable_checkbox.setChecked(False) + layout.addWidget(self.enable_checkbox) + + # 独立运行按钮 + self.run_btn = QPushButton("执行 QAA 反演") + self.run_btn.setStyleSheet(ModernStylesheet.get_button_stylesheet('success')) + self.run_btn.clicked.connect(self.run_step) + layout.addWidget(self.run_btn) + + layout.addStretch() + self.setLayout(layout) + + self._on_lake_changed(self.lake_combo.currentText()) + + def _on_lake_changed(self, lake_name: str): + """当用户切换水域时更新显示""" + cfg = get_lake_config(lake_name) + if cfg: + self.lambda_0_label.setText( + f"{cfg['lambda_0']} nm({cfg['qaa_version']})" + ) + self.hint_label.setText(cfg.get('notes', '')) + else: + self.lambda_0_label.setText("—") + self.hint_label.setText("") + + def _get_default_work_dir(self) -> str: + """获取 work_dir,优先用 panel 自身缓存的,否则尝试从主窗口取""" + if hasattr(self, 'work_dir') and self.work_dir: + return str(self.work_dir) + mw = self.window() + if mw and hasattr(mw, 'work_dir') and mw.work_dir: + return str(mw.work_dir) + return "" + + def browse_output_path(self): + """浏览输出文件路径""" + current = self.output_path.get_path().strip() + if current: + initial_dir = os.path.dirname(current) + initial_file = os.path.basename(current) + else: + initial_dir = "" + initial_file = "" + + if not initial_dir or not os.path.isdir(initial_dir): + work_dir = self._get_default_work_dir() + initial_dir = os.path.join(work_dir, "8_QAA_Inversion") if work_dir else "" + if initial_dir and not os.path.isdir(initial_dir): + os.makedirs(initial_dir, exist_ok=True) + + file_path, _ = QFileDialog.getSaveFileName( + self, "保存输出文件", + os.path.join(initial_dir, initial_file) if initial_file else initial_dir, + "CSV Files (*.csv);;All Files (*.*)" + ) + if file_path: + self.output_path.set_path(file_path) + + def get_config(self) -> dict: + """获取面板配置""" + config = { + 'lake_name': self.lake_combo.currentText(), + 'lambda_0': get_lambda_0(self.lake_combo.currentText()), + 'spectrum_csv_path': self.spectrum_csv_file.get_path(), + } + output_path = self.output_path.get_path() + if output_path: + config['output_path'] = output_path + return config + + def set_config(self, config: dict): + """设置面板配置""" + if 'lake_name' in config: + lake_name = config['lake_name'] + idx = self.lake_combo.findText(lake_name) + if idx >= 0: + self.lake_combo.setCurrentIndex(idx) + if 'spectrum_csv_path' in config: + self.spectrum_csv_file.set_path(config['spectrum_csv_path']) + if 'output_path' in config: + self.output_path.set_path(config['output_path']) + + def update_from_config(self, work_dir=None, pipeline=None): + """从全局配置自动填充训练数据和输出路径""" + if work_dir: + self.work_dir = work_dir + elif hasattr(self, 'work_dir') and self.work_dir: + pass + else: + self.work_dir = None + + main_window = self.window() + + if main_window and hasattr(main_window, 'step5_panel'): + step5_output = main_window.step5_panel.output_file.get_path() + if step5_output: + if not os.path.isabs(step5_output): + step5_output = os.path.join(self.work_dir or '', step5_output).replace('\\', '/') + self.spectrum_csv_file.set_path(step5_output) + + if self.work_dir: + qaa_dir = os.path.join(self.work_dir, "8_QAA_Inversion") + os.makedirs(qaa_dir, exist_ok=True) + output_path = os.path.join(qaa_dir, "a_lambda_results.csv").replace('\\', '/') + self.output_path.set_path(output_path) + else: + self.output_path.set_path("") + + def run_step(self): + """独立运行 QAA 反演""" + spectrum_path = self.spectrum_csv_file.get_path() + if not spectrum_path: + QMessageBox.warning(self, "输入错误", "请选择光谱 CSV 文件!") + return + + main_window = self.window() + if hasattr(main_window, 'run_single_step'): + config = {'step8_qaa': self.get_config()} + main_window.run_single_step('step8_qaa', config) + else: + self._run_qaa_direct() + + def _run_qaa_direct(self): + """直接执行 QAA 反演(不依赖主窗口流水线)""" + spectrum_path = self.spectrum_csv_file.get_path() + output_path = self.output_path.get_path() + lake_name = self.lake_combo.currentText() + lambda_0 = get_lambda_0(lake_name) + + if not output_path: + work_dir = self._get_default_work_dir() + qaa_dir = os.path.join(work_dir, "8_QAA_Inversion") if work_dir else "" + if qaa_dir and not os.path.isdir(qaa_dir): + os.makedirs(qaa_dir, exist_ok=True) + output_path = os.path.join(qaa_dir, "a_lambda_results.csv").replace('\\', '/') + + try: + import numpy as np + import pandas as pd + df = pd.read_csv(spectrum_path, encoding="utf-8-sig") + col_names = df.columns.tolist() + + wavelength_col_idx = None + for i, col in enumerate(col_names): + try: + float(col) + wavelength_col_idx = i + break + except (ValueError, TypeError): + pass + + if wavelength_col_idx is None: + QMessageBox.warning( + self, "解析错误", + "无法从 CSV 列名中识别波长信息,请确保列名包含数值型波长(nm)" + ) + return + + wavelengths = np.array( + [float(c) for c in col_names[wavelength_col_idx:]], dtype=np.float64 + ) + data_matrix = df.iloc[:, wavelength_col_idx:].values.astype(np.float64) + + if data_matrix.ndim == 1: + data_matrix = data_matrix[np.newaxis, :] + + solver = QAABaselineSolver() + raw_result = solver.run_inversion(wavelengths, data_matrix, lambda_0) + + # run_inversion 返回:单样本 → dict,多样本 → list[dict] + if isinstance(raw_result, list): + sample_results = raw_result + aw_0 = raw_result[0].get('aw_0', 0) + bbw_0 = raw_result[0].get('bbw_0', 0) + else: + sample_results = [raw_result] + aw_0 = raw_result.get('aw_0', 0) + bbw_0 = raw_result.get('bbw_0', 0) + + rows_out = [] + for i, sample_result in enumerate(sample_results): + wl_arr = wavelengths + a_arr = sample_result['a_lambda'] + bb_arr = sample_result['bb_lambda'] + for j, wl in enumerate(wl_arr): + rows_out.append({ + 'sample_id': f"sample_{i}", + 'Wavelength': wl, + 'a_lambda': a_arr[j], + 'bb_lambda': bb_arr[j], + }) + + result_df = pd.DataFrame(rows_out) + os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True) + result_df.to_csv(output_path, index=False, float_format='%.8f') + + QMessageBox.information( + self, "执行成功", + f"QAA 反演完成!\n" + f"水域: {lake_name}\n" + f"参考波长 λ₀: {lambda_0} nm\n" + f"λ₀ 处 aw: {aw_0:.6f} m⁻¹\n" + f"λ₀ 处 bbw: {bbw_0:.6f} m⁻¹\n" + f"结果已保存到:\n{output_path}" + ) + + except Exception as e: + QMessageBox.critical(self, "执行错误", f"QAA 反演失败:\n{str(e)}") + + def get_training_params(self) -> dict: + """获取反演参数""" + return { + 'pipeline_type': 'qaa_non_empirical', + 'lake_name': self.lake_combo.currentText(), + 'lambda_0': get_lambda_0(self.lake_combo.currentText()), + } \ No newline at end of file diff --git a/src/gui/panels/step8_waterindex_panel.py b/src/gui/panels/step8_waterindex_panel.py new file mode 100644 index 0000000..55dabdf --- /dev/null +++ b/src/gui/panels/step8_waterindex_panel.py @@ -0,0 +1,569 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Step8 面板 - 水色指数反演(直接处理去耀斑 BSQ 影像) + +将 waterindex.csv 中的公式直接应用于去耀斑高光谱影像, +输出各水质参数指数的 GeoTIFF 栅格图像。 +""" + +import os +import traceback +from pathlib import Path +from typing import Dict, List, Optional + +from PyQt5.QtWidgets import ( + QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QFormLayout, + QGroupBox, QLabel, QLineEdit, QComboBox, QCheckBox, QPushButton, + QFileDialog, QMessageBox, QListWidget, QListWidgetItem, + QAbstractItemView, QProgressBar, QTextEdit, QFrame, + QScrollArea, QSizePolicy, +) +from PyQt5.QtGui import QFont +from PyQt5.QtCore import Qt, QThread, pyqtSignal + +from src.gui.components.custom_widgets import FileSelectWidget +from src.gui.styles import ModernStylesheet + + +class WaterIndexWorker(QThread): + """后台线程:执行水色指数反演""" + finished_ok = pyqtSignal(dict) + failed = pyqtSignal(str) + progress = pyqtSignal(str, float) # message, percent + log = pyqtSignal(str) + + def __init__( + self, + bsq_path: str, + hdr_path: str, + output_dir: str, + selected_formulas: List[str], + waterindex_csv: str, + water_mask_path: Optional[str] = None, + work_dir: Optional[str] = None, + ): + super().__init__() + self.bsq_path = bsq_path + self.hdr_path = hdr_path + self.output_dir = output_dir + self.selected_formulas = selected_formulas + self.waterindex_csv = waterindex_csv + self.water_mask_path = water_mask_path + self.work_dir = work_dir + + def run(self): + try: + from src.core.algorithms.waterindex_inversion import WaterIndexProcessor + + self.progress.emit("正在初始化水色指数处理器…", 2) + + processor = WaterIndexProcessor(self.waterindex_csv) + + self.progress.emit("正在读取影像元数据…", 5) + + # 获取影像元数据 + meta = processor.get_image_metadata(self.bsq_path, self.hdr_path) + if not meta: + self.failed.emit("无法读取影像元数据,请检查 BSQ 和 HDR 文件是否匹配") + return + + n_bands = meta.get('bands', 0) + wv_range = meta.get('wavelength_range', '未知') + self.log.emit( + f"影像信息: {meta['width']}×{meta['height']} 像素, " + f"{n_bands} 波段, {wv_range}" + ) + + if self.water_mask_path: + self.log.emit(f"使用水域掩膜: {self.water_mask_path}") + + # 使用 run_inversion 入口(含掩膜拦截链路) + results = processor.run_inversion( + deglint_img_path=self.bsq_path, + work_dir=self.work_dir or self.output_dir, + formula_csv_path=self.waterindex_csv, + selected_formulas=self.selected_formulas, + water_mask_path=self.water_mask_path, + callback=self._on_progress, + ) + + self.progress.emit(f"完成!共生成 {len(results)} 个指数图", 100) + self.finished_ok.emit(results) + + except Exception as e: + self.failed.emit(f"{e}\n{traceback.format_exc()}") + + def _on_progress(self, msg: str, pct: float): + self.progress.emit(msg, pct) + + +class Step8WaterIndexPanel(QWidget): + """步骤8:水色指数反演(直接处理 BSQ 影像)""" + + def __init__(self, parent=None): + super().__init__(parent) + self._worker: Optional[WaterIndexWorker] = None + self._waterindex_csv = self._find_waterindex_csv() + self._categories: List[str] = [] + self._all_formulas: List[Dict] = [] + self._formula_list_widgets: Dict[str, QListWidgetItem] = {} + self.init_ui() + self._load_formulas() + + def init_ui(self): + layout = QVBoxLayout() + + # ---- 标题 ---- + title = QLabel("步骤8:水色指数反演(高光谱影像直接处理)") + title.setFont(QFont("Arial", 12, QFont.Bold)) + layout.addWidget(title) + + # ---- 说明 ---- + hint = QLabel( + "将 waterindex.csv 中的公式直接应用于去耀斑高光谱影像(BSQ)," + "输出各水质参数指数的 GeoTIFF 栅格图像。" + "指数图可直接用于水质专题图生成。" + ) + hint.setWordWrap(True) + hint.setStyleSheet(f"color: {ModernStylesheet.COLORS.get('text_secondary', '#666')};") + layout.addWidget(hint) + + # ---- 输入影像选择 ---- + input_group = QGroupBox("输入影像") + input_layout = QFormLayout() + + self.bsq_file = FileSelectWidget( + "去耀斑 BSQ 影像:", + "BSQ Files (*.bsq);;DAT Files (*.dat);;All Files (*.*)" + ) + self.bsq_file.line_edit.setPlaceholderText("选择去耀斑处理后的 BSQ 影像") + self.bsq_file.browse_btn.clicked.disconnect() + self.bsq_file.browse_btn.clicked.connect(self._browse_bsq) + input_layout.addRow("BSQ 影像:", self.bsq_file) + + self.hdr_file = FileSelectWidget( + "ENVI 头文件:", + "HDR Files (*.hdr);;All Files (*.*)" + ) + self.hdr_file.line_edit.setPlaceholderText("自动关联同路径 .hdr 文件") + self.hdr_file.browse_btn.clicked.disconnect() + self.hdr_file.browse_btn.clicked.connect(self._browse_hdr) + input_layout.addRow("HDR 文件:", self.hdr_file) + + # 影像信息显示 + self.meta_label = QLabel("未加载影像") + self.meta_label.setStyleSheet( + "background: #f0f0f0; padding: 4px 8px; border-radius: 4px; " + "font-size: 12px; color: #333;" + ) + input_layout.addRow("影像信息:", self.meta_label) + + input_group.setLayout(input_layout) + layout.addWidget(input_group) + + # ---- 公式选择 ---- + formula_group = QGroupBox("公式选择") + formula_layout = QGridLayout() + + # 类别过滤 + formula_layout.addWidget(QLabel("按类别筛选:"), 0, 0) + self.category_combo = QComboBox() + self.category_combo.currentTextChanged.connect(self._on_category_changed) + formula_layout.addWidget(self.category_combo, 0, 1, 1, 2) + + # 全选/取消全选 + select_btn_layout = QHBoxLayout() + self.select_all_btn = QPushButton("全选") + self.select_all_btn.setMaximumWidth(80) + self.select_all_btn.clicked.connect(self._select_all) + select_btn_layout.addWidget(self.select_all_btn) + + self.deselect_all_btn = QPushButton("取消全选") + self.deselect_all_btn.setMaximumWidth(80) + self.deselect_all_btn.clicked.connect(self._deselect_all) + select_btn_layout.addWidget(self.deselect_all_btn) + select_btn_layout.addStretch() + formula_layout.addLayout(select_btn_layout, 0, 3) + + # 公式列表 + self.formula_list = QListWidget() + self.formula_list.setSelectionMode(QAbstractItemView.MultiSelection) + self.formula_list.setMinimumHeight(200) + self.formula_list.itemChanged.connect(self._on_item_changed) + formula_layout.addWidget(self.formula_list, 1, 0, 1, 4) + + formula_group.setLayout(formula_layout) + layout.addWidget(formula_group) + + # ---- 输出设置 ---- + output_group = QGroupBox("输出设置") + output_layout = QFormLayout() + + self.output_dir = FileSelectWidget( + "输出目录:", + "Directories" + ) + self.output_dir.line_edit.setPlaceholderText("留空 → 工作目录/8_WaterIndex_Images") + self.output_dir.browse_btn.clicked.disconnect() + self.output_dir.browse_btn.clicked.connect(self._browse_output_dir) + output_layout.addRow("输出目录:", self.output_dir) + + self.format_combo = QComboBox() + self.format_combo.addItems(["GTiff (GeoTIFF)", "ENVI", "PCI"]) + self.format_combo.setCurrentIndex(0) + output_layout.addRow("输出格式:", self.format_combo) + + output_group.setLayout(output_layout) + layout.addWidget(output_group) + + # ---- 进度显示 ---- + self.progress_bar = QProgressBar() + self.progress_bar.setMinimum(0) + self.progress_bar.setMaximum(100) + self.progress_bar.setValue(0) + self.progress_bar.setTextVisible(True) + layout.addWidget(self.progress_bar) + + self.progress_label = QLabel("") + self.progress_label.setStyleSheet("font-size: 11px; color: #666;") + layout.addWidget(self.progress_label) + + # ---- 启用 & 运行 ---- + self.enable_checkbox = QCheckBox("启用此步骤") + self.enable_checkbox.setChecked(True) + layout.addWidget(self.enable_checkbox) + + self.run_btn = QPushButton("▶ 执行水色指数反演") + self.run_btn.setStyleSheet(ModernStylesheet.get_button_stylesheet('success')) + self.run_btn.clicked.connect(self.run_step) + layout.addWidget(self.run_btn) + + layout.addStretch() + self.setLayout(layout) + + def _find_waterindex_csv(self) -> str: + """查找 waterindex.csv 路径""" + candidates = [ + Path(__file__).parent.parent.parent / "model" / "waterindex.csv", + Path(__file__).parent.parent.parent.parent / "src" / "gui" / "model" / "waterindex.csv", + ] + for c in candidates: + if c.exists(): + return str(c) + return "" + + def _load_formulas(self): + """加载 waterindex.csv 中的公式""" + if not self._waterindex_csv or not Path(self._waterindex_csv).exists(): + self.meta_label.setText("⚠️ waterindex.csv 未找到") + return + + import csv + self._all_formulas = [] + try: + with open(self._waterindex_csv, 'r', encoding='utf-8-sig') as f: + reader = csv.DictReader(f) + self._all_formulas = list(reader) + except Exception as e: + self.meta_label.setText(f"⚠️ 加载公式失败: {e}") + return + + # 提取所有类别 + cats = set() + for f in self._all_formulas: + c = f.get('Category', '').strip() + if c: + cats.add(c) + + self._categories = sorted(cats) + self.category_combo.clear() + self.category_combo.addItem("全部") + self.category_combo.addItems(self._categories) + + self._populate_list("全部") + + def _populate_list(self, category: str): + """根据类别填充公式列表""" + self.formula_list.clear() + self._formula_list_widgets.clear() + + formulas_to_show = ( + [f for f in self._all_formulas if f.get('Category', '') == category] + if category != "全部" + else self._all_formulas + ) + + for f in formulas_to_show: + name = f.get('Formula_Name', '') + formula_str = f.get('Formula', '') + cat = f.get('Category', '') + ftype = f.get('Formula_Type', '') + + item = QListWidgetItem() + item.setFlags(item.flags() | Qt.ItemIsUserCheckable) + item.setCheckState(Qt.Checked) + item.setData(Qt.UserRole, name) + item.setText( + f"☑ {name} [{cat}] ({ftype})\n {formula_str}" + ) + item.setToolTip(f"{name}\n{category}\n{formula_str}") + self.formula_list.addItem(item) + self._formula_list_widgets[name] = item + + def _on_category_changed(self, category: str): + self._populate_list(category) + + def _select_all(self): + for item in self.formula_list.selectedItems(): + item.setCheckState(Qt.Checked) + # 也全选当前显示的 + for i in range(self.formula_list.count()): + it = self.formula_list.item(i) + it.setCheckState(Qt.Checked) + + def _deselect_all(self): + for i in range(self.formula_list.count()): + it = self.formula_list.item(i) + it.setCheckState(Qt.Unchecked) + + def _on_item_changed(self, item: QListWidgetItem): + pass # 可扩展:实时统计选中数量 + + def _browse_bsq(self): + path, _ = QFileDialog.getOpenFileName( + self, "选择去耀斑 BSQ 影像", + "", + "BSQ Files (*.bsq);;DAT Files (*.dat);;All Files (*.*)" + ) + if path: + self.bsq_file.set_path(path) + # 自动关联同路径 hdr + hdr = Path(path).with_suffix('.hdr') + if hdr.exists(): + self.hdr_file.set_path(str(hdr)) + self._load_metadata(path, str(hdr) if hdr.exists() else "") + + def _browse_hdr(self): + path, _ = QFileDialog.getOpenFileName( + self, "选择 ENVI 头文件", + "", + "HDR Files (*.hdr);;All Files (*.*)" + ) + if path: + self.hdr_file.set_path(path) + bsq_path = self.bsq_file.get_path() + if bsq_path: + self._load_metadata(bsq_path, path) + + def _browse_output_dir(self): + d = QFileDialog.getExistingDirectory(self, "选择输出目录", "") + if d: + self.output_dir.set_path(d) + + def _load_metadata(self, bsq_path: str, hdr_path: str): + """加载并显示影像元数据""" + if not bsq_path or not Path(bsq_path).exists(): + self.meta_label.setText("⚠️ 影像文件不存在") + return + if not hdr_path or not Path(hdr_path).exists(): + self.meta_label.setText("⚠️ 头文件不存在") + return + + try: + from src.core.algorithms.waterindex_inversion import WaterIndexProcessor + processor = WaterIndexProcessor(self._waterindex_csv) + meta = processor.get_image_metadata(bsq_path, hdr_path) + if meta: + self.meta_label.setText( + f"✅ {meta['width']}×{meta['height']} | " + f"{meta['bands']} 波段 | {meta.get('wavelength_range', '未知')} | " + f"驱动: {meta['driver']}" + ) + else: + self.meta_label.setText("⚠️ 无法读取元数据") + except Exception as e: + self.meta_label.setText(f"⚠️ 元数据读取失败: {e}") + + def _get_selected_formula_names(self) -> List[str]: + names = [] + for i in range(self.formula_list.count()): + item = self.formula_list.item(i) + if item.checkState() == Qt.Checked: + name = item.data(Qt.UserRole) + if name: + names.append(name) + return names + + def _get_default_work_dir(self) -> str: + if hasattr(self, 'work_dir') and self.work_dir: + return str(self.work_dir) + mw = self.window() + if mw and hasattr(mw, 'work_dir') and mw.work_dir: + return str(mw.work_dir) + return "" + + def get_config(self) -> dict: + bsq = self.bsq_file.get_path() + return { + 'bsq_path': bsq, + 'hdr_path': self.hdr_file.get_path(), + 'deglint_img_path': bsq, + 'output_dir': self.output_dir.get_path(), + 'output_format': self.format_combo.currentText().split()[0], + 'selected_formulas': self._get_selected_formula_names(), + } + + def set_config(self, config: dict): + if config.get('bsq_path'): + self.bsq_file.set_path(config['bsq_path']) + if config.get('hdr_path'): + self.hdr_file.set_path(config['hdr_path']) + if config.get('output_dir'): + self.output_dir.set_path(config['output_dir']) + if 'selected_formulas' in config: + names = set(config['selected_formulas']) + for i in range(self.formula_list.count()): + item = self.formula_list.item(i) + name = item.data(Qt.UserRole) + item.setCheckState(Qt.Checked if name in names else Qt.Unchecked) + + def update_from_config(self, work_dir=None, pipeline=None): + if work_dir: + self.work_dir = work_dir + elif hasattr(self, 'work_dir') and self.work_dir: + pass + else: + self.work_dir = None + + main_window = self.window() + + # 自动填入去耀斑影像 + if main_window and hasattr(main_window, 'step3_panel'): + deglint_path = main_window.step3_panel.output_file.get_path() + if deglint_path and not self.bsq_file.get_path(): + if not os.path.isabs(deglint_path): + deglint_path = os.path.join(self.work_dir or '', deglint_path).replace('\\', '/') + self.bsq_file.set_path(deglint_path) + hdr = Path(deglint_path).with_suffix('.hdr') + if hdr.exists(): + self.hdr_file.set_path(str(hdr)) + self._load_metadata(deglint_path, str(hdr)) + + # 自动填入输出目录 + if self.work_dir: + out_dir = os.path.join(self.work_dir, "8_WaterIndex_Images").replace('\\', '/') + os.makedirs(out_dir, exist_ok=True) + if not self.output_dir.get_path(): + self.output_dir.set_path(out_dir) + + def run_step(self): + bsq_path = self.bsq_file.get_path().strip() + hdr_path = self.hdr_file.get_path().strip() + output_dir = self.output_dir.get_path().strip() + + # 验证输入 + if not bsq_path: + QMessageBox.warning(self, "输入错误", "请选择去耀斑 BSQ 影像!") + return + if not Path(bsq_path).exists(): + QMessageBox.warning(self, "输入错误", f"BSQ 影像不存在:\n{bsq_path}") + return + if not hdr_path: + # 尝试自动查找 + auto_hdr = Path(bsq_path).with_suffix('.hdr') + if auto_hdr.exists(): + hdr_path = str(auto_hdr) + self.hdr_file.set_path(hdr_path) + else: + QMessageBox.warning(self, "输入错误", "请选择 ENVI 头文件!") + return + if not Path(hdr_path).exists(): + QMessageBox.warning(self, "输入错误", f"HDR 文件不存在:\n{hdr_path}") + return + if not output_dir: + work_dir = self._get_default_work_dir() + output_dir = os.path.join(work_dir, "8_WaterIndex_Images").replace('\\', '/') + os.makedirs(output_dir, exist_ok=True) + self.output_dir.set_path(output_dir) + + selected = self._get_selected_formula_names() + if not selected: + QMessageBox.warning(self, "输入错误", "请至少选择一个公式!") + return + + if self._waterindex_csv and not Path(self._waterindex_csv).exists(): + QMessageBox.warning(self, "配置错误", f"waterindex.csv 不存在:\n{self._waterindex_csv}") + return + + # ── 自动扫描工作目录下的水域掩膜文件 ──────────────────────────── + work_dir = self.work_dir or str(Path(bsq_path).parent) + mask_dir = os.path.join(work_dir, "1_water_mask") + water_mask_path: Optional[str] = None + if os.path.isdir(mask_dir): + # ★★★ glob 智能扫描:取任意 .dat 或 .tif 文件 ★★★ + for pattern in ("*.dat", "*.tif", "*.TIF", "*.DT"): + candidates = sorted(Path(mask_dir).glob(pattern)) + if candidates: + water_mask_path = str(candidates[0]) + break + + if water_mask_path: + print(f"[Step8] 自动找到水域掩膜: {water_mask_path}") + else: + print(f"[Step8] 未找到水域掩膜,跳过陆地剔除(陆地将保留在指数图中)") + + # 开始后台处理 + self.run_btn.setEnabled(False) + self.progress_bar.setValue(0) + self.progress_label.setText("") + + self._worker = WaterIndexWorker( + bsq_path=bsq_path, + hdr_path=hdr_path, + output_dir=output_dir, + selected_formulas=selected, + waterindex_csv=self._waterindex_csv, + water_mask_path=water_mask_path, + work_dir=work_dir, + ) + self._worker.progress.connect(self._on_progress) + self._worker.finished_ok.connect(self._on_finished) + self._worker.failed.connect(self._on_failed) + self._worker.log.connect(lambda m: self.progress_label.setText(m)) + self._worker.start() + + def _on_progress(self, msg: str, pct: float): + self.progress_bar.setValue(int(pct)) + self.progress_label.setText(msg) + + def _on_finished(self, results: Dict[str, str]): + self.run_btn.setEnabled(True) + n = len(results) + QMessageBox.information( + self, "执行成功", + f"水色指数反演完成!\n" + f"共生成 {n} 个指数图(GeoTIFF)。\n\n" + f"输出目录: {self.output_dir.get_path()}" + ) + main_window = self.window() + if main_window and hasattr(main_window, 'log_message'): + main_window.log_message(f"步骤8:水色指数反演完成,生成 {n} 个指数图", "info") + + def _on_failed(self, err: str): + self.run_btn.setEnabled(True) + self.progress_bar.setValue(0) + QMessageBox.critical(self, "执行错误", f"水色指数反演失败:\n\n{err[:500]}") + + def get_output_dir(self) -> str: + return self.output_dir.get_path().strip() or "" + + def get_output_tif_paths(self) -> List[str]: + """获取输出目录下的所有 GeoTIFF 文件路径""" + out_dir = self.get_output_dir() + if not out_dir or not os.path.isdir(out_dir): + return [] + return sorted( + str(p) for p in Path(out_dir).glob("*.tif") + if p.is_file() + ) \ No newline at end of file