Files
WQ_GUI/src/gui/panels/step10_panel.py

253 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Step10 面板 - 采样点生成
"""
import os
from PyQt5.QtWidgets import (
QWidget, QVBoxLayout, QGroupBox, QFormLayout,
QPushButton, QCheckBox, QSpinBox, QMessageBox,
)
from src.gui.components.custom_widgets import FileSelectWidget
from src.gui.dialogs import SamplingViewerDialog
from src.gui.styles import ModernStylesheet
class Step10Panel(QWidget):
"""步骤10采样点生成"""
def __init__(self, parent=None):
super().__init__(parent)
self.init_ui()
def init_ui(self):
layout = QVBoxLayout()
# 去耀斑影像文件(用于独立运行)
self.deglint_img_file = FileSelectWidget(
"去耀斑影像:",
"Image Files (*.bsq *.dat *.tif);;All Files (*.*)"
)
layout.addWidget(self.deglint_img_file)
# 水域掩膜文件(可选,用于独立运行)
self.water_mask_file = FileSelectWidget(
"水域掩膜:",
"Mask Files (*.dat *.tif);;All Files (*.*)"
)
self.water_mask_file.label.setText("水域掩膜:")
layout.addWidget(self.water_mask_file)
# 参数设置
params_group = QGroupBox("采样参数")
params_layout = QFormLayout()
self.interval = QSpinBox()
self.interval.setRange(10, 500)
self.interval.setValue(50)
params_layout.addRow("采样点间隔(像素):", self.interval)
self.sample_radius = QSpinBox()
self.sample_radius.setRange(1, 50)
self.sample_radius.setValue(5)
params_layout.addRow("采样半径(像素):", self.sample_radius)
self.chunk_size = QSpinBox()
self.chunk_size.setRange(100, 10000)
self.chunk_size.setValue(1000)
params_layout.addRow("处理块大小:", self.chunk_size)
self.use_adaptive_sampling = QCheckBox("启用自适应采样")
self.use_adaptive_sampling.setChecked(True)
params_layout.addRow("采样模式:", self.use_adaptive_sampling)
params_group.setLayout(params_layout)
layout.addWidget(params_group)
# 输出文件路径
self.output_file = FileSelectWidget(
"输出采样点:",
"CSV Files (*.csv);;All Files (*.*)"
)
self.output_file.line_edit.setPlaceholderText("sampling_points.csv")
layout.addWidget(self.output_file)
# 启用步骤
self.enable_checkbox = QCheckBox("启用此步骤")
self.enable_checkbox.setChecked(True)
layout.addWidget(self.enable_checkbox)
# 独立运行按钮
self.run_btn = QPushButton("独立运行此步骤")
self.run_btn.setStyleSheet(ModernStylesheet.get_button_stylesheet('success'))
self.run_btn.clicked.connect(self.run_step)
layout.addWidget(self.run_btn)
# 交互式预览按钮
self.preview_btn = QPushButton("📊 交互式预览采样点与光谱")
self.preview_btn.setEnabled(False)
self.preview_btn.clicked.connect(self._open_sampling_viewer)
layout.addWidget(self.preview_btn)
layout.addStretch()
self.setLayout(layout)
# 监听输出路径变化,实时更新预览按钮状态
self.output_file.line_edit.textChanged.connect(self._on_output_changed)
def get_config(self):
"""获取配置"""
config = {
'interval': self.interval.value(),
'sample_radius': self.sample_radius.value(),
'chunk_size': self.chunk_size.value(),
'use_adaptive_sampling': self.use_adaptive_sampling.isChecked(),
}
deglint_img_path = self.deglint_img_file.get_path()
if deglint_img_path:
config['deglint_img_path'] = deglint_img_path
water_mask_path = self.water_mask_file.get_path()
if water_mask_path:
config['water_mask_path'] = water_mask_path
return config
def set_config(self, config):
"""设置配置"""
if 'interval' in config:
self.interval.setValue(config['interval'])
if 'sample_radius' in config:
self.sample_radius.setValue(config['sample_radius'])
if 'chunk_size' in config:
self.chunk_size.setValue(config['chunk_size'])
if 'use_adaptive_sampling' in config:
self.use_adaptive_sampling.setChecked(config['use_adaptive_sampling'])
if 'deglint_img_path' in config:
self.deglint_img_file.set_path(config['deglint_img_path'])
if 'water_mask_path' in config:
self.water_mask_file.set_path(config['water_mask_path'])
if 'glint_mask_path' in config:
self.glint_mask_file.set_path(config['glint_mask_path'])
def update_from_config(self, work_dir=None, pipeline=None):
"""从全局配置自动填充去耀斑影像和掩膜路径
Args:
work_dir: 工作目录路径
pipeline: Pipeline 实例(用于从 step_outputs 获取绝对路径)
"""
if work_dir:
self.work_dir = work_dir
elif hasattr(self, 'work_dir') and self.work_dir:
pass
else:
self.work_dir = None
main_window = self.window()
# 1. 填充去耀斑影像路径(优先从 pipeline.step_outputs 获取绝对路径)
deglint_path = None
if pipeline and hasattr(pipeline, 'step_outputs'):
step3_outputs = getattr(pipeline, 'step_outputs', {}).get('step3', {})
deglint_path = (
step3_outputs.get('deglint_image')
or step3_outputs.get('output_path')
or step3_outputs.get('output_file')
or step3_outputs.get('deglint_img_path')
)
# 回退:从 step3 面板 widget 直接读取(可能是相对路径)
if not deglint_path and hasattr(main_window, 'step3_panel'):
deglint_path = main_window.step3_panel.output_file.get_path()
if deglint_path:
# 若为相对路径,使用 work_dir 合成为绝对路径
if not os.path.isabs(deglint_path):
deglint_path = os.path.join(self.work_dir or '', deglint_path).replace('\\', '/')
self.deglint_img_file.set_path(deglint_path)
# 2. 填充水域掩膜路径优先级pipeline.step_outputs > step1_panel > 1_water_mask > input-test
water_mask_path = None
if pipeline and hasattr(pipeline, 'step_outputs'):
step1_outputs = getattr(pipeline, 'step_outputs', {}).get('step1', {})
water_mask_path = (
step1_outputs.get('water_mask')
or step1_outputs.get('output_path')
or step1_outputs.get('output_file')
)
# 回退:从 step1 面板 widget 直接读取
if not water_mask_path and hasattr(main_window, 'step1_panel'):
water_mask_path = main_window.step1_panel.output_file.get_path()
# 备选:扫描 1_water_mask 目录下的 .dat 文件
if not water_mask_path and self.work_dir:
mask_dir = os.path.join(self.work_dir, "1_water_mask")
if os.path.isdir(mask_dir):
dat_files = [f for f in os.listdir(mask_dir) if f.lower().endswith('.dat')]
if dat_files:
water_mask_path = os.path.join(mask_dir, dat_files[0]).replace('\\', '/')
# 备选:扫描 input-test 目录(优先匹配 water_mask_from_shp.dat
if not water_mask_path and self.work_dir:
input_test_dir = os.path.join(self.work_dir, "input-test")
if os.path.isdir(input_test_dir):
dat_files = [f for f in os.listdir(input_test_dir) if f.lower().endswith('.dat')]
# 优先匹配 water_mask_from_shp.dat
for f in dat_files:
if 'water_mask_from_shp' in f.lower():
water_mask_path = os.path.join(input_test_dir, f).replace('\\', '/')
break
# 否则取第一个 .dat 文件
if not water_mask_path and dat_files:
water_mask_path = os.path.join(input_test_dir, dat_files[0]).replace('\\', '/')
if water_mask_path:
# 若为相对路径,使用 work_dir 合成为绝对路径
if not os.path.isabs(water_mask_path):
water_mask_path = os.path.join(self.work_dir or '', water_mask_path).replace('\\', '/')
self.water_mask_file.set_path(water_mask_path)
# 3. 自动填充输出路径(绝对路径)
if self.work_dir:
output_path = os.path.join(self.work_dir, "10_sampling", "sampling_spectra.csv")
os.makedirs(os.path.dirname(output_path), exist_ok=True)
self.output_file.set_path(output_path.replace('\\', '/'))
# 4. 同步更新预览按钮状态(路径可能已自动填充)
self._check_csv_exists()
def run_step(self):
"""独立运行步骤10"""
deglint_img_path = self.deglint_img_file.get_path()
if not deglint_img_path:
QMessageBox.warning(self, "输入错误", "请选择去耀斑影像文件!")
return
main_window = self.window()
if hasattr(main_window, 'run_single_step'):
config = {'step10': self.get_config()}
main_window.run_single_step('step10', config)
def _check_csv_exists(self):
"""检查 output csv 是否存在,驱动预览按钮启停"""
csv_path = self.output_file.get_path()
enabled = bool(csv_path and os.path.isabs(csv_path) and os.path.exists(csv_path))
self.preview_btn.setEnabled(enabled)
return enabled
def _on_output_changed(self, _text=None):
"""输出路径输入框内容变化时调用_text 为 line_edit.textChanged 信号参数)"""
self._check_csv_exists()
def _open_sampling_viewer(self):
"""打开交互式采样点查看器弹窗"""
csv_path = self.output_file.get_path()
if not csv_path or not os.path.exists(csv_path):
QMessageBox.warning(
self, "文件不存在",
f"采样点 CSV 文件不存在:{csv_path}\n请先运行步骤10生成数据。"
)
return
dialog = SamplingViewerDialog(csv_path, self)
dialog.exec_()
# 弹窗关闭后再次检查状态(可能文件被覆盖等)
self._check_csv_exists()