# -*- coding: utf-8 -*-
"""Optuna + smart-subsampling AutoML trainer ("route B" blow-up-proof engine).

Why this module exists:
    - Legacy path: 11 preprocessings x 4 models x 3 splits = 132 GridSearchCV
      runs. That takes 10+ minutes on small/medium datasets and runs out of
      memory outright on large ones (50k+ rows).
    - AutoML path: 1 preprocessing x N models (hyper-parameters tuned by
      Optuna) on a subsample that avoids the OOM, then a refit with the best
      hyper-parameters on the **full data**; one model file is saved per
      successful (target, model) pair.

Design notes:
    - Entry point: train_with_automl(csv, feature_start_column, model_names, ...)
    - Results are returned as AutoMLResult dataclasses (one per target column)
    - smart_subsample: random down-sampling when N > max_samples
    - Failure fallback: optuna missing / every trial failed -> fall back to
      WaterQualityModelingBatch
    - File naming convention: {target}_{preprocess}_{model}_AUTOML.joblib
    - save_data["metadata"]["automl"] = True marks AutoML artefacts

Usage:
    from src.core.prediction.automl_trainer import train_with_automl
    results = train_with_automl(
        training_csv_path=".../training_spectra.csv",
        feature_start_column="374.285004",
        model_names=["RF", "SVR", "Ridge"],
        n_trials=20,
        timeout_sec=300,
    )
"""
from __future__ import annotations

import json
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

# ============================================================
# Constants
# ============================================================
# Maximum number of samples used during the AutoML search phase (avoids OOM).
# 5000 samples are enough for the RF/SVR/Ridge Optuna search to give a stable CV.
DEFAULT_MAX_SAMPLES = 5000
# Default total search budget in seconds. train_with_automl splits it evenly
# across the candidate models (with a floor of 10 s per model).
DEFAULT_TIMEOUT = 300.0
# Default number of Optuna trials per model.
DEFAULT_N_TRIALS = 20
# Suffix for the AutoML output directory name.
AUTOML_DIR_SUFFIX = "_AutoML"


# ============================================================
# Data classes
# ============================================================
@dataclass
class AutoMLResult:
    """AutoML training outcome for a single target column."""

    success: bool = False
    model_path: Optional[str] = None
    # -inf means "no score available" (failure or fallback path).
    cv_score: float = -float("inf")
    best_params: Optional[Dict[str, Any]] = None
    target_column: str = ""
    preprocessing: str = ""
    model_name: str = ""
    n_trials_done: int = 0
    n_samples_used: int = 0
    fallback_used: bool = False
    elapsed_sec: float = 0.0
    error: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)


# ============================================================
# Smart subsampling
# ============================================================
def smart_subsample(
    X: np.ndarray,
    y: np.ndarray,
    max_samples: int = DEFAULT_MAX_SAMPLES,
    random_state: int = 42,
) -> Tuple[np.ndarray, np.ndarray, bool]:
    """Randomly down-sample when there are more than ``max_samples`` rows.

    Args:
        X: Feature matrix; rows are samples.
        y: Target vector aligned with the rows of ``X``.
        max_samples: Row-count threshold and size of the subsample.
        random_state: Seed for the NumPy random generator.

    Returns:
        ``(X_sub, y_sub, was_subsampled)``. When no subsampling is needed the
        input objects are returned unchanged (no copy).
    """
    n = X.shape[0]
    if n <= max_samples:
        return X, y, False
    rng = np.random.default_rng(random_state)
    # Sampling without replacement keeps every selected row unique.
    idx = rng.choice(n, size=max_samples, replace=False)
    return X[idx], y[idx], True


