feat(step8): 新增 Step8 水色指数反演 GUI 面板 step8_waterindex_panel

This commit is contained in:
DXC
2026-06-10 17:13:37 +08:00
parent 320f2f18f2
commit 2671c0837a
3 changed files with 1307 additions and 0 deletions

View File

@ -0,0 +1,423 @@
import os
import sys
import pandas as pd
import numpy as np
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from PyQt5.QtWidgets import (
QWidget, QVBoxLayout, QGroupBox, QGridLayout,
QHBoxLayout, QLabel, QCheckBox, QPushButton, QMessageBox,
QScrollArea, QListWidget, QListWidgetItem, QAbstractItemView,
QRadioButton, QButtonGroup
)
from PyQt5.QtCore import Qt
from PyQt5.QtGui import QColor, QBrush, QFont
from src.gui.components.custom_widgets import FileSelectWidget
from src.gui.styles import ModernStylesheet
def get_resource_path(relative_path: str) -> str:
"""适配开发与 PyInstaller 环境的路径获取逻辑。"""
if hasattr(sys, '_MEIPASS'):
internal = os.path.join(sys._MEIPASS, '_internal', relative_path)
if os.path.exists(internal):
return internal
return os.path.join(sys._MEIPASS, relative_path)
exe_dir = os.path.dirname(sys.executable)
internal = os.path.join(exe_dir, '_internal', relative_path)
if os.path.exists(internal):
return internal
base_dir = Path(__file__).resolve().parent.parent / "model"
return str(base_dir / os.path.basename(relative_path))
class Step6Panel(QWidget):
COLOR_RATIO = QColor(255, 255, 255)
COLOR_CONCENTRATION = QColor(220, 240, 255)
COLOR_HEADER = QColor(245, 245, 245)
def __init__(self, parent=None):
super().__init__(parent)
self.index_checkboxes: Dict[str, QListWidgetItem] = {}
self.work_dir: Optional[str] = None
self.builtin_formula_path = get_resource_path("waterindex.csv")
self._formula_type_map: Dict[str, str] = {}
self._formula_color_map: Dict[str, QColor] = {}
self._formula_coef_map: Dict[str, List[float]] = {}
self.init_ui()
self._auto_load_formulas()
def init_ui(self):
main_layout = QVBoxLayout()
main_layout.setContentsMargins(20, 20, 20, 20)
main_layout.setSpacing(10)
# 1. 公式配置源 (只读)
path_group = QGroupBox("公式配置源 (内置)")
path_layout = QVBoxLayout()
self.formula_csv_widget = FileSelectWidget("内置CSV路径:", "CSV Files (*.csv)")
self.formula_csv_widget.set_path(self.builtin_formula_path)
self.formula_csv_widget.set_read_only(True)
self.formula_csv_widget.line_edit.setStyleSheet("background-color: #f0f0f0; color: #666;")
path_layout.addWidget(self.formula_csv_widget)
path_group.setLayout(path_layout)
main_layout.addWidget(path_group)
# 2. 训练数据输入
input_group = QGroupBox("输入样本数据")
input_layout = QVBoxLayout()
self.training_data_widget = FileSelectWidget("特征提取CSV:", "CSV Files (*.csv)")
input_layout.addWidget(self.training_data_widget)
input_group.setLayout(input_layout)
main_layout.addWidget(input_group)
# 3. 公式选择区 (分组 ListWidget)
self.formula_group = QGroupBox("待计算水质指数勾选")
formula_outer_layout = QVBoxLayout()
btn_layout = QHBoxLayout()
self.select_all_btn = QPushButton("全选")
self.deselect_all_btn = QPushButton("清空")
self.select_ratio_btn = QPushButton("仅选比值型")
self.select_conc_btn = QPushButton("仅选浓度型")
self.select_all_btn.clicked.connect(self.select_all_formulas)
self.deselect_all_btn.clicked.connect(self.deselect_all_formulas)
self.select_ratio_btn.clicked.connect(self._select_ratio_only)
self.select_conc_btn.clicked.connect(self._select_conc_only)
btn_layout.addWidget(self.select_all_btn)
btn_layout.addWidget(self.deselect_all_btn)
btn_layout.addWidget(self.select_ratio_btn)
btn_layout.addWidget(self.select_conc_btn)
btn_layout.addStretch()
self.refresh_button = QPushButton("重新加载")
self.refresh_button.clicked.connect(lambda: self.refresh_formulas(silent=False))
btn_layout.addWidget(self.refresh_button)
formula_outer_layout.addLayout(btn_layout)
scroll = QScrollArea()
scroll.setWidgetResizable(True)
scroll.setMinimumHeight(280)
self.scroll_content = QWidget()
self.formula_layout = QVBoxLayout(self.scroll_content)
self.formula_layout.setContentsMargins(4, 4, 4, 4)
self.formula_layout.setSpacing(2)
self.formula_layout.setAlignment(Qt.AlignTop)
self.formula_list = QListWidget()
self.formula_list.setSelectionMode(QAbstractItemView.MultiSelection)
self.formula_list.itemChanged.connect(self._on_item_changed)
self.formula_layout.addWidget(self.formula_list)
scroll.setWidget(self.scroll_content)
formula_outer_layout.addWidget(scroll)
self.formula_group.setLayout(formula_outer_layout)
main_layout.addWidget(self.formula_group)
# 4. 输出选项
output_group = QGroupBox("输出模式")
output_layout = QVBoxLayout()
mode_layout = QHBoxLayout()
self.mode_group = QButtonGroup()
self.radio_both = QRadioButton("两者皆出")
self.radio_wide = QRadioButton("仅宽表")
self.radio_single = QRadioButton("仅单文件")
self.mode_group.addButton(self.radio_both, 0)
self.mode_group.addButton(self.radio_wide, 1)
self.mode_group.addButton(self.radio_single, 2)
self.radio_both.setChecked(True)
mode_layout.addWidget(self.radio_both)
mode_layout.addWidget(self.radio_wide)
mode_layout.addWidget(self.radio_single)
mode_layout.addStretch()
output_layout.addLayout(mode_layout)
self.enable_checkbox = QCheckBox("启用计算流程")
self.enable_checkbox.setChecked(True)
output_layout.addWidget(self.enable_checkbox)
output_group.setLayout(output_layout)
main_layout.addWidget(output_group)
# 5. 运行按钮
self.run_button = QPushButton("立即执行计算")
self.run_button.setStyleSheet(ModernStylesheet.get_button_stylesheet('success'))
self.run_button.setMinimumHeight(40)
self.run_button.clicked.connect(self.run_step)
main_layout.addWidget(self.run_button)
self.setLayout(main_layout)
def _on_item_changed(self, item: QListWidgetItem):
if item.checkState() == Qt.Checked:
bg_color = self.COLOR_RATIO
for name, ref_item in self.index_checkboxes.items():
if ref_item is item:
bg_color = self._formula_color_map.get(name, self.COLOR_RATIO)
break
item.setBackground(QBrush(bg_color))
else:
item.setBackground(QBrush(self.COLOR_RATIO))
def _auto_load_formulas(self):
if os.path.exists(self.builtin_formula_path):
self.refresh_formulas(silent=True)
else:
print(f"DEBUG: 自动加载失败,路径不存在: {self.builtin_formula_path}")
def refresh_formulas(self, silent=False):
path = self.builtin_formula_path
if not os.path.exists(path):
if not silent:
QMessageBox.warning(self, "错误", f"找不到内置公式文件:\n{path}")
return
try:
df = None
for enc in ('utf-8', 'gbk', 'utf-8-sig'):
try:
df = pd.read_csv(path, encoding=enc)
if 'Formula_Name' in df.columns:
break
except Exception:
continue
if df is None or 'Formula_Name' not in df.columns:
if not silent:
QMessageBox.critical(self, "错误", "CSV缺少 'Formula_Name'")
return
self._formula_type_map.clear()
self._formula_coef_map.clear()
for _, row in df.iterrows():
name = str(row['Formula_Name']).strip()
if not name:
continue
ftype = str(row.get('Formula_Type', 'ratio')).strip().lower()
self._formula_type_map[name] = ftype
# Parse Coefficient for concentration formulas
coef_str = str(row.get('Coefficient', '')).strip()
if coef_str:
try:
coeffs = [float(c.strip()) for c in coef_str.split(',') if c.strip()]
self._formula_coef_map[name] = coeffs
except Exception:
self._formula_coef_map[name] = []
else:
self._formula_coef_map[name] = []
self.formula_list.clear()
self.index_checkboxes.clear()
self._formula_color_map.clear()
for name, ftype in self._formula_type_map.items():
item = QListWidgetItem(name, self.formula_list)
item.setCheckState(Qt.Checked)
if ftype == 'concentration':
bg_color = QColor(220, 240, 255)
else:
bg_color = self.COLOR_RATIO
self._formula_color_map[name] = bg_color
item.setBackground(QBrush(bg_color))
self.index_checkboxes[name] = item
self.formula_list.adjustSize()
print(f"✅ 加载 {len(self.index_checkboxes)} 个公式")
except Exception as e:
if not silent:
QMessageBox.critical(self, "加载失败", f"原因: {str(e)}")
def _select_ratio_only(self):
for name, item in self.index_checkboxes.items():
ftype = self._formula_type_map.get(name, 'ratio')
item.setCheckState(Qt.Checked if ftype == 'ratio' else Qt.Unchecked)
def _select_conc_only(self):
for name, item in self.index_checkboxes.items():
ftype = self._formula_type_map.get(name, 'ratio')
item.setCheckState(Qt.Checked if ftype == 'concentration' else Qt.Unchecked)
def select_all_formulas(self):
for item in self.index_checkboxes.values():
item.setCheckState(Qt.Checked)
def deselect_all_formulas(self):
for item in self.index_checkboxes.values():
item.setCheckState(Qt.Unchecked)
def get_config(self) -> Dict:
selected = [
name for name, item in self.index_checkboxes.items()
if item.checkState() == Qt.Checked
]
# Build coefficient dict for selected formulas
formula_coefficients = {
name: self._formula_coef_map.get(name, [])
for name in selected
}
return {
'training_csv_path': self.training_data_widget.get_path(),
'formula_csv_file': self.builtin_formula_path,
'formula_names': selected,
'formula_coefficients': formula_coefficients,
'enabled': self.enable_checkbox.isChecked(),
'output_mode': self.mode_group.checkedId(),
}
def set_config(self, config: Dict):
if 'training_csv_path' in config:
self.training_data_widget.set_path(config['training_csv_path'])
if 'formula_names' in config:
sel = set(config['formula_names'])
for name, item in self.index_checkboxes.items():
item.setCheckState(Qt.Checked if name in sel else Qt.Unchecked)
self.enable_checkbox.setChecked(config.get('enabled', True))
if 'output_mode' in config:
btn = self.mode_group.button(config['output_mode'])
if btn:
btn.setChecked(True)
def update_from_config(self, work_dir=None, pipeline=None):
if work_dir:
self.work_dir = work_dir
main = self.window()
if hasattr(main, 'step5_panel'):
p5 = main.step5_panel.output_file.get_path()
if p5:
if not os.path.isabs(p5):
p5 = os.path.join(self.work_dir or '', p5)
p5 = p5.replace('\\', '/')
self.training_data_widget.set_path(p5)
def _get_work_dir(self) -> Optional[str]:
if self.work_dir:
return self.work_dir
main = self.window()
if hasattr(main, 'work_dir') and main.work_dir:
return main.work_dir
return None
def _get_coord_cols(self, df: pd.DataFrame) -> Tuple[str, str]:
coord_candidates = ['lon', 'lng', 'longitude', '经度', 'x', 'lon_utm', 'utm_x', 'pixel_x']
lat_candidates = ['lat', 'latitude', '纬度', 'y', 'lat_utm', 'utm_y', 'pixel_y']
x_col, y_col = None, None
for col in df.columns:
cl = col.lower()
if x_col is None and any(c in cl for c in coord_candidates):
x_col = col
if y_col is None and any(c in cl for c in lat_candidates):
y_col = col
if x_col is None and len(df.columns) >= 2:
x_col = df.columns[0]
if y_col is None and len(df.columns) >= 2:
y_col = df.columns[1]
return x_col or 'x_coord', y_col or 'y_coord'
def run_step(self):
config = self.get_config()
if not config['enabled']:
QMessageBox.information(self, "提示", "已禁用计算流程(启用计算流程未勾选)")
return
training_path = config['training_csv_path']
if not training_path or not os.path.exists(training_path):
QMessageBox.warning(self, "提示", "请先选择输入特征提取CSV文件")
return
formula_names = config['formula_names']
if not formula_names:
QMessageBox.warning(self, "提示", "请至少勾选一个公式")
return
output_mode = config['output_mode']
try:
from src.core.steps.data_preparation_step import DataPreparationStep
spec_df = pd.read_csv(training_path)
x_col, y_col = self._get_coord_cols(spec_df)
# 构建 formula_csv_path使用内置 waterindex.csv
formula_csv_path = self.builtin_formula_path
if not formula_csv_path or not os.path.exists(formula_csv_path):
# 尝试从 src/gui/model/ 目录找
possible_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'gui', 'model', 'waterindex.csv')
if os.path.exists(possible_path):
formula_csv_path = possible_path
work_dir = self._get_work_dir()
# 调用 DataPreparationStep 的静态方法计算水质指数(宽表输出)
indices_csv_path = DataPreparationStep.calculate_water_quality_indices(
training_csv_path=training_path,
formula_csv_file=formula_csv_path,
formula_names=formula_names,
output_file=None, # 不在此处指定输出,由下面的双轨输出逻辑接管
enabled=True,
output_dir=work_dir if work_dir else os.getcwd(),
)
# 读取计算结果(宽表)
if indices_csv_path and os.path.exists(indices_csv_path):
output_df = pd.read_csv(indices_csv_path)
else:
output_df = spec_df # fallback
track_a_path = None
track_b_dir = None
if output_mode in (0, 1):
track_a_dir = os.path.join(work_dir, "6_water_quality_indices") if work_dir else "6_water_quality_indices"
os.makedirs(track_a_dir, exist_ok=True)
track_a_path = os.path.join(track_a_dir, "training_spectra_indices.csv")
if output_mode in (0, 2):
track_b_dir = os.path.join(work_dir, "11_12_13_predictions", "Traditional_Indices") if work_dir else "11_12_13_predictions/Traditional_Indices"
os.makedirs(track_b_dir, exist_ok=True)
saved = []
if output_mode in (0, 1):
output_df.to_csv(track_a_path, index=False, float_format='%.6f')
saved.append(f"宽表: {track_a_path}")
if output_mode in (0, 2):
coord_x = spec_df[x_col].values if x_col in spec_df.columns else np.arange(len(spec_df))
coord_y = spec_df[y_col].values if y_col in spec_df.columns else np.zeros(len(spec_df))
for formula_name in formula_names:
if formula_name not in output_df.columns:
continue
single_df = pd.DataFrame({
'x_coord': coord_x,
'y_coord': coord_y,
'value': output_df[formula_name].values,
})
safe_name = formula_name.replace('/', '_').replace(' ', '_')
out_path = os.path.join(track_b_dir, f"{safe_name}_prediction.csv")
single_df.to_csv(out_path, index=False, float_format='%.6f')
saved.append(f"单文件目录: {track_b_dir}")
QMessageBox.information(
self, "计算完成",
f"已保存 {len(saved)} 个输出目标:\n" + "\n".join(saved)
)
except ImportError as e:
QMessageBox.critical(self, "依赖错误", f"无法导入模块:\n{e}")
except Exception as e:
import traceback
QMessageBox.critical(self, "计算失败", f"原因: {str(e)}\n{traceback.format_exc()}")

