#!/usr/bin/env python # -*- coding: utf-8 -*- """ Step14 面板 - 分布图生成 """ import os import traceback from pathlib import Path from typing import List, Optional from PyQt5.QtCore import Qt, QThread, pyqtSignal from PyQt5.QtWidgets import ( QWidget, QVBoxLayout, QGroupBox, QFormLayout, QHBoxLayout, QLabel, QCheckBox, QPushButton, QLineEdit, QDoubleSpinBox, QRadioButton, QButtonGroup, QMessageBox, QFileDialog, QComboBox, QProgressBar, ) from src.gui.components.custom_widgets import FileSelectWidget from src.gui.styles import ModernStylesheet # Pipeline 可用性(与 core/worker_thread.py 保持一致) try: from src.core.water_quality_inversion_pipeline_GUI import WaterQualityInversionPipeline PIPELINE_AVAILABLE = True except ImportError: PIPELINE_AVAILABLE = False class Step14BatchThread(QThread): """专题图:按文件夹内多个预测 CSV 批量生成分布图。""" finished_ok = pyqtSignal(int) failed = pyqtSignal(str) log_message = pyqtSignal(str, str) progress = pyqtSignal(int, int) # (current, total) def __init__(self, work_dir: str, csv_paths: List[str], step14_kwargs: dict, output_dir_optional: Optional[str]): super().__init__() self.work_dir = work_dir self.csv_paths = csv_paths self.step14_kwargs = step14_kwargs self.output_dir_optional = (output_dir_optional or "").strip() or None def run(self): mpl_prev = None try: import matplotlib mpl_prev = matplotlib.get_backend() except Exception: pass try: import matplotlib.pyplot as plt plt.switch_backend("Agg") except Exception: mpl_prev = None try: from src.core.water_quality_inversion_pipeline_GUI import WaterQualityInversionPipeline pipeline = WaterQualityInversionPipeline(work_dir=self.work_dir) n = len(self.csv_paths) for i, csv_p in enumerate(self.csv_paths): self.progress.emit(i + 1, n) self.log_message.emit(f"专题图 [{i + 1}/{n}] {csv_p}", "info") kw = {**self.step14_kwargs, "prediction_csv_path": csv_p, "skip_dependency_check": True} if self.output_dir_optional: stem = Path(csv_p).stem kw["output_image_path"] = str(Path(self.output_dir_optional) / f"{stem}_distribution.png") else: kw["output_image_path"] = None pipeline.step10_map(**kw) self.finished_ok.emit(n) except Exception as e: self.failed.emit(f"{e}\n{traceback.format_exc()}") finally: if mpl_prev: try: import matplotlib.pyplot as plt plt.switch_backend(mpl_prev) except Exception: pass class Step14GeoTIFFBatchThread(QThread): """GeoTIFF 批量渲染:遍历文件夹下所有 .tif/.bsq 逐一渲染成分布图 PNG。""" finished_ok = pyqtSignal(int) failed = pyqtSignal(str) log_message = pyqtSignal(str, str) progress = pyqtSignal(int, int) # (current, total) def __init__( self, tif_paths: List[str], output_dir: str, boundary_shp_path: Optional[str], input_crs: str, output_crs: str, ): super().__init__() self.tif_paths = tif_paths self.output_dir = output_dir self.boundary_shp_path = boundary_shp_path self.input_crs = input_crs self.output_crs = output_crs def run(self): mpl_prev = None try: import matplotlib mpl_prev = matplotlib.get_backend() except Exception: pass try: import matplotlib.pyplot as plt plt.switch_backend("Agg") except Exception: mpl_prev = None try: from src.postprocessing.map import ContentMapper mapper = ContentMapper() n = len(self.tif_paths) for i, tif_path in enumerate(self.tif_paths): self.progress.emit(i + 1, n) tif_stem = Path(tif_path).stem chinese_name = mapper._get_chinese_title(tif_stem) output_png = str(Path(self.output_dir) / f"{chinese_name}_专题图.png") self.log_message.emit(f"GeoTIFF 渲染 [{i + 1}/{n}] {tif_stem}", "info") try: mapper.visualize_raster( raster_tif_path=tif_path, output_file=output_png, boundary_shp_path=self.boundary_shp_path, nodata_value=-9999.0, figsize=(14, 10), alpha=0.9, ) except Exception as vis_err: self.log_message.emit(f" ⚠️ 渲染失败,跳过: {vis_err}", "warning") continue self.finished_ok.emit(n) except Exception as e: self.failed.emit(f"{e}\n{traceback.format_exc()}") finally: if mpl_prev: try: import matplotlib.pyplot as plt plt.switch_backend(mpl_prev) except Exception: pass class Step14Panel(QWidget): """步骤14:分布图生成""" def __init__(self, parent=None): super().__init__(parent) self._batch_thread = None self.init_ui() def init_ui(self): layout = QVBoxLayout() hint = QLabel( "独立运行:可选「单个 CSV」或「文件夹批量」(扫描目录下所有 .csv)。" "GeoTIFF 栅格模式下,亦支持批量渲染步骤8输出的所有水色指数 GeoTIFF 文件。" ) hint.setWordWrap(True) hint.setStyleSheet( f"color: {ModernStylesheet.COLORS.get('text_secondary', '#666')};" ) layout.addWidget(hint) mode_row = QHBoxLayout() self.mode_single_rb = QRadioButton("单个 CSV 文件") self.mode_folder_rb = QRadioButton("文件夹批量") self._mode_group = QButtonGroup(self) self._mode_group.addButton(self.mode_single_rb, 0) self._mode_group.addButton(self.mode_folder_rb, 1) mode_row.addWidget(self.mode_single_rb) mode_row.addWidget(self.mode_folder_rb) mode_row.addStretch() layout.addLayout(mode_row) # ---------- 渲染模式选择器(CSV vs GeoTIFF) ---------- render_row = QHBoxLayout() render_row.addWidget(QLabel("渲染模式:")) self.render_mode_combo = QComboBox() self.render_mode_combo.addItems(["CSV 插值模式", "GeoTIFF 栅格模式"]) self.render_mode_combo.setMinimumWidth(180) self.render_mode_combo.currentTextChanged.connect(self._toggle_input_mode) render_row.addWidget(self.render_mode_combo) render_row.addStretch() layout.addLayout(render_row) # ---------- RadioButton 美化样式(选中状态为方形实心块,贴合主界面风格) ---------- radio_style = """ QRadioButton { font-size: 14px; spacing: 8px; color: #333333; } QRadioButton::indicator { width: 16px; height: 16px; border: 2px solid #999999; border-radius: 3px; background-color: white; } QRadioButton::indicator:checked { border: 2px solid #0078d4; background-color: #0078d4; image: none; } QRadioButton::indicator:hover { border: 2px solid #005a9e; } """ self.mode_single_rb.setStyleSheet(radio_style) self.mode_folder_rb.setStyleSheet(radio_style) self.prediction_csv_file = FileSelectWidget( "预测结果CSV:", "CSV Files (*.csv);;All Files (*.*)" ) layout.addWidget(self.prediction_csv_file) folder_row = QHBoxLayout() self.prediction_csv_dir_label = QLabel("预测CSV目录:") self.prediction_csv_dir_label.setMinimumWidth(120) self.prediction_csv_dir_edit = QLineEdit() self.prediction_csv_dir_edit.setPlaceholderText("选择含多个预测结果 CSV 的文件夹…") pred_dir_btn = QPushButton("浏览…") pred_dir_btn.setMaximumWidth(80) pred_dir_btn.clicked.connect(self.browse_prediction_csv_dir) folder_row.addWidget(self.prediction_csv_dir_label) folder_row.addWidget(self.prediction_csv_dir_edit, 1) folder_row.addWidget(pred_dir_btn) self._folder_row_widget = QWidget() self._folder_row_widget.setLayout(folder_row) layout.addWidget(self._folder_row_widget) # ---------- GeoTIFF 栅格文件选择器 ---------- self.geotiff_file = FileSelectWidget( "水色指数 GeoTIFF:", "GeoTIFF Files (*.tif);;All Files (*.*)" ) self.geotiff_file.line_edit.setPlaceholderText("选择步骤8输出的水色指数 GeoTIFF 文件…") self.geotiff_file.setVisible(False) layout.addWidget(self.geotiff_file) # ---------- GeoTIFF 文件夹批量选择器(GeoTIFF + 文件夹模式时显示) ---------- geotiff_dir_row = QHBoxLayout() self.geotiff_dir_label = QLabel("水色指数目录:") self.geotiff_dir_label.setMinimumWidth(120) self.geotiff_dir_edit = QLineEdit() self.geotiff_dir_edit.setPlaceholderText("选择 10_WaterIndex_Images 文件夹(批量渲染)…") geotiff_dir_btn = QPushButton("浏览…") geotiff_dir_btn.setMaximumWidth(80) geotiff_dir_btn.clicked.connect(self.browse_geotiff_dir) geotiff_dir_row.addWidget(self.geotiff_dir_label) geotiff_dir_row.addWidget(self.geotiff_dir_edit, 1) geotiff_dir_row.addWidget(geotiff_dir_btn) self._geotiff_dir_widget = QWidget() self._geotiff_dir_widget.setLayout(geotiff_dir_row) self._geotiff_dir_widget.setVisible(False) layout.addWidget(self._geotiff_dir_widget) self.recursive_csv_cb = QCheckBox("包含子文件夹(递归扫描 *.csv)") layout.addWidget(self.recursive_csv_cb) self.boundary_file = FileSelectWidget( "边界文件:", "Shapefiles (*.shp);;All Files (*.*)" ) layout.addWidget(self.boundary_file) # 参数设置 params_group = QGroupBox("生成参数") params_layout = QFormLayout() self.resolution = QDoubleSpinBox() self.resolution.setRange(1, 1000) self.resolution.setValue(30) params_layout.addRow("分辨率(米):", self.resolution) self.input_crs = QLineEdit() self.input_crs.setText("EPSG:32651") params_layout.addRow("输入坐标系:", self.input_crs) self.output_crs = QLineEdit() self.output_crs.setText("EPSG:4326") params_layout.addRow("输出坐标系:", self.output_crs) self.show_points = QCheckBox("显示采样点") params_layout.addRow("", self.show_points) self.use_diffusion = QCheckBox("启用距离扩散") self.use_diffusion.setChecked(True) params_layout.addRow("", self.use_diffusion) params_group.setLayout(params_layout) layout.addWidget(params_group) # 输出目录 self.output_dir = FileSelectWidget( "输出分布图目录:", "Directories;;All Files (*.*)" ) self.output_dir.line_edit.setPlaceholderText("留空→工作目录/14_visualization") self.output_dir.browse_btn.clicked.disconnect() self.output_dir.browse_btn.clicked.connect(self.browse_output_dir) layout.addWidget(self.output_dir) # 启用步骤 self.enable_checkbox = QCheckBox("启用此步骤") self.enable_checkbox.setChecked(True) layout.addWidget(self.enable_checkbox) # 独立运行按钮 self.run_button = QPushButton("独立运行此步骤") self.run_button.setStyleSheet(ModernStylesheet.get_button_stylesheet('success')) self.run_button.clicked.connect(self.run_step) layout.addWidget(self.run_button) # 批量渲染进度条 self.progress_bar = QProgressBar() self.progress_bar.setVisible(False) self.progress_bar.setMinimum(0) self.progress_bar.setMaximum(100) self.progress_bar.setValue(0) layout.addWidget(self.progress_bar) layout.addStretch() self.setLayout(layout) # 信号绑定与初始状态 self.mode_single_rb.toggled.connect(self._toggle_input_mode) self.mode_folder_rb.toggled.connect(self._toggle_input_mode) self.mode_single_rb.setChecked(True) # 默认选中"单个 CSV" self._toggle_input_mode() # 根据默认值设置初始显示状态 def _toggle_input_mode(self): """槽函数:根据渲染模式和输入模式动态显示/隐藏对应的输入组件。""" geotiff_mode = self.render_mode_combo.currentText() == "GeoTIFF 栅格模式" folder_mode = self.mode_folder_rb.isChecked() # CSV 插值模式 if not geotiff_mode: self.prediction_csv_file.setVisible(not folder_mode) self._folder_row_widget.setVisible(folder_mode) self.recursive_csv_cb.setVisible(folder_mode) self.geotiff_file.setVisible(False) self._geotiff_dir_widget.setVisible(False) # GeoTIFF 栅格模式 else: self.prediction_csv_file.setVisible(False) self._folder_row_widget.setVisible(False) self.recursive_csv_cb.setVisible(False) # GeoTIFF + 文件夹批量 → 显示文件夹选择器;否则 → 显示单文件选择器 self.geotiff_file.setVisible(not folder_mode) self._geotiff_dir_widget.setVisible(folder_mode) def _get_default_work_dir(self): """获取 work_dir,优先用 panel 自身缓存的,否则尝试从主窗口取""" if hasattr(self, 'work_dir') and self.work_dir: return str(self.work_dir) mw = self.window() if mw and hasattr(mw, 'work_dir') and mw.work_dir: return str(mw.work_dir) return "" def browse_prediction_csv_dir(self): default = self._get_default_work_dir() if default: default = os.path.join(default, "11_12_13_predictions") d = QFileDialog.getExistingDirectory(self, "选择预测结果 CSV 所在文件夹", default) if d: self.prediction_csv_dir_edit.setText(d) def _collect_csv_paths_from_folder(self) -> List[str]: folder = (self.prediction_csv_dir_edit.text() or "").strip() if not folder or not os.path.isdir(folder): return [] root = Path(folder) if self.recursive_csv_cb.isChecked(): files = sorted(root.rglob("*.csv")) else: files = sorted(root.glob("*.csv")) return [str(p) for p in files if p.is_file()] def browse_geotiff_dir(self): """浏览 GeoTIFF 文件夹(批量模式)""" default = self._get_default_work_dir() if default: default = os.path.join(default, "10_WaterIndex_Images") d = QFileDialog.getExistingDirectory( self, "选择水色指数 GeoTIFF 文件夹", default ) if d: self.geotiff_dir_edit.setText(d) def _collect_tif_paths_from_folder(self) -> List[str]: """扫描所选文件夹,收集所有 .tif 和 .bsq 文件路径""" folder = (self.geotiff_dir_edit.text() or "").strip() if not folder or not os.path.isdir(folder): return [] root = Path(folder) tif_files = sorted(root.glob("*.tif")) bsq_files = sorted(root.glob("*.bsq")) return [str(p) for p in tif_files + bsq_files if p.is_file()] def _step14_base_pipeline_kwargs(self) -> dict: return { 'boundary_shp_path': self.boundary_file.get_path(), 'resolution': self.resolution.value(), 'input_crs': self.input_crs.text(), 'output_crs': self.output_crs.text(), 'show_sample_points': self.show_points.isChecked(), 'use_distance_diffusion': self.use_diffusion.isChecked(), } def get_config(self): pred_csv = (self.prediction_csv_file.get_path() or "").strip() folder_mode = self.mode_folder_rb.isChecked() pred_dir = (self.prediction_csv_dir_edit.text() or "").strip() geotiff_path = (self.geotiff_file.get_path() or "").strip() config = { 'step14_batch_mode': 'folder' if folder_mode else 'single', 'render_mode': self.render_mode_combo.currentText(), 'prediction_csv_dir': pred_dir if pred_dir else None, 'recursive_csv_scan': self.recursive_csv_cb.isChecked(), 'prediction_csv_path': None if folder_mode else (pred_csv if pred_csv else None), 'geotiff_path': geotiff_path if geotiff_path else None, 'geotiff_dir': (self.geotiff_dir_edit.text() or "").strip() or None, 'boundary_shp_path': self.boundary_file.get_path(), 'resolution': self.resolution.value(), 'input_crs': self.input_crs.text(), 'output_crs': self.output_crs.text(), 'show_sample_points': self.show_points.isChecked(), 'use_distance_diffusion': self.use_diffusion.isChecked(), } out_dir = (self.output_dir.get_path() or "").strip() if not folder_mode and pred_csv and out_dir: stem = Path(pred_csv).stem config['output_image_path'] = str(Path(out_dir) / f"{stem}_distribution.png") else: config['output_image_path'] = None return config def set_config(self, config): mode = config.get('step14_batch_mode', 'single') if mode == 'folder': self.mode_folder_rb.setChecked(True) else: self.mode_single_rb.setChecked(True) render_mode = config.get('render_mode', 'CSV 插值模式') idx = self.render_mode_combo.findText(render_mode) if idx >= 0: self.render_mode_combo.setCurrentIndex(idx) if config.get('prediction_csv_dir'): self.prediction_csv_dir_edit.setText(str(config['prediction_csv_dir'])) if 'recursive_csv_scan' in config: self.recursive_csv_cb.setChecked(bool(config['recursive_csv_scan'])) if 'prediction_csv_path' in config and config['prediction_csv_path']: self.prediction_csv_file.set_path(str(config['prediction_csv_path'])) if 'geotiff_path' in config and config['geotiff_path']: self.geotiff_file.set_path(str(config['geotiff_path'])) if 'geotiff_dir' in config and config['geotiff_dir']: self.geotiff_dir_edit.setText(str(config['geotiff_dir'])) if 'boundary_shp_path' in config: self.boundary_file.set_path(config['boundary_shp_path']) if 'resolution' in config: self.resolution.setValue(config['resolution']) if 'input_crs' in config: self.input_crs.setText(config['input_crs']) if 'output_crs' in config: self.output_crs.setText(config['output_crs']) if 'show_sample_points' in config: self.show_points.setChecked(config['show_sample_points']) if 'use_distance_diffusion' in config: self.use_diffusion.setChecked(config['use_distance_diffusion']) if 'output_dir' in config and config['output_dir']: self.output_dir.set_path(str(config['output_dir'])) elif config.get('output_image_path'): p = Path(str(config['output_image_path'])) if p.parent and str(p.parent) != '.': self.output_dir.set_path(str(p.parent)) def update_from_config(self, work_dir=None, pipeline=None): """从全局配置自动填充预测结果目录 优先使用 Step8(机器学习预测)的输出目录作为待预测 CSV 目录; 其次回退到 Step8.5(回归预测)或 Step8.75(自定义回归预测)的输出目录。 Args: work_dir: 工作目录路径 pipeline: Pipeline 实例(未使用,保留接口兼容性) """ try: import traceback if work_dir: self.work_dir = work_dir elif hasattr(self, 'work_dir') and self.work_dir: pass else: self.work_dir = None main_window = self.window() if not main_window: return # 1. 尝试从 Step8 界面读取机器学习预测输出目录(最优先) pred_dir = None if hasattr(main_window, 'step11_prediction_panel'): step8_widget = getattr(main_window.step11_prediction_panel, 'output_file', None) step10_output = "" if hasattr(step8_widget, 'get_path'): step10_output = step8_widget.get_path() or "" elif hasattr(step8_widget, 'text'): step10_output = step8_widget.text() or "" if step10_output: # 若为相对路径,使用 work_dir 合成为绝对路径 if not os.path.isabs(step10_output): step10_output = os.path.join(self.work_dir or '', step10_output).replace('\\', '/') # 提取父目录后追加 Machine_Learning_Prediction(最底层真实子目录) base_pred_dir = str(Path(step10_output).parent) ml_pred_dir = Path(base_pred_dir) / "Machine_Learning_Prediction" pred_dir = str(ml_pred_dir) if ml_pred_dir.exists() else base_pred_dir # 2. 备选:从 Step11 界面读取非经验预测输出目录 if not pred_dir and hasattr(main_window, 'step11_panel'): step8_5_widget = getattr(main_window.step11_panel, 'output_file', None) step8_5_output = "" if hasattr(step8_5_widget, 'get_path'): step8_5_output = step8_5_widget.get_path() or "" elif hasattr(step8_5_widget, 'text'): step8_5_output = step8_5_widget.text() or "" if step8_5_output: # 若为相对路径,使用 work_dir 合成为绝对路径 if not os.path.isabs(step8_5_output): step8_5_output = os.path.join(self.work_dir or '', step8_5_output).replace('\\', '/') pred_dir = str(Path(step8_5_output).parent) # 3. 备选:从 Step12 界面读取自定义回归预测输出目录 if not pred_dir and hasattr(main_window, 'step12_panel'): step8_75_widget = getattr(main_window.step12_panel, 'output_dir_widget', None) step8_75_output = "" if hasattr(step8_75_widget, 'get_path'): step8_75_output = step8_75_widget.get_path() or "" elif hasattr(step8_75_widget, 'text'): step8_75_output = step8_75_widget.text() or "" if step8_75_output: pred_dir = step8_75_output # 自动填入"预测CSV目录"(文件夹批量模式) if pred_dir: existing_dir = (self.prediction_csv_dir_edit.text() or "").strip() if not existing_dir: self.prediction_csv_dir_edit.setText(pred_dir) # 切换到文件夹批量模式 self.mode_folder_rb.setChecked(True) # 4. 自动填充输出目录(14_visualization) if self.work_dir: output_dir = os.path.join(self.work_dir, "14_visualization") os.makedirs(output_dir, exist_ok=True) existing_out = self.output_dir.get_path() if not existing_out or not existing_out.strip(): self.output_dir.set_path(output_dir) # 5. 自动探测原始矢量边界文件(.shp)作为专题图底图 # 优先回溯 input-test/roi.shp,geopandas.read_file 仅支持矢量格式 if self.work_dir: possible_shp = None candidates = [ Path(self.work_dir).parent / "input-test" / "roi.shp", Path(self.work_dir) / "roi.shp", Path(self.work_dir).parent / "roi.shp", ] for candidate in candidates: if candidate.exists() and candidate.suffix.lower() == ".shp": possible_shp = candidate break existing_boundary = (self.boundary_file.get_path() or "").strip() if not existing_boundary and possible_shp: self.boundary_file.set_path(str(possible_shp)) elif not existing_boundary: self.boundary_file.set_path("") print("⚠️ 提示:专题图生成模块需传入标准矢量边界文件 (.shp),请手动选择。") # 6. 自动探测 Step 8 输出的水色指数 GeoTIFF(GeoTIFF 渲染模式) step10_out_dir = Path(self.work_dir) / "10_WaterIndex_Images" if self.work_dir else None if step10_out_dir and step10_out_dir.is_dir(): # GeoTIFF 批量模式:填充目录供批量渲染 if not (self.geotiff_dir_edit.text() or "").strip(): self.geotiff_dir_edit.setText(str(step10_out_dir)) # GeoTIFF 单文件模式:默认选中第一个 tif_files = sorted(step10_out_dir.glob("*.tif")) if tif_files and not (self.geotiff_file.get_path() or "").strip(): self.geotiff_file.set_path(str(tif_files[0])) except Exception as e: import traceback print(f"【{self.__class__.__name__}】自动填充失败,跳过: {e}") traceback.print_exc() def browse_output_dir(self): """浏览输出目录""" default = self._get_default_work_dir() if default: default = os.path.join(default, "14_visualization") dir_path = QFileDialog.getExistingDirectory(self, "选择输出分布图目录", default) if dir_path: self.output_dir.set_path(dir_path) def _start_batch_run(self, csv_list, work_dir, base_kw, out_dir_opt, parent): """封装 CSV 批量启动逻辑,统一处理信号连接和进度条""" self.run_button.setEnabled(False) self.progress_bar.setVisible(True) self.progress_bar.setValue(0) self._batch_thread = Step14BatchThread(work_dir, csv_list, base_kw, out_dir_opt) main_win = parent def _batch_log(msg, lvl): if hasattr(main_win, "log_message"): main_win.log_message(msg, lvl) def _on_progress(cur, total): if total > 0: self.progress_bar.setMaximum(total) self.progress_bar.setValue(cur) self.progress_bar.setFormat(f"{cur}/{total} 张 (%p%)") self._batch_thread.log_message.connect(_batch_log, Qt.QueuedConnection) self._batch_thread.progress.connect(_on_progress, Qt.QueuedConnection) self._batch_thread.finished_ok.connect(self._on_step14_batch_ok, Qt.QueuedConnection) self._batch_thread.failed.connect(self._on_step14_batch_fail, Qt.QueuedConnection) self._batch_thread.finished.connect( lambda: (self.run_button.setEnabled(True), self.progress_bar.setVisible(False)), Qt.QueuedConnection, ) self._batch_thread.start() if hasattr(parent, "log_message"): parent.log_message(f"专题图批量:共 {len(csv_list)} 个 CSV,工作目录 {work_dir}", "info") def run_step(self): """独立运行步骤14""" if self._batch_thread and self._batch_thread.isRunning(): QMessageBox.information(self, "提示", "批量任务正在运行,请稍候。") return boundary_shp_path = self.boundary_file.get_path() if not boundary_shp_path: QMessageBox.warning(self, "输入验证失败", "请选择边界文件") return if not os.path.exists(boundary_shp_path): QMessageBox.warning(self, "输入验证失败", "边界文件不存在") return parent = self.parent() while parent and not hasattr(parent, 'run_single_step'): parent = parent.parent() if not parent or not hasattr(parent, 'run_single_step'): QMessageBox.critical(self, "错误", "无法找到父级GUI对象") return if self.mode_folder_rb.isChecked(): # -------- CSV 插值批量 -------- if self.render_mode_combo.currentText() != "GeoTIFF 栅格模式": csv_list = self._collect_csv_paths_from_folder() if not csv_list: QMessageBox.warning( self, "输入验证失败", "所选文件夹中未找到 .csv 文件,或目录无效。\n" "可勾选「包含子文件夹」以递归扫描。", ) return if not PIPELINE_AVAILABLE: QMessageBox.critical(self, "错误", "Pipeline 模块不可用,无法批量生成专题图。") return work_dir = getattr(parent, "work_dir", None) or "./work_dir" work_dir = str(work_dir) base_kw = self._step14_base_pipeline_kwargs() out_dir_opt = (self.output_dir.get_path() or "").strip() or None self._start_batch_run(csv_list, work_dir, base_kw, out_dir_opt, parent) return # -------- GeoTIFF 栅格批量 -------- tif_list = self._collect_tif_paths_from_folder() if not tif_list: QMessageBox.warning( self, "输入验证失败", "所选文件夹中未找到 .tif / .bsq 文件,\n" "请确认目录包含步骤8输出的水色指数 GeoTIFF 文件。", ) return out_dir = (self.output_dir.get_path() or "").strip() if not out_dir: out_dir = os.path.join(self._get_default_work_dir(), "14_visualization") os.makedirs(out_dir, exist_ok=True) self.run_button.setEnabled(False) self.progress_bar.setVisible(True) self.progress_bar.setValue(0) self._batch_thread = Step14GeoTIFFBatchThread( tif_paths=tif_list, output_dir=out_dir, boundary_shp_path=boundary_shp_path, input_crs=self.input_crs.text(), output_crs=self.output_crs.text(), ) main_win = parent def _batch_log(msg, lvl): if hasattr(main_win, "log_message"): main_win.log_message(msg, lvl) def _on_progress(cur, total): if total > 0: pct = int(cur / total * 100) self.progress_bar.setMaximum(total) self.progress_bar.setValue(cur) self.progress_bar.setFormat(f"{cur}/{total} 张 (%p%)") self._batch_thread.log_message.connect(_batch_log, Qt.QueuedConnection) self._batch_thread.progress.connect(_on_progress, Qt.QueuedConnection) self._batch_thread.finished_ok.connect(self._on_step14_batch_ok, Qt.QueuedConnection) self._batch_thread.failed.connect(self._on_step14_batch_fail, Qt.QueuedConnection) self._batch_thread.finished.connect( lambda: (self.run_button.setEnabled(True), self.progress_bar.setVisible(False)), Qt.QueuedConnection, ) self._batch_thread.start() if hasattr(parent, "log_message"): parent.log_message(f"GeoTIFF 批量渲染:共 {len(tif_list)} 个文件 → {out_dir}", "info") return # -------- GeoTIFF 栅格单文件模式 -------- if self.render_mode_combo.currentText() == "GeoTIFF 栅格模式": geotiff_path = (self.geotiff_file.get_path() or "").strip() if not geotiff_path: QMessageBox.warning(self, "输入验证失败", "请选择水色指数 GeoTIFF 文件") return if not os.path.isfile(geotiff_path): QMessageBox.warning(self, "输入验证失败", f"GeoTIFF 文件不存在:\n{geotiff_path}") return boundary_shp_path = self.boundary_file.get_path() input_crs = self.input_crs.text() output_crs = self.output_crs.text() # 构造输出路径 out_dir = (self.output_dir.get_path() or "").strip() if not out_dir: out_dir = os.path.join(self._get_default_work_dir(), "14_visualization") os.makedirs(out_dir, exist_ok=True) tif_stem = Path(geotiff_path).stem chinese_name = mapper._get_chinese_title(tif_stem) output_png = os.path.join(out_dir, f"{chinese_name}_专题图.png") self.run_button.setEnabled(False) try: from src.postprocessing.map import ContentMapper mapper = ContentMapper() result_path = mapper.visualize_raster( raster_tif_path=geotiff_path, output_file=output_png, boundary_shp_path=boundary_shp_path if boundary_shp_path else None, nodata_value=-9999.0, figsize=(14, 10), alpha=0.9, ) self.run_button.setEnabled(True) QMessageBox.information( self, "完成", f"GeoTIFF 栅格渲染完成!\n{result_path}" ) if hasattr(parent, "log_message"): parent.log_message(f"Step14 GeoTIFF 渲染完成 → {result_path}", "info") except Exception as e: self.run_button.setEnabled(True) QMessageBox.critical(self, "渲染失败", f"{e}\n{traceback.format_exc()[:500]}") if hasattr(parent, "log_message"): parent.log_message(str(e), "error") return prediction_csv_path = (self.prediction_csv_file.get_path() or "").strip() if not prediction_csv_path: QMessageBox.warning( self, "输入验证失败", "请选择「预测结果 CSV」文件,或切换到「文件夹批量」。", ) return if not os.path.isfile(prediction_csv_path): QMessageBox.warning(self, "输入验证失败", "预测结果 CSV 不存在或不是文件") return config = self.get_config() parent.run_single_step('step14', {'step14': config}) def _on_step14_batch_ok(self, n: int): self.progress_bar.setVisible(False) QMessageBox.information(self, "完成", f"已批量生成 {n} 个分布图。") parent = self.parent() while parent and not hasattr(parent, "log_message"): parent = parent.parent() if parent and hasattr(parent, "log_message"): parent.log_message(f"专题图批量完成,共 {n} 个文件。", "info") def _on_step14_batch_fail(self, err: str): self.progress_bar.setVisible(False) QMessageBox.critical(self, "失败", f"批量生成中断:\n{err[:900]}") parent = self.parent() while parent and not hasattr(parent, "log_message"): parent = parent.parent() if parent and hasattr(parent, "log_message"): parent.log_message(err, "error")