#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Step 7 panel - sampling point generation.
"""

import os

from PyQt5.QtWidgets import (
    QWidget,
    QVBoxLayout,
    QGroupBox,
    QFormLayout,
    QPushButton,
    QCheckBox,
    QSpinBox,
    QMessageBox,
)

from src.gui.components.custom_widgets import FileSelectWidget
from src.gui.styles import ModernStylesheet


class Step7Panel(QWidget):
    """Step 7: sampling point generation.

    The panel collects the input image / mask paths and the sampling
    parameters, and can hand its configuration to the main window to run
    this step on its own.
    """

    def __init__(self, parent=None):
        super().__init__(parent)
        # Working directory used to resolve relative paths and to discover
        # default inputs; filled in by update_from_config().
        self.work_dir = None
        self.init_ui()

    def init_ui(self):
        """Build the widgets and lay them out vertically."""
        layout = QVBoxLayout()

        # De-glinted image file (used when the step is run standalone).
        self.deglint_img_file = FileSelectWidget(
            "去耀斑影像:",
            "Image Files (*.bsq *.dat *.tif);;All Files (*.*)"
        )
        layout.addWidget(self.deglint_img_file)

        # Water mask file (optional, used when the step is run standalone).
        self.water_mask_file = FileSelectWidget(
            "水域掩膜:",
            "Mask Files (*.dat *.tif);;All Files (*.*)"
        )
        self.water_mask_file.label.setText("水域掩膜:")
        layout.addWidget(self.water_mask_file)

        # Sampling parameters.
        params_group = QGroupBox("采样参数")
        params_layout = QFormLayout()

        self.interval = QSpinBox()
        self.interval.setRange(10, 500)
        self.interval.setValue(50)
        params_layout.addRow("采样点间隔(像素):", self.interval)

        self.sample_radius = QSpinBox()
        self.sample_radius.setRange(1, 50)
        self.sample_radius.setValue(5)
        params_layout.addRow("采样半径(像素):", self.sample_radius)

        self.chunk_size = QSpinBox()
        self.chunk_size.setRange(100, 10000)
        self.chunk_size.setValue(1000)
        params_layout.addRow("处理块大小:", self.chunk_size)

        params_group.setLayout(params_layout)
        layout.addWidget(params_group)

        # Output file path.
        self.output_file = FileSelectWidget(
            "输出采样点:",
            "CSV Files (*.csv);;All Files (*.*)"
        )
        self.output_file.line_edit.setPlaceholderText("sampling_points.csv")
        layout.addWidget(self.output_file)

        # Enable / disable this step.
        self.enable_checkbox = QCheckBox("启用此步骤")
        self.enable_checkbox.setChecked(True)
        layout.addWidget(self.enable_checkbox)

        # Button that runs this step on its own.
        self.run_btn = QPushButton("独立运行此步骤")
        self.run_btn.setStyleSheet(ModernStylesheet.get_button_stylesheet('success'))
        self.run_btn.clicked.connect(self.run_step)
        layout.addWidget(self.run_btn)

        layout.addStretch()
        self.setLayout(layout)

    def get_config(self):
        """Return the current panel settings as a dict.

        Returns:
            dict with 'interval', 'sample_radius' and 'chunk_size'; the keys
            'deglint_img_path' and 'water_mask_path' are included only when
            the corresponding path widget is non-empty.
        """
        config = {
            'interval': self.interval.value(),
            'sample_radius': self.sample_radius.value(),
            'chunk_size': self.chunk_size.value(),
        }

        deglint_img_path = self.deglint_img_file.get_path()
        if deglint_img_path:
            config['deglint_img_path'] = deglint_img_path

        water_mask_path = self.water_mask_file.get_path()
        if water_mask_path:
            config['water_mask_path'] = water_mask_path

        # Note: step7_generate_sampling_points does not accept an output_path
        # argument; the output path is generated inside the pipeline, so the
        # output widget is intentionally not exported here.
        return config

    def set_config(self, config):
        """Apply values from *config* to the widgets.

        Only keys present in *config* are applied; missing keys leave the
        corresponding widget untouched.

        Args:
            config: mapping that may contain 'interval', 'sample_radius',
                'chunk_size', 'deglint_img_path', 'water_mask_path' and
                'glint_mask_path'.
        """
        if 'interval' in config:
            self.interval.setValue(config['interval'])
        if 'sample_radius' in config:
            self.sample_radius.setValue(config['sample_radius'])
        if 'chunk_size' in config:
            self.chunk_size.setValue(config['chunk_size'])
        if 'deglint_img_path' in config:
            self.deglint_img_file.set_path(config['deglint_img_path'])
        if 'water_mask_path' in config:
            self.water_mask_file.set_path(config['water_mask_path'])

        # This panel does not create a glint-mask selector in init_ui(), so a
        # config carrying 'glint_mask_path' used to raise AttributeError.
        # Apply the key only if such a widget actually exists on the instance.
        glint_mask_widget = getattr(self, 'glint_mask_file', None)
        if glint_mask_widget is not None and 'glint_mask_path' in config:
            glint_mask_widget.set_path(config['glint_mask_path'])

    @staticmethod
    def _lookup_step_output(pipeline, step_key, candidate_keys):
        """Return the first truthy value among *candidate_keys* in
        ``pipeline.step_outputs[step_key]``, or None when unavailable."""
        if not pipeline:
            return None
        all_outputs = getattr(pipeline, 'step_outputs', None) or {}
        step_outputs = all_outputs.get(step_key) or {}
        for key in candidate_keys:
            value = step_outputs.get(key)
            if value:
                return value
        return None

    @staticmethod
    def _panel_output_path(main_window, panel_attr):
        """Return the output path shown on a sibling step panel, or None if
        the main window has no such panel."""
        panel = getattr(main_window, panel_attr, None)
        if panel is None:
            return None
        return panel.output_file.get_path()

    def _to_absolute(self, path):
        """Join a relative *path* onto the work dir (forward slashes);
        absolute paths are returned unchanged."""
        if os.path.isabs(path):
            return path
        return os.path.join(self.work_dir or '', path).replace('\\', '/')

    @staticmethod
    def _find_dat_file(directory, preferred_fragment=None):
        """Return a '.dat' file path inside *directory*, or None.

        A file whose lower-cased name contains *preferred_fragment* wins;
        otherwise the first name in sorted order is used.
        """
        if not os.path.isdir(directory):
            return None
        # os.listdir() order is arbitrary; sort so the pick is reproducible.
        dat_files = sorted(
            name for name in os.listdir(directory)
            if name.lower().endswith('.dat')
        )
        if not dat_files:
            return None
        chosen = dat_files[0]
        if preferred_fragment:
            chosen = next(
                (name for name in dat_files if preferred_fragment in name.lower()),
                chosen,
            )
        return os.path.join(directory, chosen).replace('\\', '/')

    def update_from_config(self, work_dir=None, pipeline=None):
        """Auto-fill the de-glinted image, water mask and output paths.

        Side effect: when a work dir is known, the '10_sampling' directory
        is created under it if it does not exist yet.

        Args:
            work_dir: working directory path; when falsy, a previously
                stored work dir (if any) is kept.
            pipeline: pipeline instance whose ``step_outputs`` mapping is
                consulted first for paths.
        """
        if work_dir:
            self.work_dir = work_dir
        elif not getattr(self, 'work_dir', None):
            self.work_dir = None

        main_window = self.window()

        # 1. De-glinted image: pipeline outputs first, then the step-3 panel
        #    widget (which may hold a relative path).
        deglint_path = self._lookup_step_output(
            pipeline,
            'step3',
            ('deglint_image', 'output_path', 'output_file', 'deglint_img_path'),
        )
        if not deglint_path:
            deglint_path = self._panel_output_path(main_window, 'step3_panel')
        if deglint_path:
            self.deglint_img_file.set_path(self._to_absolute(deglint_path))

        # 2. Water mask, by priority:
        #    pipeline outputs > step-1 panel > 1_water_mask dir > input-test dir.
        water_mask_path = self._lookup_step_output(
            pipeline,
            'step1',
            ('water_mask', 'output_path', 'output_file'),
        )
        if not water_mask_path:
            water_mask_path = self._panel_output_path(main_window, 'step1_panel')
        if not water_mask_path and self.work_dir:
            water_mask_path = self._find_dat_file(
                os.path.join(self.work_dir, "1_water_mask")
            )
        if not water_mask_path and self.work_dir:
            # Prefer water_mask_from_shp.dat, otherwise any .dat file.
            water_mask_path = self._find_dat_file(
                os.path.join(self.work_dir, "input-test"),
                preferred_fragment='water_mask_from_shp',
            )
        if water_mask_path:
            self.water_mask_file.set_path(self._to_absolute(water_mask_path))

        # 3. Output path (absolute), creating its directory if needed.
        if self.work_dir:
            output_path = os.path.join(self.work_dir, "10_sampling", "sampling_spectra.csv")
            os.makedirs(os.path.dirname(output_path), exist_ok=True)
            self.output_file.set_path(output_path.replace('\\', '/'))

    def run_step(self):
        """Run step 7 on its own through the main window.

        Shows a warning and returns if no de-glinted image is selected.
        Does nothing further when the top-level window does not expose
        ``run_single_step``.
        """
        deglint_img_path = self.deglint_img_file.get_path()
        if not deglint_img_path:
            QMessageBox.warning(self, "输入错误", "请选择去耀斑影像文件!")
            return

        main_window = self.window()
        if hasattr(main_window, 'run_single_step'):
            config = {'step7': self.get_config()}
            main_window.run_single_step('step7', config)