View File

@ -0,0 +1,315 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Step8 面板 - QAA 物理反演(非经验模型)
"""
import os
from PyQt5.QtWidgets import (
QWidget, QVBoxLayout, QGroupBox, QFormLayout, QGridLayout,
QHBoxLayout, QLabel, QLineEdit, QComboBox, QCheckBox,
QPushButton, QFileDialog, QMessageBox,
)
from PyQt5.QtGui import QFont
from PyQt5.QtCore import Qt
from src.gui.components.custom_widgets import FileSelectWidget
from src.gui.styles import ModernStylesheet
from src.utils.water_owt_config import (
get_all_lake_names,
get_lake_config,
get_lambda_0,
get_default_lake,
)
from src.core.algorithms.qaa import QAABaselineSolver
class Step8QAAPanel(QWidget):
"""步骤8QAA 物理反演(非经验模型)"""
def __init__(self, parent=None):
super().__init__(parent)
self.init_ui()
def init_ui(self):
layout = QVBoxLayout()
title = QLabel("步骤8QAA 物理反演(非经验模型)")
title.setFont(QFont("Arial", 12, QFont.Bold))
layout.addWidget(title)
# 光谱 CSV 文件输入
self.spectrum_csv_file = FileSelectWidget(
"光谱 CSV 文件:",
"CSV Files (*.csv);;All Files (*.*)"
)
self.spectrum_csv_file.line_edit.setPlaceholderText(
"选择实测光谱或采样点光谱 CSV含波长列"
)
layout.addWidget(self.spectrum_csv_file)
# 水域类型选择
lake_group = QGroupBox("水域类型配置")
lake_layout = QFormLayout()
self.lake_combo = QComboBox()
lake_names = get_all_lake_names()
self.lake_combo.addItems(lake_names)
default_lake = get_default_lake()
if default_lake in lake_names:
self.lake_combo.setCurrentText(default_lake)
self.lake_combo.currentTextChanged.connect(self._on_lake_changed)
lake_layout.addRow("水域选择:", self.lake_combo)
# 参考波长显示
self.lambda_0_label = QLabel()
self.lambda_0_label.setStyleSheet(
f"color: {ModernStylesheet.COLORS['accent']}; "
f"font-weight: bold;"
)
lake_layout.addRow("参考波长 λ₀:", self.lambda_0_label)
# 算法提示
self.hint_label = QLabel()
self.hint_label.setWordWrap(True)
self.hint_label.setStyleSheet(
f"color: {ModernStylesheet.COLORS['text_secondary']}; "
"font-size: 12px;"
)
lake_layout.addRow("算法提示:", self.hint_label)
lake_group.setLayout(lake_layout)
layout.addWidget(lake_group)
# 输出路径
self.output_path = FileSelectWidget(
"输出文件:",
"CSV Files (*.csv);;All Files (*.*)",
mode="save"
)
self.output_path.line_edit.setPlaceholderText(
"自动生成到 8_QAA_Inversion或手动指定..."
)
self.output_path.browse_btn.clicked.disconnect()
self.output_path.browse_btn.clicked.connect(self.browse_output_path)
layout.addWidget(self.output_path)
# 启用步骤
self.enable_checkbox = QCheckBox("启用此步骤")
self.enable_checkbox.setChecked(False)
layout.addWidget(self.enable_checkbox)
# 独立运行按钮
self.run_btn = QPushButton("执行 QAA 反演")
self.run_btn.setStyleSheet(ModernStylesheet.get_button_stylesheet('success'))
self.run_btn.clicked.connect(self.run_step)
layout.addWidget(self.run_btn)
layout.addStretch()
self.setLayout(layout)
self._on_lake_changed(self.lake_combo.currentText())
def _on_lake_changed(self, lake_name: str):
"""当用户切换水域时更新显示"""
cfg = get_lake_config(lake_name)
if cfg:
self.lambda_0_label.setText(
f"{cfg['lambda_0']} nm{cfg['qaa_version']}"
)
self.hint_label.setText(cfg.get('notes', ''))
else:
self.lambda_0_label.setText("")
self.hint_label.setText("")
def _get_default_work_dir(self) -> str:
"""获取 work_dir优先用 panel 自身缓存的,否则尝试从主窗口取"""
if hasattr(self, 'work_dir') and self.work_dir:
return str(self.work_dir)
mw = self.window()
if mw and hasattr(mw, 'work_dir') and mw.work_dir:
return str(mw.work_dir)
return ""
def browse_output_path(self):
"""浏览输出文件路径"""
current = self.output_path.get_path().strip()
if current:
initial_dir = os.path.dirname(current)
initial_file = os.path.basename(current)
else:
initial_dir = ""
initial_file = ""
if not initial_dir or not os.path.isdir(initial_dir):
work_dir = self._get_default_work_dir()
initial_dir = os.path.join(work_dir, "8_QAA_Inversion") if work_dir else ""
if initial_dir and not os.path.isdir(initial_dir):
os.makedirs(initial_dir, exist_ok=True)
file_path, _ = QFileDialog.getSaveFileName(
self, "保存输出文件",
os.path.join(initial_dir, initial_file) if initial_file else initial_dir,
"CSV Files (*.csv);;All Files (*.*)"
)
if file_path:
self.output_path.set_path(file_path)
def get_config(self) -> dict:
"""获取面板配置"""
config = {
'lake_name': self.lake_combo.currentText(),
'lambda_0': get_lambda_0(self.lake_combo.currentText()),
'spectrum_csv_path': self.spectrum_csv_file.get_path(),
}
output_path = self.output_path.get_path()
if output_path:
config['output_path'] = output_path
return config
def set_config(self, config: dict):
"""设置面板配置"""
if 'lake_name' in config:
lake_name = config['lake_name']
idx = self.lake_combo.findText(lake_name)
if idx >= 0:
self.lake_combo.setCurrentIndex(idx)
if 'spectrum_csv_path' in config:
self.spectrum_csv_file.set_path(config['spectrum_csv_path'])
if 'output_path' in config:
self.output_path.set_path(config['output_path'])
def update_from_config(self, work_dir=None, pipeline=None):
"""从全局配置自动填充训练数据和输出路径"""
if work_dir:
self.work_dir = work_dir
elif hasattr(self, 'work_dir') and self.work_dir:
pass
else:
self.work_dir = None
main_window = self.window()
if main_window and hasattr(main_window, 'step5_panel'):
step5_output = main_window.step5_panel.output_file.get_path()
if step5_output:
if not os.path.isabs(step5_output):
step5_output = os.path.join(self.work_dir or '', step5_output).replace('\\', '/')
self.spectrum_csv_file.set_path(step5_output)
if self.work_dir:
qaa_dir = os.path.join(self.work_dir, "8_QAA_Inversion")
os.makedirs(qaa_dir, exist_ok=True)
output_path = os.path.join(qaa_dir, "a_lambda_results.csv").replace('\\', '/')
self.output_path.set_path(output_path)
else:
self.output_path.set_path("")
def run_step(self):
"""独立运行 QAA 反演"""
spectrum_path = self.spectrum_csv_file.get_path()
if not spectrum_path:
QMessageBox.warning(self, "输入错误", "请选择光谱 CSV 文件!")
return
main_window = self.window()
if hasattr(main_window, 'run_single_step'):
config = {'step8_qaa': self.get_config()}
main_window.run_single_step('step8_qaa', config)
else:
self._run_qaa_direct()
def _run_qaa_direct(self):
"""直接执行 QAA 反演(不依赖主窗口流水线)"""
spectrum_path = self.spectrum_csv_file.get_path()
output_path = self.output_path.get_path()
lake_name = self.lake_combo.currentText()
lambda_0 = get_lambda_0(lake_name)
if not output_path:
work_dir = self._get_default_work_dir()
qaa_dir = os.path.join(work_dir, "8_QAA_Inversion") if work_dir else ""
if qaa_dir and not os.path.isdir(qaa_dir):
os.makedirs(qaa_dir, exist_ok=True)
output_path = os.path.join(qaa_dir, "a_lambda_results.csv").replace('\\', '/')
try:
import numpy as np
import pandas as pd
df = pd.read_csv(spectrum_path, encoding="utf-8-sig")
col_names = df.columns.tolist()
wavelength_col_idx = None
for i, col in enumerate(col_names):
try:
float(col)
wavelength_col_idx = i
break
except (ValueError, TypeError):
pass
if wavelength_col_idx is None:
QMessageBox.warning(
self, "解析错误",
"无法从 CSV 列名中识别波长信息请确保列名包含数值型波长nm"
)
return
wavelengths = np.array(
[float(c) for c in col_names[wavelength_col_idx:]], dtype=np.float64
)
data_matrix = df.iloc[:, wavelength_col_idx:].values.astype(np.float64)
if data_matrix.ndim == 1:
data_matrix = data_matrix[np.newaxis, :]
solver = QAABaselineSolver()
raw_result = solver.run_inversion(wavelengths, data_matrix, lambda_0)
# run_inversion 返回:单样本 → dict多样本 → list[dict]
if isinstance(raw_result, list):
sample_results = raw_result
aw_0 = raw_result[0].get('aw_0', 0)
bbw_0 = raw_result[0].get('bbw_0', 0)
else:
sample_results = [raw_result]
aw_0 = raw_result.get('aw_0', 0)
bbw_0 = raw_result.get('bbw_0', 0)
rows_out = []
for i, sample_result in enumerate(sample_results):
wl_arr = wavelengths
a_arr = sample_result['a_lambda']
bb_arr = sample_result['bb_lambda']
for j, wl in enumerate(wl_arr):
rows_out.append({
'sample_id': f"sample_{i}",
'Wavelength': wl,
'a_lambda': a_arr[j],
'bb_lambda': bb_arr[j],
})
result_df = pd.DataFrame(rows_out)
os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
result_df.to_csv(output_path, index=False, float_format='%.8f')
QMessageBox.information(
self, "执行成功",
f"QAA 反演完成!\n"
f"水域: {lake_name}\n"
f"参考波长 λ₀: {lambda_0} nm\n"
f"λ₀ 处 aw: {aw_0:.6f} m⁻¹\n"
f"λ₀ 处 bbw: {bbw_0:.6f} m⁻¹\n"
f"结果已保存到:\n{output_path}"
)
except Exception as e:
QMessageBox.critical(self, "执行错误", f"QAA 反演失败:\n{str(e)}")
def get_training_params(self) -> dict:
"""获取反演参数"""
return {
'pipeline_type': 'qaa_non_empirical',
'lake_name': self.lake_combo.currentText(),
'lambda_0': get_lambda_0(self.lake_combo.currentText()),
}

View File

@ -0,0 +1,569 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Step8 面板 - 水色指数反演(直接处理去耀斑 BSQ 影像)
将 waterindex.csv 中的公式直接应用于去耀斑高光谱影像,
输出各水质参数指数的 GeoTIFF 栅格图像。
"""
import os
import traceback
from pathlib import Path
from typing import Dict, List, Optional
from PyQt5.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QFormLayout,
QGroupBox, QLabel, QLineEdit, QComboBox, QCheckBox, QPushButton,
QFileDialog, QMessageBox, QListWidget, QListWidgetItem,
QAbstractItemView, QProgressBar, QTextEdit, QFrame,
QScrollArea, QSizePolicy,
)
from PyQt5.QtGui import QFont
from PyQt5.QtCore import Qt, QThread, pyqtSignal
from src.gui.components.custom_widgets import FileSelectWidget
from src.gui.styles import ModernStylesheet
class WaterIndexWorker(QThread):
"""后台线程:执行水色指数反演"""
finished_ok = pyqtSignal(dict)
failed = pyqtSignal(str)
progress = pyqtSignal(str, float) # message, percent
log = pyqtSignal(str)
def __init__(
self,
bsq_path: str,
hdr_path: str,
output_dir: str,
selected_formulas: List[str],
waterindex_csv: str,
water_mask_path: Optional[str] = None,
work_dir: Optional[str] = None,
):
super().__init__()
self.bsq_path = bsq_path
self.hdr_path = hdr_path
self.output_dir = output_dir
self.selected_formulas = selected_formulas
self.waterindex_csv = waterindex_csv
self.water_mask_path = water_mask_path
self.work_dir = work_dir
def run(self):
try:
from src.core.algorithms.waterindex_inversion import WaterIndexProcessor
self.progress.emit("正在初始化水色指数处理器…", 2)
processor = WaterIndexProcessor(self.waterindex_csv)
self.progress.emit("正在读取影像元数据…", 5)
# 获取影像元数据
meta = processor.get_image_metadata(self.bsq_path, self.hdr_path)
if not meta:
self.failed.emit("无法读取影像元数据,请检查 BSQ 和 HDR 文件是否匹配")
return
n_bands = meta.get('bands', 0)
wv_range = meta.get('wavelength_range', '未知')
self.log.emit(
f"影像信息: {meta['width']}×{meta['height']} 像素, "
f"{n_bands} 波段, {wv_range}"
)
if self.water_mask_path:
self.log.emit(f"使用水域掩膜: {self.water_mask_path}")
# 使用 run_inversion 入口(含掩膜拦截链路)
results = processor.run_inversion(
deglint_img_path=self.bsq_path,
work_dir=self.work_dir or self.output_dir,
formula_csv_path=self.waterindex_csv,
selected_formulas=self.selected_formulas,
water_mask_path=self.water_mask_path,
callback=self._on_progress,
)
self.progress.emit(f"完成!共生成 {len(results)} 个指数图", 100)
self.finished_ok.emit(results)
except Exception as e:
self.failed.emit(f"{e}\n{traceback.format_exc()}")
def _on_progress(self, msg: str, pct: float):
self.progress.emit(msg, pct)
class Step8WaterIndexPanel(QWidget):
"""步骤8水色指数反演直接处理 BSQ 影像)"""
def __init__(self, parent=None):
super().__init__(parent)
self._worker: Optional[WaterIndexWorker] = None
self._waterindex_csv = self._find_waterindex_csv()
self._categories: List[str] = []
self._all_formulas: List[Dict] = []
self._formula_list_widgets: Dict[str, QListWidgetItem] = {}
self.init_ui()
self._load_formulas()
def init_ui(self):
layout = QVBoxLayout()
# ---- 标题 ----
title = QLabel("步骤8水色指数反演高光谱影像直接处理")
title.setFont(QFont("Arial", 12, QFont.Bold))
layout.addWidget(title)
# ---- 说明 ----
hint = QLabel(
"将 waterindex.csv 中的公式直接应用于去耀斑高光谱影像BSQ"
"输出各水质参数指数的 GeoTIFF 栅格图像。"
"指数图可直接用于水质专题图生成。"
)
hint.setWordWrap(True)
hint.setStyleSheet(f"color: {ModernStylesheet.COLORS.get('text_secondary', '#666')};")
layout.addWidget(hint)
# ---- 输入影像选择 ----
input_group = QGroupBox("输入影像")
input_layout = QFormLayout()
self.bsq_file = FileSelectWidget(
"去耀斑 BSQ 影像:",
"BSQ Files (*.bsq);;DAT Files (*.dat);;All Files (*.*)"
)
self.bsq_file.line_edit.setPlaceholderText("选择去耀斑处理后的 BSQ 影像")
self.bsq_file.browse_btn.clicked.disconnect()
self.bsq_file.browse_btn.clicked.connect(self._browse_bsq)
input_layout.addRow("BSQ 影像:", self.bsq_file)
self.hdr_file = FileSelectWidget(
"ENVI 头文件:",
"HDR Files (*.hdr);;All Files (*.*)"
)
self.hdr_file.line_edit.setPlaceholderText("自动关联同路径 .hdr 文件")
self.hdr_file.browse_btn.clicked.disconnect()
self.hdr_file.browse_btn.clicked.connect(self._browse_hdr)
input_layout.addRow("HDR 文件:", self.hdr_file)
# 影像信息显示
self.meta_label = QLabel("未加载影像")
self.meta_label.setStyleSheet(
"background: #f0f0f0; padding: 4px 8px; border-radius: 4px; "
"font-size: 12px; color: #333;"
)
input_layout.addRow("影像信息:", self.meta_label)
input_group.setLayout(input_layout)
layout.addWidget(input_group)
# ---- 公式选择 ----
formula_group = QGroupBox("公式选择")
formula_layout = QGridLayout()
# 类别过滤
formula_layout.addWidget(QLabel("按类别筛选:"), 0, 0)
self.category_combo = QComboBox()
self.category_combo.currentTextChanged.connect(self._on_category_changed)
formula_layout.addWidget(self.category_combo, 0, 1, 1, 2)
# 全选/取消全选
select_btn_layout = QHBoxLayout()
self.select_all_btn = QPushButton("全选")
self.select_all_btn.setMaximumWidth(80)
self.select_all_btn.clicked.connect(self._select_all)
select_btn_layout.addWidget(self.select_all_btn)
self.deselect_all_btn = QPushButton("取消全选")
self.deselect_all_btn.setMaximumWidth(80)
self.deselect_all_btn.clicked.connect(self._deselect_all)
select_btn_layout.addWidget(self.deselect_all_btn)
select_btn_layout.addStretch()
formula_layout.addLayout(select_btn_layout, 0, 3)
# 公式列表
self.formula_list = QListWidget()
self.formula_list.setSelectionMode(QAbstractItemView.MultiSelection)
self.formula_list.setMinimumHeight(200)
self.formula_list.itemChanged.connect(self._on_item_changed)
formula_layout.addWidget(self.formula_list, 1, 0, 1, 4)
formula_group.setLayout(formula_layout)
layout.addWidget(formula_group)
# ---- 输出设置 ----
output_group = QGroupBox("输出设置")
output_layout = QFormLayout()
self.output_dir = FileSelectWidget(
"输出目录:",
"Directories"
)
self.output_dir.line_edit.setPlaceholderText("留空 → 工作目录/8_WaterIndex_Images")
self.output_dir.browse_btn.clicked.disconnect()
self.output_dir.browse_btn.clicked.connect(self._browse_output_dir)
output_layout.addRow("输出目录:", self.output_dir)
self.format_combo = QComboBox()
self.format_combo.addItems(["GTiff (GeoTIFF)", "ENVI", "PCI"])
self.format_combo.setCurrentIndex(0)
output_layout.addRow("输出格式:", self.format_combo)
output_group.setLayout(output_layout)
layout.addWidget(output_group)
# ---- 进度显示 ----
self.progress_bar = QProgressBar()
self.progress_bar.setMinimum(0)
self.progress_bar.setMaximum(100)
self.progress_bar.setValue(0)
self.progress_bar.setTextVisible(True)
layout.addWidget(self.progress_bar)
self.progress_label = QLabel("")
self.progress_label.setStyleSheet("font-size: 11px; color: #666;")
layout.addWidget(self.progress_label)
# ---- 启用 & 运行 ----
self.enable_checkbox = QCheckBox("启用此步骤")
self.enable_checkbox.setChecked(True)
layout.addWidget(self.enable_checkbox)
self.run_btn = QPushButton("▶ 执行水色指数反演")
self.run_btn.setStyleSheet(ModernStylesheet.get_button_stylesheet('success'))
self.run_btn.clicked.connect(self.run_step)
layout.addWidget(self.run_btn)
layout.addStretch()
self.setLayout(layout)
def _find_waterindex_csv(self) -> str:
"""查找 waterindex.csv 路径"""
candidates = [
Path(__file__).parent.parent.parent / "model" / "waterindex.csv",
Path(__file__).parent.parent.parent.parent / "src" / "gui" / "model" / "waterindex.csv",
]
for c in candidates:
if c.exists():
return str(c)
return ""
def _load_formulas(self):
"""加载 waterindex.csv 中的公式"""
if not self._waterindex_csv or not Path(self._waterindex_csv).exists():
self.meta_label.setText("⚠️ waterindex.csv 未找到")
return
import csv
self._all_formulas = []
try:
with open(self._waterindex_csv, 'r', encoding='utf-8-sig') as f:
reader = csv.DictReader(f)
self._all_formulas = list(reader)
except Exception as e:
self.meta_label.setText(f"⚠️ 加载公式失败: {e}")
return
# 提取所有类别
cats = set()
for f in self._all_formulas:
c = f.get('Category', '').strip()
if c:
cats.add(c)
self._categories = sorted(cats)
self.category_combo.clear()
self.category_combo.addItem("全部")
self.category_combo.addItems(self._categories)
self._populate_list("全部")
def _populate_list(self, category: str):
"""根据类别填充公式列表"""
self.formula_list.clear()
self._formula_list_widgets.clear()
formulas_to_show = (
[f for f in self._all_formulas if f.get('Category', '') == category]
if category != "全部"
else self._all_formulas
)
for f in formulas_to_show:
name = f.get('Formula_Name', '')
formula_str = f.get('Formula', '')
cat = f.get('Category', '')
ftype = f.get('Formula_Type', '')
item = QListWidgetItem()
item.setFlags(item.flags() | Qt.ItemIsUserCheckable)
item.setCheckState(Qt.Checked)
item.setData(Qt.UserRole, name)
item.setText(
f"{name} [{cat}] ({ftype})\n {formula_str}"
)
item.setToolTip(f"{name}\n{category}\n{formula_str}")
self.formula_list.addItem(item)
self._formula_list_widgets[name] = item
def _on_category_changed(self, category: str):
self._populate_list(category)
def _select_all(self):
for item in self.formula_list.selectedItems():
item.setCheckState(Qt.Checked)
# 也全选当前显示的
for i in range(self.formula_list.count()):
it = self.formula_list.item(i)
it.setCheckState(Qt.Checked)
def _deselect_all(self):
for i in range(self.formula_list.count()):
it = self.formula_list.item(i)
it.setCheckState(Qt.Unchecked)
def _on_item_changed(self, item: QListWidgetItem):
pass # 可扩展:实时统计选中数量
def _browse_bsq(self):
path, _ = QFileDialog.getOpenFileName(
self, "选择去耀斑 BSQ 影像",
"",
"BSQ Files (*.bsq);;DAT Files (*.dat);;All Files (*.*)"
)
if path:
self.bsq_file.set_path(path)
# 自动关联同路径 hdr
hdr = Path(path).with_suffix('.hdr')
if hdr.exists():
self.hdr_file.set_path(str(hdr))
self._load_metadata(path, str(hdr) if hdr.exists() else "")
def _browse_hdr(self):
path, _ = QFileDialog.getOpenFileName(
self, "选择 ENVI 头文件",
"",
"HDR Files (*.hdr);;All Files (*.*)"
)
if path:
self.hdr_file.set_path(path)
bsq_path = self.bsq_file.get_path()
if bsq_path:
self._load_metadata(bsq_path, path)
def _browse_output_dir(self):
d = QFileDialog.getExistingDirectory(self, "选择输出目录", "")
if d:
self.output_dir.set_path(d)
def _load_metadata(self, bsq_path: str, hdr_path: str):
"""加载并显示影像元数据"""
if not bsq_path or not Path(bsq_path).exists():
self.meta_label.setText("⚠️ 影像文件不存在")
return
if not hdr_path or not Path(hdr_path).exists():
self.meta_label.setText("⚠️ 头文件不存在")
return
try:
from src.core.algorithms.waterindex_inversion import WaterIndexProcessor
processor = WaterIndexProcessor(self._waterindex_csv)
meta = processor.get_image_metadata(bsq_path, hdr_path)
if meta:
self.meta_label.setText(
f"{meta['width']}×{meta['height']} | "
f"{meta['bands']} 波段 | {meta.get('wavelength_range', '未知')} | "
f"驱动: {meta['driver']}"
)
else:
self.meta_label.setText("⚠️ 无法读取元数据")
except Exception as e:
self.meta_label.setText(f"⚠️ 元数据读取失败: {e}")
def _get_selected_formula_names(self) -> List[str]:
names = []
for i in range(self.formula_list.count()):
item = self.formula_list.item(i)
if item.checkState() == Qt.Checked:
name = item.data(Qt.UserRole)
if name:
names.append(name)
return names
def _get_default_work_dir(self) -> str:
if hasattr(self, 'work_dir') and self.work_dir:
return str(self.work_dir)
mw = self.window()
if mw and hasattr(mw, 'work_dir') and mw.work_dir:
return str(mw.work_dir)
return ""
def get_config(self) -> dict:
bsq = self.bsq_file.get_path()
return {
'bsq_path': bsq,
'hdr_path': self.hdr_file.get_path(),
'deglint_img_path': bsq,
'output_dir': self.output_dir.get_path(),
'output_format': self.format_combo.currentText().split()[0],
'selected_formulas': self._get_selected_formula_names(),
}
def set_config(self, config: dict):
if config.get('bsq_path'):
self.bsq_file.set_path(config['bsq_path'])
if config.get('hdr_path'):
self.hdr_file.set_path(config['hdr_path'])
if config.get('output_dir'):
self.output_dir.set_path(config['output_dir'])
if 'selected_formulas' in config:
names = set(config['selected_formulas'])
for i in range(self.formula_list.count()):
item = self.formula_list.item(i)
name = item.data(Qt.UserRole)
item.setCheckState(Qt.Checked if name in names else Qt.Unchecked)
def update_from_config(self, work_dir=None, pipeline=None):
if work_dir:
self.work_dir = work_dir
elif hasattr(self, 'work_dir') and self.work_dir:
pass
else:
self.work_dir = None
main_window = self.window()
# 自动填入去耀斑影像
if main_window and hasattr(main_window, 'step3_panel'):
deglint_path = main_window.step3_panel.output_file.get_path()
if deglint_path and not self.bsq_file.get_path():
if not os.path.isabs(deglint_path):
deglint_path = os.path.join(self.work_dir or '', deglint_path).replace('\\', '/')
self.bsq_file.set_path(deglint_path)
hdr = Path(deglint_path).with_suffix('.hdr')
if hdr.exists():
self.hdr_file.set_path(str(hdr))
self._load_metadata(deglint_path, str(hdr))
# 自动填入输出目录
if self.work_dir:
out_dir = os.path.join(self.work_dir, "8_WaterIndex_Images").replace('\\', '/')
os.makedirs(out_dir, exist_ok=True)
if not self.output_dir.get_path():
self.output_dir.set_path(out_dir)
def run_step(self):
bsq_path = self.bsq_file.get_path().strip()
hdr_path = self.hdr_file.get_path().strip()
output_dir = self.output_dir.get_path().strip()
# 验证输入
if not bsq_path:
QMessageBox.warning(self, "输入错误", "请选择去耀斑 BSQ 影像!")
return
if not Path(bsq_path).exists():
QMessageBox.warning(self, "输入错误", f"BSQ 影像不存在:\n{bsq_path}")
return
if not hdr_path:
# 尝试自动查找
auto_hdr = Path(bsq_path).with_suffix('.hdr')
if auto_hdr.exists():
hdr_path = str(auto_hdr)
self.hdr_file.set_path(hdr_path)
else:
QMessageBox.warning(self, "输入错误", "请选择 ENVI 头文件!")
return
if not Path(hdr_path).exists():
QMessageBox.warning(self, "输入错误", f"HDR 文件不存在:\n{hdr_path}")
return
if not output_dir:
work_dir = self._get_default_work_dir()
output_dir = os.path.join(work_dir, "8_WaterIndex_Images").replace('\\', '/')
os.makedirs(output_dir, exist_ok=True)
self.output_dir.set_path(output_dir)
selected = self._get_selected_formula_names()
if not selected:
QMessageBox.warning(self, "输入错误", "请至少选择一个公式!")
return
if self._waterindex_csv and not Path(self._waterindex_csv).exists():
QMessageBox.warning(self, "配置错误", f"waterindex.csv 不存在:\n{self._waterindex_csv}")
return
# ── 自动扫描工作目录下的水域掩膜文件 ────────────────────────────
work_dir = self.work_dir or str(Path(bsq_path).parent)
mask_dir = os.path.join(work_dir, "1_water_mask")
water_mask_path: Optional[str] = None
if os.path.isdir(mask_dir):
# ★★★ glob 智能扫描:取任意 .dat 或 .tif 文件 ★★★
for pattern in ("*.dat", "*.tif", "*.TIF", "*.DT"):
candidates = sorted(Path(mask_dir).glob(pattern))
if candidates:
water_mask_path = str(candidates[0])
break
if water_mask_path:
print(f"[Step8] 自动找到水域掩膜: {water_mask_path}")
else:
print(f"[Step8] 未找到水域掩膜,跳过陆地剔除(陆地将保留在指数图中)")
# 开始后台处理
self.run_btn.setEnabled(False)
self.progress_bar.setValue(0)
self.progress_label.setText("")
self._worker = WaterIndexWorker(
bsq_path=bsq_path,
hdr_path=hdr_path,
output_dir=output_dir,
selected_formulas=selected,
waterindex_csv=self._waterindex_csv,
water_mask_path=water_mask_path,
work_dir=work_dir,
)
self._worker.progress.connect(self._on_progress)
self._worker.finished_ok.connect(self._on_finished)
self._worker.failed.connect(self._on_failed)
self._worker.log.connect(lambda m: self.progress_label.setText(m))
self._worker.start()
def _on_progress(self, msg: str, pct: float):
self.progress_bar.setValue(int(pct))
self.progress_label.setText(msg)
def _on_finished(self, results: Dict[str, str]):
self.run_btn.setEnabled(True)
n = len(results)
QMessageBox.information(
self, "执行成功",
f"水色指数反演完成!\n"
f"共生成 {n} 个指数图GeoTIFF\n\n"
f"输出目录: {self.output_dir.get_path()}"
)
main_window = self.window()
if main_window and hasattr(main_window, 'log_message'):
main_window.log_message(f"步骤8水色指数反演完成生成 {n} 个指数图", "info")
def _on_failed(self, err: str):
self.run_btn.setEnabled(True)
self.progress_bar.setValue(0)
QMessageBox.critical(self, "执行错误", f"水色指数反演失败:\n\n{err[:500]}")
def get_output_dir(self) -> str:
return self.output_dir.get_path().strip() or ""
def get_output_tif_paths(self) -> List[str]:
"""获取输出目录下的所有 GeoTIFF 文件路径"""
out_dir = self.get_output_dir()
if not out_dir or not os.path.isdir(out_dir):
return []
return sorted(
str(p) for p in Path(out_dir).glob("*.tif")
if p.is_file()
)