# ============================================================
# Model factory
# ============================================================
def _build_model(model_name: str, random_state: int = 42):
    """Return a builder callable for the given model key.

    The returned callable accepts hyper-parameters as keyword arguments and
    produces a scikit-learn regressor. ``None`` is returned for unknown keys
    and for keys that are registered but not integrated yet (``"PLS"``).
    """
    from sklearn.ensemble import (
        AdaBoostRegressor,
        ExtraTreesRegressor,
        GradientBoostingRegressor,
        RandomForestRegressor,
    )
    from sklearn.linear_model import (
        ElasticNet,
        Lasso,
        LinearRegression,
        Ridge,
    )
    from sklearn.neighbors import KNeighborsRegressor
    from sklearn.neural_network import MLPRegressor
    from sklearn.svm import SVR
    from sklearn.tree import DecisionTreeRegressor

    factory = {
        "RF": lambda **kw: RandomForestRegressor(random_state=random_state, n_jobs=1, **kw),
        "ET": lambda **kw: ExtraTreesRegressor(random_state=random_state, n_jobs=1, **kw),
        "GradientBoosting": lambda **kw: GradientBoostingRegressor(random_state=random_state, **kw),
        "AdaBoost": lambda **kw: AdaBoostRegressor(random_state=random_state, **kw),
        "Ridge": lambda **kw: Ridge(**kw),
        "Lasso": lambda **kw: Lasso(max_iter=5000, **kw),
        "ElasticNet": lambda **kw: ElasticNet(max_iter=5000, **kw),
        "LinearRegression": lambda **kw: LinearRegression(**kw),
        "SVR": lambda **kw: SVR(**kw),
        "KNN": lambda **kw: KNeighborsRegressor(n_jobs=1, **kw),
        "MLP": lambda **kw: MLPRegressor(max_iter=500, random_state=random_state, **kw),
        "DecisionTree": lambda **kw: DecisionTreeRegressor(random_state=random_state, **kw),
        "PLS": None,  # sklearn.cross_decomposition.PLSRegression is not integrated yet
    }
    return factory.get(model_name)


# ============================================================
# Optuna hyper-parameter search space
# ============================================================
def _get_search_space(model_name: str, trial) -> Dict[str, Any]:
    """Sample the hyper-parameters of ``model_name`` from an Optuna trial.

    Models without a tuned search space (e.g. ``"LinearRegression"``) get an
    empty dict so that they are built with their default parameters instead of
    receiving a parameter they do not accept.
    """
    sp: Dict[str, Any] = {}
    if model_name == "RF":
        sp["n_estimators"] = trial.suggest_int("n_estimators", 50, 300, step=50)
        sp["max_depth"] = trial.suggest_int("max_depth", 3, 20)
        sp["min_samples_split"] = trial.suggest_int("min_samples_split", 2, 10)
        sp["min_samples_leaf"] = trial.suggest_int("min_samples_leaf", 1, 5)
    elif model_name == "ET":
        sp["n_estimators"] = trial.suggest_int("n_estimators", 50, 300, step=50)
        sp["max_depth"] = trial.suggest_int("max_depth", 3, 20)
    elif model_name == "GradientBoosting":
        sp["n_estimators"] = trial.suggest_int("n_estimators", 50, 300, step=50)
        sp["max_depth"] = trial.suggest_int("max_depth", 3, 8)
        sp["learning_rate"] = trial.suggest_float("learning_rate", 0.01, 0.3, log=True)
    elif model_name == "SVR":
        sp["C"] = trial.suggest_float("C", 0.1, 100.0, log=True)
        sp["epsilon"] = trial.suggest_float("epsilon", 0.001, 1.0, log=True)
        sp["kernel"] = trial.suggest_categorical("kernel", ["rbf", "linear"])
    elif model_name == "KNN":
        sp["n_neighbors"] = trial.suggest_int("n_neighbors", 3, 20)
        sp["weights"] = trial.suggest_categorical("weights", ["uniform", "distance"])
    elif model_name in ("Ridge", "Lasso", "ElasticNet"):
        sp["alpha"] = trial.suggest_float("alpha", 0.01, 100.0, log=True)
        if model_name == "ElasticNet":
            sp["l1_ratio"] = trial.suggest_float("l1_ratio", 0.0, 1.0)
    elif model_name == "MLP":
        sp["hidden_layer_sizes"] = trial.suggest_categorical(
            "hidden_layer_sizes", [(50,), (100,), (50, 50), (100, 50)]
        )
        sp["alpha"] = trial.suggest_float("alpha", 1e-5, 1e-1, log=True)
        sp["learning_rate_init"] = trial.suggest_float("learning_rate_init", 1e-4, 1e-2, log=True)
    elif model_name == "DecisionTree":
        sp["max_depth"] = trial.suggest_int("max_depth", 3, 20)
        sp["min_samples_split"] = trial.suggest_int("min_samples_split", 2, 10)
    elif model_name == "AdaBoost":
        sp["n_estimators"] = trial.suggest_int("n_estimators", 30, 200, step=30)
        sp["learning_rate"] = trial.suggest_float("learning_rate", 0.01, 1.0, log=True)
    # Any other model has nothing to tune: suggesting e.g. ``n_estimators``
    # here would make every LinearRegression trial fail with a TypeError.
    return sp


