import os
import warnings
from pathlib import Path
from typing import Dict, Tuple

import joblib
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

warnings.filterwarnings('ignore')

# Preprocessing module (project-local import)
from src.preprocessing.spectral_Preprocessing import Preprocessing

# try:
#     from modeling import WaterQualityModeling
# except ImportError:
#     from src.core.modeling.modeling_batch import WaterQualityModeling


class WaterQualityInference:
    """Inference class for water quality parameter retrieval (inversion)."""

    def __init__(self, artifacts_dir: str = "models/artifacts"):
        """
        Initialize the inference class.

        Args:
            artifacts_dir: directory where trained models are stored
        """
        self.artifacts_dir = Path(artifacts_dir)
        if not self.artifacts_dir.exists():
            print(f"Warning: model directory does not exist: {artifacts_dir}; "
                  f"it will be created when needed")
        self.best_model_info = None
        self.loaded_model_data = None

    def load_sampling_data(self, csv_path: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Load the CSV produced by the sampling step.

        Args:
            csv_path: CSV path; the first two columns are longitude/latitude,
                columns 3-4 hold other metadata, and the remaining columns
                are spectral data.

        Returns:
            coords: longitude/latitude data (DataFrame)
            spectra: spectral data (DataFrame)
        """
        print(f"Loading sampling data: {csv_path}")

        if not os.path.exists(csv_path):
            raise FileNotFoundError(f"Sampling data file does not exist: {csv_path}")

        data = pd.read_csv(csv_path)
        print("Sampling data loaded:")
        print(f"  Data shape: {data.shape}")
        print(f"  Columns: {list(data.columns[:5])}...")  # show first 5 columns only

        # Need longitude, latitude, two metadata columns, and at least one band
        if data.shape[1] < 5:
            raise ValueError(
                f"Too few columns: expected at least 5 (longitude, latitude, "
                f"metadata columns, spectral data), got {data.shape[1]}"
            )

        # First two columns are longitude/latitude
        coords = data.iloc[:, :2].copy()
        coords.columns = ['longitude', 'latitude']

        # Spectral data starts at the 5th column (metadata columns 3-4 are skipped)
        spectra = data.iloc[:, 4:].copy()

        print(f"  Coordinates shape: {coords.shape}")
        print(f"  Spectra shape: {spectra.shape}")
        print(f"  Coordinate range: longitude [{coords['longitude'].min():.6f}, "
              f"{coords['longitude'].max():.6f}], "
              f"latitude [{coords['latitude'].min():.6f}, {coords['latitude'].max():.6f}]")

        return coords, spectra

    def random(self, data, label, test_ratio=0.2, random_state=123):
        """
        Random train/test split.

        Args:
            data: shape (n_samples, n_features)
            label: shape (n_samples,)
            test_ratio: test set fraction, default 0.2
            random_state: random seed, default 123

        Returns:
            X_train: (n_samples, n_features)
            X_test: (n_samples, n_features)
            y_train: (n_samples,)
            y_test: (n_samples,)
        """
        X_train, X_test, y_train, y_test = train_test_split(
            data, label, test_size=test_ratio, random_state=random_state
        )
        return X_train, X_test, y_train, y_test
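
    # ------------------------------------------------------------------
    # The splitters (`random` above, `spxy` and `ks` below) are selected
    # via split_data(). A minimal usage sketch with hypothetical data
    # (shapes and values are assumptions, not from a real dataset):
    #
    #   X = np.random.rand(100, 50)               # 100 samples, 50 bands
    #   y = pd.Series(np.random.rand(100))
    #   inf = WaterQualityInference()
    #   X_tr, X_te, y_tr, y_te = inf.split_data(X, y, method="spxy",
    #                                           test_size=0.2)
    #
    # Note that `spxy` and `ks` build a full MxM pairwise distance matrix,
    # so they are O(M^2) in time and memory; prefer `random` for large M.
    # ------------------------------------------------------------------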

    def spxy(self, data, label, test_size=0.2):
        """
        SPXY train/test split (uses distances in both X and Y space).

        Args:
            data: shape (n_samples, n_features)
            label: shape (n_samples,)
            test_size: test set fraction, default 0.2

        Returns:
            X_train: (n_samples, n_features)
            X_test: (n_samples, n_features)
            y_train: (n_samples,)
            y_test: (n_samples,)
        """
        # Ensure data and label are NumPy arrays
        data = data.to_numpy() if isinstance(data, pd.DataFrame) else data
        label = label.to_numpy() if isinstance(label, pd.Series) else label

        # Keep the un-normalized labels for the returned split
        y_backup = label

        M = data.shape[0]
        N = round((1 - test_size) * M)
        samples = np.arange(M)

        # Standardize labels before computing Y-space distances
        label = (label - np.mean(label)) / np.std(label)

        D = np.zeros((M, M))
        Dy = np.zeros((M, M))

        # Pairwise distances (upper triangle only)
        for i in range(M - 1):
            xa = data[i, :]
            ya = label[i]
            for j in range(i + 1, M):
                xb = data[j, :]
                yb = label[j]
                D[i, j] = np.linalg.norm(xa - xb)
                Dy[i, j] = np.linalg.norm(ya - yb)

        # Normalize each distance matrix and combine
        Dmax = np.max(D)
        Dymax = np.max(Dy)
        D = D / Dmax + Dy / Dymax

        # Seed with the two most distant points
        maxD = D.max(axis=0)
        index_row = D.argmax(axis=0)
        index_column = maxD.argmax()

        m = np.zeros(N, dtype=int)
        m[0] = index_row[index_column]
        m[1] = index_column
        dminmax = np.zeros(N)
        dminmax[1] = D[m[0], m[1]]

        # Greedily pick the point farthest from the current training set
        for i in range(2, N):
            pool = np.delete(samples, m[:i])
            dmin = np.zeros(M - i)
            for j in range(M - i):
                indexa = pool[j]
                d = np.zeros(i)
                for k in range(i):
                    indexb = m[k]
                    if indexa < indexb:
                        d[k] = D[indexa, indexb]
                    else:
                        d[k] = D[indexb, indexa]
                dmin[j] = np.min(d)
            dminmax[i] = np.max(dmin)
            index = np.argmax(dmin)
            m[i] = pool[index]

        m_complement = np.delete(samples, m)

        # Split into training and test sets
        X_train = data[m, :]
        y_train = y_backup[m]
        X_test = data[m_complement, :]
        y_test = y_backup[m_complement]

        return X_train, X_test, y_train, y_test

    def ks(self, data, label, test_size=0.2):
        """
        Kennard-Stone train/test split.

        Args:
            data: shape (n_samples, n_features)
            label: shape (n_samples,)
            test_size: test set fraction, default 0.2

        Returns:
            X_train: (n_samples, n_features)
            X_test: (n_samples, n_features)
            y_train: (n_samples,)
            y_test: (n_samples,)
        """
        # Ensure data and label are NumPy arrays
        data = data.to_numpy() if isinstance(data, pd.DataFrame) else data
        label = label.to_numpy() if isinstance(label, pd.Series) else label

        M = data.shape[0]
        N = round((1 - test_size) * M)
        samples = np.arange(M)

        # Pairwise X-space distances (upper triangle only)
        D = np.zeros((M, M))
        for i in range(M - 1):
            xa = data[i, :]
            for j in range(i + 1, M):
                xb = data[j, :]
                D[i, j] = np.linalg.norm(xa - xb)

        # Seed with the two most distant points
        maxD = np.max(D, axis=0)
        index_row = np.argmax(D, axis=0)
        index_column = np.argmax(maxD)

        m = np.zeros(N, dtype=int)
        m[0] = index_row[index_column]
        m[1] = index_column
        dminmax = np.zeros(N)
        dminmax[1] = D[m[0], m[1]]

        # Greedily pick the point farthest from the current training set
        for i in range(2, N):
            pool = np.delete(samples, m[:i])
            dmin = np.zeros(M - i)
            for j in range(M - i):
                indexa = pool[j]
                d = np.zeros(i)
                for k in range(i):
                    indexb = m[k]
                    if indexa < indexb:
                        d[k] = D[indexa, indexb]
                    else:
                        d[k] = D[indexb, indexa]
                dmin[j] = np.min(d)
            dminmax[i] = np.max(dmin)
            index = np.argmax(dmin)
            m[i] = pool[index]

        m_complement = np.delete(samples, m)

        X_train = data[m, :]
        y_train = label[m]
        X_test = data[m_complement, :]
        y_test = label[m_complement]

        return X_train, X_test, y_train, y_test
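
    # The pairwise-distance loops in `spxy` and `ks` could be vectorized,
    # e.g. (a sketch, assuming SciPy is available as an extra dependency):
    #
    #   from scipy.spatial.distance import pdist, squareform
    #   D = squareform(pdist(data))  # symmetric, so no triangle bookkeeping
    #
    # The explicit loops are kept above to avoid a hard SciPy dependency.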

    def split_data(self, X: np.ndarray, y: pd.Series,
                   method: str = "random",
                   test_size: float = 0.2,
                   random_state: int = 42) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """
        Split the dataset with the requested method.

        Args:
            X: feature data
            y: target values
            method: split method ("random", "spxy", "ks")
            test_size: test set fraction
            random_state: random seed (only used by the "random" method)

        Returns:
            X_train, X_test, y_train, y_test
        """
        print(f"Splitting dataset with the '{method}' method")

        if method == "random":
            return self.random(X, y, test_ratio=test_size, random_state=random_state)
        elif method == "spxy":
            return self.spxy(X, y, test_size=test_size)
        elif method == "ks":
            return self.ks(X, y, test_size=test_size)
        else:
            raise ValueError(f"Unsupported split method: {method}. "
                             f"Supported methods: ['random', 'spxy', 'ks']")

    # Mapping from English metric names to the Chinese column headers used in
    # the detailed-results CSV files
    _METRIC_MAPPING = {
        'test_r2': '测试集R²',
        'train_r2': '训练集R²',
        'test_rmse': '测试集RMSE',
        'train_rmse': '训练集RMSE',
        'cv_mean': 'CV均值',
    }

    def get_best_model_from_summary(self, metric: str = 'test_r2') -> Tuple[str, str]:
        """
        Read the training summary and identify the best model.

        Args:
            metric: evaluation metric (default test_r2, the primary metric
                for regression tasks)

        Returns:
            model_file_prefix: "<split_method>_<preprocess_method>" prefix
                used to locate the saved model file
            model_name: model name
        """
        # The artifacts_dir folder name doubles as the target column name
        folder_name = self.artifacts_dir.name

        # Preferred result files (new naming scheme)
        detailed_path = self.artifacts_dir / f"{folder_name}_detailed_results.csv"
        summary_path = self.artifacts_dir / f"{folder_name}_training_summary.csv"

        # Fallback result files (old naming scheme)
        old_detailed_path = self.artifacts_dir / "detailed_results.csv"
        old_summary_path = self.artifacts_dir / "training_summary.csv"

        summary_df = None

        if detailed_path.exists():
            print(f"Using detailed results file: {detailed_path}")
            summary_df = pd.read_csv(detailed_path)
            # Map the English metric name to the Chinese column header
            if metric in self._METRIC_MAPPING and self._METRIC_MAPPING[metric] in summary_df.columns:
                metric_col = self._METRIC_MAPPING[metric]
            else:
                metric_col = metric
        elif summary_path.exists():
            print(f"Using training summary file: {summary_path}")
            summary_df = pd.read_csv(summary_path)
            metric_col = metric
        elif old_detailed_path.exists():
            print(f"Using old-format detailed results file: {old_detailed_path}")
            summary_df = pd.read_csv(old_detailed_path)
            if metric in self._METRIC_MAPPING and self._METRIC_MAPPING[metric] in summary_df.columns:
                metric_col = self._METRIC_MAPPING[metric]
            else:
                metric_col = metric
        elif old_summary_path.exists():
            print(f"Using old-format training summary file: {old_summary_path}")
            summary_df = pd.read_csv(old_summary_path)
            metric_col = metric
        else:
            raise FileNotFoundError(f"No training summary file found; tried:\n"
                                    f"  - {detailed_path}\n"
                                    f"  - {summary_path}\n"
                                    f"  - {old_detailed_path}\n"
                                    f"  - {old_summary_path}")

        if summary_df.empty:
            raise ValueError("Training summary is empty")

        # Check that the metric column exists
        if metric_col not in summary_df.columns:
            available_cols = list(summary_df.columns)
            raise ValueError(f"Metric '{metric_col}' not found. Available columns: {available_cols}")

        # Pick the best model: higher is better for R²/score metrics,
        # lower is better for RMSE/MAE-like metrics
        if 'r2' in metric.lower() or 'score' in metric.lower():
            best_idx = summary_df[metric_col].idxmax()
        else:
            best_idx = summary_df[metric_col].idxmin()

        best_row = summary_df.loc[best_idx]

        # Parse model info according to the file format
        if (detailed_path.exists() or old_detailed_path.exists()) and '划分方法' in summary_df.columns:
            # Detailed-results format
            split_method = best_row['划分方法']
            preprocess_method = best_row['预处理方法']
            model_name = best_row['建模方法']
            # Normalize nan/NaN/None values to the string "None"
            if pd.isna(preprocess_method) or str(preprocess_method).lower() in ['nan', 'none', '']:
                preprocess_method = "None"
            best_combination = f"{split_method}_{preprocess_method}_{model_name}"
        else:
            # Simplified summary format
            best_combination = best_row['combination']
            # Combination format: <split_method>_<preprocess_method>_<model_name>
            parts = best_combination.split('_')
            if len(parts) < 3:
                raise ValueError(f"Invalid model combination format: {best_combination}")
            split_method = parts[0]
            preprocess_method = parts[1]
            model_name = '_'.join(parts[2:])
            # Normalize nan/NaN/None values to the string "None"
            if pd.isna(preprocess_method) or str(preprocess_method).lower() in ['nan', 'none', '']:
                preprocess_method = "None"

        print(f"Best model combination: {best_combination}")
        print(f"  Split method: {split_method}")
        print(f"  Preprocessing method: {preprocess_method}")
        print(f"  Model name: {model_name}")
        print(f"  {metric_col}: {best_row[metric_col]:.4f}")

        self.best_model_info = {
            'combination': best_combination,
            'split_method': split_method,
            'preprocess_method': preprocess_method,
            'model_name': model_name,
            'metric_value': best_row[metric_col],
        }

        # Return the file-name prefix used to load the model
        model_file_prefix = f"{split_method}_{preprocess_method}"
        return model_file_prefix, model_name
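
    # Assumed summary-file layouts, inferred from the parsing above:
    #   <folder>_detailed_results.csv : Chinese headers 划分方法 / 预处理方法 /
    #       建模方法 plus metric columns such as 测试集R² and 测试集RMSE
    #   <folder>_training_summary.csv : a 'combination' column formatted as
    #       "<split>_<preprocess>_<model>" plus English metric columns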

    def load_best_model(self, metric: str = 'test_r2'):
        """
        Load the best model according to the training summary.

        Args:
            metric: evaluation metric
        """
        model_file_prefix, model_name = self.get_best_model_from_summary(metric)

        # The artifacts_dir folder name doubles as the target column name
        folder_name = self.artifacts_dir.name

        # New naming scheme: file name includes the target column name
        filename = f"{folder_name}_{model_file_prefix}_{model_name}.joblib"
        filepath = self.artifacts_dir / filename

        # Fall back to the old naming scheme if the new file is missing
        if not filepath.exists():
            old_filename = f"{model_file_prefix}_{model_name}.joblib"
            old_filepath = self.artifacts_dir / old_filename
            if old_filepath.exists():
                filepath = old_filepath
                filename = old_filename
                print(f"Using old-format model file: {filepath}")
            else:
                raise FileNotFoundError(f"Model file not found; tried:\n"
                                        f"  - {filepath}\n"
                                        f"  - {old_filepath}")
        else:
            print(f"Using new-format model file: {filepath}")

        print(f"Loading model: {filepath}")
        self.loaded_model_data = joblib.load(filepath)

        print("Model loaded:")
        print(f"  Preprocessing method: {self.loaded_model_data['preprocess_method']}")
        print(f"  Model name: {self.loaded_model_data['model_name']}")
        print(f"  Model type: {type(self.loaded_model_data['model'])}")

        if 'metadata' in self.loaded_model_data:
            metadata = self.loaded_model_data['metadata']
            print(f"  Data shape: {metadata.get('data_shape', 'Unknown')}")
            print(f"  Target range: {metadata.get('target_range', 'Unknown')}")
            if 'test_r2' in metadata:
                print(f"  Test R²: {metadata['test_r2']:.4f}")
            if 'test_rmse' in metadata:
                print(f"  Test RMSE: {metadata['test_rmse']:.4f}")

    def load_specific_model(self, model_file_path: str):
        """
        Load a specific model file.

        Args:
            model_file_path: path to the model file
        """
        if not os.path.exists(model_file_path):
            raise FileNotFoundError(f"Model file does not exist: {model_file_path}")

        print(f"Loading specified model: {model_file_path}")
        self.loaded_model_data = joblib.load(model_file_path)

        print("Model loaded:")
        print(f"  Preprocessing method: {self.loaded_model_data['preprocess_method']}")
        print(f"  Model name: {self.loaded_model_data['model_name']}")
        print(f"  Model type: {type(self.loaded_model_data['model'])}")

    def preprocess_spectra(self, spectra: pd.DataFrame) -> np.ndarray:
        """
        Apply spectral preprocessing.

        Args:
            spectra: raw spectral data

        Returns:
            preprocessed spectral data
        """
        if self.loaded_model_data is None:
            raise ValueError("Load a model first")

        preprocess_method = self.loaded_model_data['preprocess_method']

        # Normalize nan/NaN/None values to the string "None"
        if pd.isna(preprocess_method) or str(preprocess_method).lower() in ['nan', 'none', '']:
            preprocess_method = "None"

        # The stored method may carry a split-method prefix
        # (assumed format: <split_method>_<preprocess_method>)
        if '_' in str(preprocess_method):
            parts = str(preprocess_method).split('_')
            actual_preprocess_method = '_'.join(parts[1:]) if len(parts) > 1 else parts[-1]
        else:
            actual_preprocess_method = str(preprocess_method)

        # Re-check for nan after stripping the prefix
        if actual_preprocess_method.lower() in ['nan', 'none', '']:
            actual_preprocess_method = "None"

        print(f"Applying preprocessing method: {actual_preprocess_method}")
        print(f"Raw spectra shape: {spectra.shape}")

        try:
            # Apply the preprocessing
            spectra_processed = Preprocessing(actual_preprocess_method, spectra)

            # Ensure a NumPy array is returned
            if isinstance(spectra_processed, pd.DataFrame):
                spectra_processed = spectra_processed.values

            print(f"Preprocessed data shape: {spectra_processed.shape}")
            return spectra_processed

        except Exception as e:
            print(f"Preprocessing failed: {e}")
            print("Falling back to raw data")
            return spectra.values
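
    # The .joblib payload is assumed (from the keys accessed above) to be a
    # dict shaped roughly like this hypothetical example:
    #
    #   {'model': <fitted sklearn estimator>,
    #    'preprocess_method': 'random_SNV',   # may also be NaN / "None"
    #    'model_name': 'RandomForest',
    #    'metadata': {'data_shape': (120, 50), 'target_range': (0.1, 9.8),
    #                 'test_r2': 0.87, 'test_rmse': 0.42}}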

    def predict(self, spectra_processed: np.ndarray) -> np.ndarray:
        """
        Run predictions with the loaded model.

        Args:
            spectra_processed: preprocessed spectral data

        Returns:
            prediction array
        """
        if self.loaded_model_data is None:
            raise ValueError("Load a model first")

        model = self.loaded_model_data['model']

        print("Predicting...")
        print(f"Input data shape: {spectra_processed.shape}")

        try:
            # Replace NaN/Inf so models such as SVR do not raise
            spectra_clean = np.nan_to_num(spectra_processed, nan=0.0, posinf=0.0, neginf=0.0)

            predictions = model.predict(spectra_clean)

            print(f"Prediction finished, output shape: {predictions.shape}")
            print(f"Prediction range: [{np.min(predictions):.4f}, {np.max(predictions):.4f}]")
            print(f"Prediction stats: mean={np.mean(predictions):.4f}, "
                  f"std={np.std(predictions):.4f}")

            return predictions

        except Exception as e:
            print(f"Prediction failed: {e}")
            raise

    def save_predictions(self, coords: pd.DataFrame, predictions: np.ndarray,
                         output_path: str, prediction_column: str = 'prediction'):
        """
        Save predictions alongside their coordinates.

        Args:
            coords: longitude/latitude data
            predictions: prediction array
            output_path: output file path
            prediction_column: name of the prediction column
        """
        print(f"Saving predictions to: {output_path}")

        # Build the result DataFrame
        result_df = coords.copy()
        result_df[prediction_column] = predictions

        # Make sure the output directory exists
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        # Choose the output format from the file extension
        file_ext = Path(output_path).suffix.lower()

        if file_ext == '.xls':
            # Excel 97-2003; note that pandas 2.0 removed the xlwt engine,
            # so recent pandas raises ValueError rather than ImportError here
            try:
                result_df.to_excel(output_path, index=False, engine='xlwt')
                print("  Format: Excel 97-2003 (.xls)")
            except (ImportError, ValueError):
                print("Warning: xlwt is unavailable; saving as CSV instead")
                csv_path = output_path.replace('.xls', '.csv')
                result_df.to_csv(csv_path, index=False, encoding='utf-8-sig')
                output_path = csv_path
        elif file_ext == '.xlsx':
            # Excel 2007+
            try:
                result_df.to_excel(output_path, index=False, engine='openpyxl')
                print("  Format: Excel 2007+ (.xlsx)")
            except ImportError:
                print("Warning: openpyxl is not installed; saving as CSV instead")
                csv_path = output_path.replace('.xlsx', '.csv')
                result_df.to_csv(csv_path, index=False, encoding='utf-8-sig')
                output_path = csv_path
        else:
            # Default: CSV
            result_df.to_csv(output_path, index=False, encoding='utf-8-sig')
            print("  Format: CSV (.csv)")

        print("Predictions saved:")
        print(f"  Output file: {output_path}")
        print(f"  Data shape: {result_df.shape}")
        print(f"  Columns: {list(result_df.columns)}")

        # Summary statistics of the predictions
        print("\nPrediction statistics:")
        print(result_df[prediction_column].describe())

        return result_df
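
    # Minimal end-to-end sketch of the steps wired together by
    # inference_pipeline() below (paths are hypothetical):
    #
    #   inf = WaterQualityInference("models/artifacts/chla")
    #   inf.load_best_model(metric='test_r2')
    #   coords, spectra = inf.load_sampling_data("sampling.csv")
    #   preds = inf.predict(inf.preprocess_spectra(spectra))
    #   inf.save_predictions(coords, preds, "out/predictions.csv")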

    def inference_pipeline(self, sampling_csv_path: str, output_csv_path: str,
                           metric: str = 'test_r2',
                           prediction_column: str = 'prediction',
                           model_file_path: str = None):
        """
        Full inference pipeline.

        Args:
            sampling_csv_path: path to the sampling CSV
            output_csv_path: path for the prediction CSV
            metric: metric used to select the best model
            prediction_column: name of the prediction column
            model_file_path: optional explicit model file path
        """
        print("=" * 80)
        print("Starting water quality retrieval inference pipeline")
        print("=" * 80)

        try:
            # 1. Load the model
            print("\nStep 1: load model")
            print("-" * 40)
            if model_file_path:
                self.load_specific_model(model_file_path)
            else:
                self.load_best_model(metric=metric)

            # 2. Load the sampling data
            print("\nStep 2: load sampling data")
            print("-" * 40)
            coords, spectra = self.load_sampling_data(sampling_csv_path)

            # 3. Preprocess
            print("\nStep 3: preprocess data")
            print("-" * 40)
            spectra_processed = self.preprocess_spectra(spectra)

            # 4. Predict
            print("\nStep 4: predict")
            print("-" * 40)
            predictions = self.predict(spectra_processed)

            # 5. Save predictions
            print("\nStep 5: save predictions")
            print("-" * 40)
            result_df = self.save_predictions(coords, predictions,
                                              output_csv_path, prediction_column)

            print("\n" + "=" * 80)
            print("Inference pipeline finished!")
            print("=" * 80)

            return predictions, result_df

        except Exception as e:
            print(f"\nInference pipeline failed: {e}")
            raise

    def get_model_info(self) -> Dict:
        """
        Describe the currently loaded model.

        Returns:
            model info dict
        """
        if self.loaded_model_data is None:
            return {"status": "no_model_loaded"}

        info = {
            "status": "model_loaded",
            "preprocess_method": self.loaded_model_data['preprocess_method'],
            "model_name": self.loaded_model_data['model_name'],
            "model_type": str(type(self.loaded_model_data['model'])),
            "metadata": self.loaded_model_data.get('metadata', {}),
        }

        if self.best_model_info:
            info.update(self.best_model_info)

        return info

    def batch_inference(self, input_dir: str, output_dir: str,
                        metric: str = 'test_r2',
                        prediction_column: str = 'prediction'):
        """
        Batch inference over multiple sampling files.

        Args:
            input_dir: input directory containing sampling CSV files
            output_dir: output directory
            metric: metric used to select the best model
            prediction_column: name of the prediction column
        """
        input_path = Path(input_dir)
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Collect all CSV files
        csv_files = list(input_path.glob("*.csv"))
        if not csv_files:
            print(f"No CSV files found in directory {input_dir}")
            return

        print(f"Found {len(csv_files)} CSV files for batch inference")

        # Load the model once for all files
        self.load_best_model(metric=metric)

        results = {}
        for csv_file in csv_files:
            try:
                print(f"\nProcessing file: {csv_file.name}")
                output_file = output_path / f"prediction_{csv_file.name}"

                # Run inference
                coords, spectra = self.load_sampling_data(str(csv_file))
                spectra_processed = self.preprocess_spectra(spectra)
                predictions = self.predict(spectra_processed)
                result_df = self.save_predictions(coords, predictions,
                                                  str(output_file), prediction_column)

                results[csv_file.name] = {
                    'output_file': str(output_file),
                    'sample_count': len(predictions),
                    'prediction_stats': {
                        'mean': np.mean(predictions),
                        'std': np.std(predictions),
                        'min': np.min(predictions),
                        'max': np.max(predictions),
                    },
                }
            except Exception as e:
                print(f"Failed to process file {csv_file.name}: {e}")
                results[csv_file.name] = {'error': str(e)}

        print(f"\nBatch inference finished; processed {len(csv_files)} files")
        return results
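
    # Assumed on-disk layout for the multi-model batch method below: every
    # subfolder of models_root_dir is an independent artifacts_dir whose name
    # doubles as the target column, e.g. (hypothetical names):
    #
    #   models_root_dir/
    #     chla/  chla_detailed_results.csv, chla_random_SNV_RandomForest.joblib
    #     tn/    tn_detailed_results.csv,   tn_spxy_None_SVR.joblib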

    def batch_inference_multi_models(self, models_root_dir: str, sampling_csv_path: str,
                                     output_dir: str, metric: str = 'test_r2',
                                     prediction_column: str = 'prediction',
                                     output_format: str = 'csv'):
        """
        Batch inference with the models found in each subfolder.

        Args:
            models_root_dir: root directory whose subfolders each serve as an
                artifacts_dir
            sampling_csv_path: path to the sampling CSV
            output_dir: output directory
            metric: metric used to select the best model
            prediction_column: name of the prediction column
            output_format: output file format ('csv', 'xls', 'xlsx')
        """
        models_root = Path(models_root_dir)
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Collect all subfolders
        subdirs = [d for d in models_root.iterdir() if d.is_dir()]
        if not subdirs:
            print(f"No subfolders found in directory {models_root_dir}")
            return

        print(f"Found {len(subdirs)} model subfolders for batch inference")
        print(f"Output format: {output_format.upper()}")

        all_results = {}
        for subdir in subdirs:
            try:
                subdir_name = subdir.name
                print(f"\n{'=' * 60}")
                print(f"Processing model folder: {subdir_name}")
                print(f"{'=' * 60}")

                # Fresh inference instance using this subfolder as artifacts_dir
                model_inferencer = WaterQualityInference(str(subdir))

                # File extension follows the requested output format
                file_ext = f".{output_format}"
                output_file = output_path / f"{subdir_name}{file_ext}"

                # Run the full pipeline
                predictions, result_df = model_inferencer.inference_pipeline(
                    sampling_csv_path=sampling_csv_path,
                    output_csv_path=str(output_file),
                    metric=metric,
                    prediction_column=prediction_column,
                )

                # Collect result info
                model_info = model_inferencer.get_model_info()
                all_results[subdir_name] = {
                    'status': 'success',
                    'output_file': str(output_file),
                    'sample_count': len(predictions),
                    'model_info': model_info,
                    'prediction_stats': {
                        'mean': np.mean(predictions),
                        'std': np.std(predictions),
                        'min': np.min(predictions),
                        'max': np.max(predictions),
                    },
                }
                print(f"Subfolder {subdir_name} processed")

            except Exception as e:
                print(f"Failed to process subfolder {subdir_name}: {e}")
                all_results[subdir_name] = {
                    'status': 'error',
                    'error': str(e),
                }

        print(f"\n{'=' * 80}")
        print(f"Batch inference finished; processed {len(subdirs)} model folders")
        print(f"{'=' * 80}")

        # Print a summary
        print("\nSummary:")
        for folder_name, result in all_results.items():
            if result['status'] == 'success':
                print(f"  ✓ {folder_name}: {result['sample_count']} predictions, "
                      f"mean={result['prediction_stats']['mean']:.4f}")
            else:
                print(f"  ✗ {folder_name}: failed - {result['error']}")

        return all_results

    def batch_inference_multi_data(self, artifacts_dir: str, input_dir: str,
                                   output_dir: str, metric: str = 'test_r2',
                                   prediction_column: str = 'prediction',
                                   output_format: str = 'csv'):
        """
        Batch inference with a single model over multiple data files; each
        output file is named after its input file (without extension).

        Args:
            artifacts_dir: model directory
            input_dir: input directory containing sampling CSV files
            output_dir: output directory
            metric: metric used to select the best model
            prediction_column: name of the prediction column
            output_format: output file format ('csv', 'xls', 'xlsx')
        """
        input_path = Path(input_dir)
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Collect all CSV files
        csv_files = list(input_path.glob("*.csv"))
        if not csv_files:
            print(f"No CSV files found in directory {input_dir}")
            return

        print(f"Found {len(csv_files)} CSV files for batch inference")
        print(f"Output format: {output_format.upper()}")

        # Point at the model directory and load the model once
        self.artifacts_dir = Path(artifacts_dir)
        self.load_best_model(metric=metric)

        results = {}
        for csv_file in csv_files:
            try:
                # File name without extension
                file_stem = csv_file.stem
                print(f"\nProcessing file: {csv_file.name}")

                # File extension follows the requested output format
                file_ext = f".{output_format}"
                output_file = output_path / f"{file_stem}{file_ext}"

                # Run inference
                coords, spectra = self.load_sampling_data(str(csv_file))
                spectra_processed = self.preprocess_spectra(spectra)
                predictions = self.predict(spectra_processed)
                result_df = self.save_predictions(coords, predictions,
                                                  str(output_file), prediction_column)

                results[file_stem] = {
                    'input_file': str(csv_file),
                    'output_file': str(output_file),
                    'sample_count': len(predictions),
                    'prediction_stats': {
                        'mean': np.mean(predictions),
                        'std': np.std(predictions),
                        'min': np.min(predictions),
                        'max': np.max(predictions),
                    },
                }
            except Exception as e:
                print(f"Failed to process file {csv_file.name}: {e}")
                results[csv_file.stem] = {'error': str(e)}

        print(f"\nBatch inference finished; processed {len(csv_files)} files")
        return results
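
    # Caveat for the evaluation method below: it never refits the model; it
    # re-applies the already-trained estimator to both subsets, so the split
    # mirrors training only if split_method, test_size, and random_state
    # match the original training run.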

    def evaluate_with_split(self, data_csv_path: str,
                            split_method: str = "random",
                            test_size: float = 0.2,
                            random_state: int = 42,
                            target_column: int = 11,
                            feature_start_column: int = 13,
                            metric: str = 'test_r2',
                            prediction_column: str = 'prediction'):
        """
        Evaluate the model with the same data-splitting method used in training.

        Args:
            data_csv_path: path to the full dataset CSV (including targets)
            split_method: split method ("random", "spxy", "ks")
            test_size: test set fraction
            random_state: random seed
            target_column: column index of the target values
            feature_start_column: column index where features start
            metric: metric used to select the model
            prediction_column: name of the prediction column

        Returns:
            evaluation results dict
        """
        print("=" * 80)
        print("Starting split-based evaluation")
        print("=" * 80)

        try:
            # 1. Load the full dataset
            print("\nStep 1: load full dataset")
            print("-" * 40)
            data = pd.read_csv(data_csv_path)

            # Drop rows with missing target values
            y = data.iloc[:, target_column]
            mask = ~y.isna()
            data_cleaned = data[mask]
            y_cleaned = data_cleaned.iloc[:, target_column]
            X_cleaned = data_cleaned.iloc[:, feature_start_column:]

            print("Data loaded:")
            print(f"  Raw sample count: {len(data)}")
            print(f"  Cleaned sample count: {len(X_cleaned)}")
            print(f"  Feature count: {X_cleaned.shape[1]}")
            print(f"  Target range: {y_cleaned.min():.4f} ~ {y_cleaned.max():.4f}")

            # 2. Load the best model
            print("\nStep 2: load best model")
            print("-" * 40)
            self.load_best_model(metric=metric)

            # 3. Preprocess
            print("\nStep 3: preprocess data")
            print("-" * 40)
            X_processed = self.preprocess_spectra(X_cleaned)

            # 4. Split
            print("\nStep 4: split data")
            print("-" * 40)
            X_train, X_test, y_train, y_test = self.split_data(
                X_processed, y_cleaned, method=split_method,
                test_size=test_size, random_state=random_state
            )

            print("Data split finished:")
            print(f"  Training samples: {X_train.shape[0]}")
            print(f"  Test samples: {X_test.shape[0]}")

            # 5. Predict on both subsets with the already-trained model
            print("\nStep 5: predict")
            print("-" * 40)
            y_train_pred = self.loaded_model_data['model'].predict(X_train)
            y_test_pred = self.loaded_model_data['model'].predict(X_test)

            # 6. Compute evaluation metrics
            print("\nStep 6: compute metrics")
            print("-" * 40)
            from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

            # Training set
            train_mse = mean_squared_error(y_train, y_train_pred)
            train_mae = mean_absolute_error(y_train, y_train_pred)
            train_r2 = r2_score(y_train, y_train_pred)
            train_rmse = np.sqrt(train_mse)

            # Test set
            test_mse = mean_squared_error(y_test, y_test_pred)
            test_mae = mean_absolute_error(y_test, y_test_pred)
            test_r2 = r2_score(y_test, y_test_pred)
            test_rmse = np.sqrt(test_mse)

            results = {
                'split_method': split_method,
                'test_size': test_size,
                'train_size': len(y_train),
                'test_size_actual': len(y_test),
                'train_metrics': {
                    'mse': train_mse,
                    'mae': train_mae,
                    'rmse': train_rmse,
                    'r2': train_r2,
                },
                'test_metrics': {
                    'mse': test_mse,
                    'mae': test_mae,
                    'rmse': test_rmse,
                    'r2': test_r2,
                },
                'predictions': {
                    'y_train_true': y_train,
                    'y_train_pred': y_train_pred,
                    'y_test_true': y_test,
                    'y_test_pred': y_test_pred,
                },
            }

            print("Evaluation finished:")
            print("  Training metrics:")
            print(f"    R²: {train_r2:.4f}")
            print(f"    RMSE: {train_rmse:.4f}")
            print(f"    MAE: {train_mae:.4f}")
            print("  Test metrics:")
            print(f"    R²: {test_r2:.4f}")
            print(f"    RMSE: {test_rmse:.4f}")
            print(f"    MAE: {test_mae:.4f}")

            print("\n" + "=" * 80)
            print("Split-based evaluation finished!")
            print("=" * 80)

            return results

        except Exception as e:
            print(f"\nSplit-based evaluation failed: {e}")
            raise


def main():
    """Example entry point."""
    # Create the inference instance
    artifacts_dir = r"E:\code\WQ\yaobao925\qvchuyaoban"
    inferencer = WaterQualityInference(artifacts_dir)

    # Input paths
    sampling_csv = r"E:\code\WQ\xiaogujia\使用腰堡模型\spectral_sampling_results.csv"
    # output_csv = r"E:\code\WQ\laodao\output"

    try:
        # # Example 1: single model, single data file
        # print("Example 1: single model, single data file")
        # predictions, result_df = inferencer.inference_pipeline(
        #     sampling_csv_path=sampling_csv,
        #     output_csv_path=output_csv,
        #     metric='test_r2',  # select the best model by test R²
        #     prediction_column='water_quality_prediction'
        # )
        #
        # print(f"\nInference finished; generated {len(predictions)} predictions")
        #
        # # Show model info
        # model_info = inferencer.get_model_info()
        # print(f"\nModel used:")
        # print(f"  Combination: {model_info.get('combination', 'Unknown')}")
        # print(f"  Preprocessing: {model_info.get('preprocess_method', 'Unknown')}")
        # print(f"  Algorithm: {model_info.get('model_name', 'Unknown')}")

        # Example 2: batch inference with multiple models
        # (each subfolder is a separate artifacts_dir)
        print(f"\n{'=' * 80}")
        print("Example 2: batch inference with multiple models")
        models_root_dir = r"E:\code\WQ\yaobao925\qvchuyaoban"  # root containing model subfolders
        output_dir = r"E:\code\WQ\xiaogujia\使用腰堡模型\predict"

        all_results = inferencer.batch_inference_multi_models(
            models_root_dir=models_root_dir,
            sampling_csv_path=sampling_csv,
            output_dir=output_dir,
            metric='test_r2',
            prediction_column='water_quality_prediction'
        )

        # Example 3: split-based model evaluation (optional)
        # print(f"\n{'=' * 80}")
        # print("Example 3: split-based evaluation")
        # complete_data_csv = r"E:\code\WQ\laodao\data\捞刀河-浏阳河-圭塘河.csv"  # full dataset with targets
        #
        # # Evaluate with the SPXY split
        # eval_results = inferencer.evaluate_with_split(
        #     data_csv_path=complete_data_csv,
        #     split_method="spxy",  # options: "random", "spxy", "ks"
        #     test_size=0.2,
        #     random_state=42,
        #     target_column=11,         # target column index
        #     feature_start_column=13,  # first feature column index
        #     metric='test_r2'
        # )
        #
        # print(f"\nSplit-based evaluation results:")
        # print(f"  Split method: {eval_results['split_method']}")
        # print(f"  Training R²: {eval_results['train_metrics']['r2']:.4f}")
        # print(f"  Test R²: {eval_results['test_metrics']['r2']:.4f}")
        # print(f"  Training RMSE: {eval_results['train_metrics']['rmse']:.4f}")
        # print(f"  Test RMSE: {eval_results['test_metrics']['rmse']:.4f}")

    except Exception as e:
        print(f"Inference failed: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()