def _make_objective(model_name: str, X: np.ndarray, y: np.ndarray, cv_folds: int, random_state: int):
    """Build the Optuna objective: mean K-fold cross-validated R².

    A failing trial returns the sentinel ``-1.0`` (the caller treats a best
    value ``<= -1.0`` as "every trial failed"); the exception is stored on the
    trial under the ``"error"`` user attribute so it can be reported.
    """
    from sklearn.model_selection import KFold, cross_val_score

    # Both are independent of the trial, so build them once.
    builder = _build_model(model_name, random_state=random_state)
    kf = KFold(n_splits=cv_folds, shuffle=True, random_state=random_state)

    def objective(trial):
        params = _get_search_space(model_name, trial)
        if builder is None:
            return -1.0
        try:
            model = builder(**params)
            scores = cross_val_score(model, X, y, cv=kf, scoring="r2", n_jobs=1)
            return float(np.mean(scores))
        except Exception as exc:
            trial.set_user_attr("error", repr(exc))
            return -1.0

    return objective


def _refit_full(model_name: str, best_params: Dict[str, Any], X: np.ndarray, y: np.ndarray, random_state: int):
    """Fit ``model_name`` with ``best_params`` on the **full data**.

    Returns the fitted model, or ``None`` when the model key is unsupported.
    """
    builder = _build_model(model_name, random_state=random_state)
    if builder is None:
        return None
    model = builder(**best_params)
    model.fit(X, y)
    return model


# ============================================================
# Failure fallback (back to the legacy GridSearchCV path)
# ============================================================
def _fallback_train(
    training_csv_path: str,
    feature_start_column,
    preprocessing: str,
    model_name: str,
    split_method: str,
    cv_folds: int,
    output_dir: Path,
    target_column: str,
    run_training: bool = True,
) -> AutoMLResult:
    """Train through the legacy ``WaterQualityModelingBatch`` when AutoML fails.

    Args:
        training_csv_path: Training CSV handed to the batch trainer.
        feature_start_column: Name or index of the first feature column.
        preprocessing: Preprocessing method name (also the output sub-directory).
        model_name: Model key handed to the batch trainer.
        split_method: Data split method handed to the batch trainer.
        cv_folds: Number of cross-validation folds.
        output_dir: Root output directory.
        target_column: Target whose model file is looked up afterwards.
        run_training: When ``False`` the batch trainer is not run again and
            only the artefact lookup is performed. The batch call takes no
            target argument, so it presumably trains every target of the CSV
            in one run — verify against ``train_models_batch``.

    Returns:
        An ``AutoMLResult`` with ``fallback_used=True``; ``success`` is true
        only when ``{target}_{preprocessing}_{model}.joblib`` was found.
    """
    try:
        from src.core.modeling.modeling_batch import WaterQualityModelingBatch
    except ImportError as e:
        return AutoMLResult(
            success=False,
            error=f"fallback 导入失败: {e!r}",
            fallback_used=True,
            target_column=target_column,
            preprocessing=preprocessing,
            model_name=model_name,
        )

    try:
        out_dir = output_dir / preprocessing
        out_dir.mkdir(parents=True, exist_ok=True)
        if run_training:
            modeler = WaterQualityModelingBatch(str(out_dir))
            modeler.train_models_batch(
                csv_path=training_csv_path,
                feature_start_column=feature_start_column,
                preprocessing_methods=[preprocessing],
                model_names=[model_name],
                split_methods=[split_method],
                cv_folds=cv_folds,
            )
        # Locate the artefact by exact file name: interpolating the target
        # into a glob pattern breaks on names containing ``[``, ``*`` or ``?``.
        expected = f"{target_column}_{preprocessing}_{model_name}.joblib"
        candidates = sorted(p for p in out_dir.rglob("*.joblib") if p.name == expected)
        model_path = str(candidates[0]) if candidates else None
        return AutoMLResult(
            success=model_path is not None,
            model_path=model_path,
            target_column=target_column,
            preprocessing=preprocessing,
            model_name=model_name,
            fallback_used=True,
            metadata={"source": "WaterQualityModelingBatch"},
        )
    except Exception as e:
        return AutoMLResult(
            success=False,
            error=f"fallback 失败: {e!r}",
            fallback_used=True,
            target_column=target_column,
            preprocessing=preprocessing,
            model_name=model_name,
        )


def _split_columns(df: pd.DataFrame, feature_start_column) -> Tuple[List[Any], List[Any]]:
    """Split the columns of ``df`` into targets and features.

    Targets are the numeric columns located before ``feature_start_column``;
    features are every column from ``feature_start_column`` onward.
    Non-numeric columns before the feature start (IDs, dates, ...) belong to
    neither group. Returns ``([], [])`` when a column name cannot be found.
    """
    if isinstance(feature_start_column, (int, np.integer)):
        start = int(feature_start_column)
    else:
        try:
            start = list(df.columns).index(feature_start_column)
        except ValueError:
            return [], []
    y_cols = [c for c in df.columns[:start] if pd.api.types.is_numeric_dtype(df[c])]
    feat_cols = list(df.columns[start:])
    return y_cols, feat_cols


# ============================================================
# Main entry point
# ============================================================
def train_with_automl(
    training_csv_path: str,
    feature_start_column,
    preprocessing_methods: Optional[List[str]] = None,
    model_names: Optional[List[str]] = None,
    split_methods: Optional[List[str]] = None,
    cv_folds: int = 5,
    output_dir: Optional[str] = None,
    n_trials: int = DEFAULT_N_TRIALS,
    timeout_sec: float = DEFAULT_TIMEOUT,
    max_samples: int = DEFAULT_MAX_SAMPLES,
    random_state: int = 42,
    callback: Optional[Callable[[str, str, str], None]] = None,
) -> List[AutoMLResult]:
    """Run AutoML with Optuna + subsampling; fall back to GridSearchCV on failure.

    Args:
        training_csv_path: Training CSV (the step-5 artefact training_spectra.csv).
        feature_start_column: Name or index of the first feature column; the
            numeric columns before it are the targets.
        preprocessing_methods: Candidate preprocessings (**only the first is
            used**, to avoid a Cartesian blow-up). ``None``/empty -> ``["MMS"]``.
        model_names: Candidate models; each one gets its own Optuna study.
            ``None``/empty -> ``["RF", "SVR", "Ridge"]``.
        split_methods: Candidate data splits (only the first is used, and only
            by the fallback path / metadata). ``None``/empty -> ``["spxy"]``.
        cv_folds: Number of cross-validation folds.
        output_dir: Output directory (default
            ``./7_Supervised_Model_Training_AutoML``).
        n_trials: Number of Optuna trials per model.
        timeout_sec: Total search budget in seconds; it is split evenly across
            the models, with a floor of 10 s per model.
        max_samples: Maximum number of samples used during the search phase.
        random_state: Seed for subsampling, CV folds, samplers and models.
        callback: Status callback ``callback(step_name, status, message)``.

    Returns:
        One ``AutoMLResult`` per target column. A single failure result is
        returned when the CSV is missing or no target/feature column is found.
    """

    def notify(status: str, msg: str = "") -> None:
        if callback:
            callback("步骤6_AutoML", status, msg)

    # ---- 1) Defaults (an empty list is treated like None) ----
    if preprocessing_methods is None or len(preprocessing_methods) == 0:
        preprocessing_methods = ["MMS"]
    if model_names is None or len(model_names) == 0:
        model_names = ["RF", "SVR", "Ridge"]
    if split_methods is None or len(split_methods) == 0:
        split_methods = ["spxy"]

    # Decision: only the first preprocessing and the first split are used,
    # which avoids the Cartesian blow-up of the legacy path.
    preproc = preprocessing_methods[0]
    split_method = split_methods[0]

    if output_dir is None:
        output_dir = "./7_Supervised_Model_Training_AutoML"
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # ---- 2) Load the data ----
    notify("start", f"AutoML 训练开始 (n_trials={n_trials}, timeout={timeout_sec}s, max_samples={max_samples})")
    if not Path(training_csv_path).exists():
        return [AutoMLResult(success=False, error=f"训练 CSV 不存在: {training_csv_path}")]

    df = pd.read_csv(training_csv_path)
    y_cols, feat_cols = _split_columns(df, feature_start_column)
    if not y_cols:
        notify("error", "AutoML: 未识别出目标列(feature_start_column 之前的所有数值列)")
        return [AutoMLResult(success=False, error="未识别出目标列")]
    if not feat_cols:
        notify("error", "AutoML: 未识别出特征列")
        return [AutoMLResult(success=False, error="未识别出特征列")]

    try:
        X_all = df[feat_cols].values.astype(np.float64)
    except (TypeError, ValueError) as e:
        notify("error", f"AutoML: 特征列无法转换为数值: {e!r}")
        return [AutoMLResult(success=False, error=f"特征列无法转换为数值: {e!r}")]

    # ---- 3) Preprocessing (first method only) ----
    if preproc != "None":
        try:
            from src.preprocessing.spectral_Preprocessing import Preprocessing

            processed = Preprocessing(preproc, df[feat_cols])
            if isinstance(processed, pd.DataFrame):
                X_all = processed.values.astype(np.float64)
            else:
                X_all = np.asarray(processed, dtype=np.float64)
        except Exception as e:
            notify("warning", f"预处理 {preproc} 失败: {e!r},改用 None")
            preproc = "None"

    # Created only now so that the directory matches the preprocessing name
    # that ends up in the model file names (it may have degraded to "None").
    preproc_dir = out_dir / preproc
    preproc_dir.mkdir(parents=True, exist_ok=True)

    # ---- 4) Is Optuna available? ----
    try:
        import optuna

        optuna.logging.set_verbosity(optuna.logging.WARNING)
        optuna_available = True
    except ImportError:
        optuna_available = False
        notify("warning", "optuna 未安装,全目标列回退到 GridSearchCV(pip install \"optuna>=3.6\")")

    # ---- 5) One run per target ----
    results: List[AutoMLResult] = []
    total = len(y_cols)
    per_model_timeout = max(10.0, timeout_sec / max(1, len(model_names)))
    fallback_trained = False

    def run_fallback(target: str) -> AutoMLResult:
        """Run the legacy trainer at most once, then only look up artefacts."""
        nonlocal fallback_trained
        res = _fallback_train(
            training_csv_path, feature_start_column, preproc, model_names[0],
            split_method, cv_folds, out_dir, target,
            run_training=not fallback_trained,
        )
        # Only a clean run counts; after an error the next target retries.
        if res.error is None:
            fallback_trained = True
        return res

    for ti, tgt in enumerate(y_cols, 1):
        t0 = time.time()
        yv = df[tgt].values.astype(np.float64)
        mask = ~np.isnan(yv)  # drop rows whose target value is missing
        X_t = X_all[mask]
        y_t = yv[mask]

        if X_t.shape[0] < cv_folds * 2:
            notify("warning", f"目标 {tgt}: 有效样本 {X_t.shape[0]} 不足,跳过")
            results.append(AutoMLResult(
                success=False,
                target_column=tgt,
                error=f"样本不足({X_t.shape[0]})",
                preprocessing=preproc,
            ))
            continue

        X_sub, y_sub, was_sub = smart_subsample(X_t, y_t, max_samples=max_samples, random_state=random_state)
        if was_sub:
            notify("info", f"目标 {tgt}: {X_t.shape[0]} 样本 → 子采样 {X_sub.shape[0]}(寻优用)")

        best_overall = AutoMLResult(success=False, target_column=tgt, preprocessing=preproc)

        if not optuna_available:
            best_overall = run_fallback(tgt)
        else:
            for model_name in model_names:
                try:
                    if _build_model(model_name, random_state=random_state) is None:
                        notify("warning", f"模型 {model_name} 暂不支持 AutoML 寻优")
                        continue

                    study = optuna.create_study(
                        direction="maximize",
                        sampler=optuna.samplers.TPESampler(seed=random_state),
                    )
                    study.optimize(
                        _make_objective(model_name, X_sub, y_sub, cv_folds, random_state),
                        n_trials=n_trials,
                        timeout=per_model_timeout,
                        show_progress_bar=False,
                    )

                    if study.best_value is None or study.best_value <= -1.0:
                        # Surface the most recent trial error instead of hiding it.
                        last_err = next(
                            (t.user_attrs["error"] for t in reversed(study.trials)
                             if "error" in t.user_attrs),
                            None,
                        )
                        detail = f";最后错误: {last_err}" if last_err else ""
                        notify("warning", f"{tgt}/{model_name}: 全部 trial 失败(CV 全部 <= -1){detail}")
                        continue

                    # Refit on the FULL data, not on the search subsample.
                    final_model = _refit_full(model_name, study.best_params, X_t, y_t, random_state)
                    if final_model is None:
                        continue

                    # Persist the model together with its AutoML metadata.
                    import joblib

                    fname = f"{tgt}_{preproc}_{model_name}_AUTOML.joblib"
                    fpath = preproc_dir / fname
                    joblib.dump({
                        "model": final_model,
                        "target_column_name": tgt,
                        "preprocess_method": preproc,
                        "model_name": model_name,
                        "metadata": {
                            "automl": True,
                            "best_params": study.best_params,
                            "cv_score": float(study.best_value),
                            "n_trials_done": len(study.trials),
                            "n_samples_used_full": int(X_t.shape[0]),
                            "n_samples_used_for_search": int(X_sub.shape[0]),
                            "was_subsampled": was_sub,
                            "split_method": split_method,
                        },
                    }, fpath)

                    cand = AutoMLResult(
                        success=True,
                        model_path=str(fpath),
                        cv_score=float(study.best_value),
                        best_params=study.best_params,
                        target_column=tgt,
                        preprocessing=preproc,
                        model_name=model_name,
                        n_trials_done=len(study.trials),
                        n_samples_used=int(X_sub.shape[0]),
                        metadata={"refit_on_full": True, "n_samples_full": int(X_t.shape[0])},
                    )
                    if cand.cv_score > best_overall.cv_score:
                        best_overall = cand
                except Exception as e:
                    notify("warning", f"目标 {tgt} / 模型 {model_name} 失败: {e!r}")
                    continue

            if not best_overall.success:
                notify("warning", f"目标 {tgt} 全部 Optuna trial 失败,回退 GridSearchCV")
                best_overall = run_fallback(tgt)

        best_overall.elapsed_sec = time.time() - t0
        results.append(best_overall)
        notify("info", f"AutoML 目标 {tgt} 完成 ({ti}/{total}) cv={best_overall.cv_score:.4f}")

    # ---- 6) Summary JSON (best effort: a write failure is only reported) ----
    summary_path = out_dir / "automl_summary.json"
    try:
        with open(summary_path, "w", encoding="utf-8") as f:
            json.dump([asdict(r) for r in results], f, ensure_ascii=False, indent=2, default=str)
    except Exception as e:
        notify("warning", f"写 automl_summary.json 失败: {e!r}")

    success_n = sum(1 for r in results if r.success)
    fallback_n = sum(1 for r in results if r.fallback_used)
    notify("completed", f"AutoML 训练完成 {success_n}/{len(results)} 成功({fallback_n} 走 fallback),汇总 {summary_path}")
    return results


# ============================================================
# CLI self-test
# ============================================================
def _main() -> None:
    """Parse the command line, run ``train_with_automl`` and print a summary."""
    import argparse

    p = argparse.ArgumentParser(description="AutoML 训练器 CLI 自测")
    p.add_argument("--csv", required=True, help="训练用 CSV(feature_start_column 之前的列为目标 y)")
    p.add_argument("--feature-start", default="0", help="特征起始列名或索引(默认 0)")
    p.add_argument("--n-trials", type=int, default=DEFAULT_N_TRIALS)
    p.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT)
    p.add_argument("--max-samples", type=int, default=DEFAULT_MAX_SAMPLES)
    p.add_argument("--out", default="./7_Supervised_Model_Training_AutoML")
    args = p.parse_args()

    # An all-digit value is a column index; anything else is a column name.
    fsc: Any = args.feature_start
    try:
        fsc = int(fsc)
    except ValueError:
        pass

    res = train_with_automl(
        training_csv_path=args.csv,
        feature_start_column=fsc,
        n_trials=args.n_trials,
        timeout_sec=args.timeout,
        max_samples=args.max_samples,
        output_dir=args.out,
    )
    print(f"\n训练完成 {len(res)} 个目标")
    for r in res:
        marker = "✓" if r.success else "✗"
        fb = " [fallback]" if r.fallback_used else ""
        print(f" {marker} {r.target_column}: cv={r.cv_score:.4f} path={r.model_path}{fb}")


if __name__ == "__main__":
    _main()