"""1-D CNN classifier: dataset wrapper, model, training, evaluation and tuning.

Public entry points:

* ``ZspPocess``    - standardise features and wrap them in ``MyDataset``.
* ``CNNTrain``     - train ``CNN3Layers`` and checkpoint the best epoch.
* ``CNNtest``      - evaluate a saved checkpoint.
* ``optimize_CNN`` - Bayesian hyper-parameter search around the two above.
"""
import os

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.preprocessing import StandardScaler
from skopt import BayesSearchCV, gp_minimize
from skopt.space import Integer, Real
from torch.cuda.amp import GradScaler, autocast
from torch.utils.data import DataLoader, Dataset
from torch.utils.tensorboard import SummaryWriter

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
writer = SummaryWriter()  # TensorBoard writer shared by every training run
# NOTE(review): this is set after the TensorBoard import, as in the original
# file; if it is meant to influence a TensorFlow import it may need to move
# above the imports - confirm.
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'


class MyDataset(Dataset):
    """Dataset of (spectrum, label) pairs with optional additive-noise augmentation."""

    def __init__(self, specs, labels, augment=False):
        self.specs = specs
        self.labels = labels
        self.augment = augment  # whether to add random noise in __getitem__

    def __getitem__(self, index):
        spec, target = self.specs[index], self.labels[index]
        # Augmentation: add Gaussian noise scaled by 0.01 to the sample.
        if self.augment:
            noise = 0.01 * torch.randn_like(spec)
            spec = spec + noise
        return spec, target

    def __len__(self):
        return len(self.specs)


def _to_feature_tensor(X):
    """Turn a 2-D (samples, features) array into a float32 (samples, 1, features) tensor."""
    return torch.tensor(np.asarray(X)[:, np.newaxis, :], dtype=torch.float32)


def _to_label_tensor(y):
    """Turn a label sequence into an int64 tensor."""
    return torch.tensor(np.asarray(y), dtype=torch.long)


def ZspPocess(X_train, X_test, y_train, y_test, need=True):
    """Standardise the features and build the train/test datasets.

    Args:
        X_train, X_test: 2-D feature arrays (samples x features).
        y_train, y_test: integer class labels.
        need: when true, fit a ``StandardScaler`` on ``X_train`` and apply it
            to both splits; when false the features are used as given.

    Returns:
        ``(data_train, data_test)``; the training dataset has augmentation
        enabled, the test dataset does not.
    """
    if need:
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)  # fit on the training split only
        X_test = scaler.transform(X_test)  # reuse the training statistics

    data_train = MyDataset(
        _to_feature_tensor(X_train), _to_label_tensor(y_train), augment=True
    )
    data_test = MyDataset(
        _to_feature_tensor(X_test), _to_label_tensor(y_test), augment=False
    )
    return data_train, data_test


class CNN3Layers(nn.Module):
    """Three Conv1d stages (each followed by dropout) and a two-layer classifier head.

    Args:
        nls: number of output classes.
        dropout_conv: dropout probability after each convolutional stage.
        dropout_fc: dropout probability inside the fully connected head.
    """

    def __init__(self, nls, dropout_conv=0.3, dropout_fc=0.5):
        super(CNN3Layers, self).__init__()
        # Attribute names are part of the checkpoint format (state_dict keys).
        self.CONV1 = nn.Sequential(
            nn.Conv1d(1, 64, 5, 1, padding=2),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.MaxPool1d(2, 2),
            nn.Dropout(dropout_conv),
        )
        self.CONV2 = nn.Sequential(
            nn.Conv1d(64, 128, 5, 1, padding=2),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.MaxPool1d(2, 2),
            nn.Dropout(dropout_conv),
        )
        self.CONV3 = nn.Sequential(
            nn.Conv1d(128, 256, 3, 1, padding=1),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.AdaptiveMaxPool1d(1),  # collapses the length axis to 1
            nn.Dropout(dropout_conv),
        )
        self.fc = nn.Sequential(
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(dropout_fc),
            nn.Linear(128, nls),
        )

    def forward(self, x):
        x = self.CONV1(x)
        x = self.CONV2(x)
        x = self.CONV3(x)
        x = x.view(x.size(0), -1)
        out = self.fc(x)
        return out


def _predict(model, loader, criterion=None, use_amp=False):
    """Run ``model`` in eval mode over ``loader``.

    Returns ``(y_true, y_pred, mean_loss)``.  ``mean_loss`` is the
    sample-weighted mean of ``criterion`` or NaN when no criterion is given
    (or the loader is empty).
    """
    model.eval()
    y_true, y_pred = [], []
    loss_sum, n_seen = 0.0, 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.to(device).float()
            labels = labels.to(device).long()
            with autocast(enabled=use_amp):
                outputs = model(inputs)
                if criterion is not None:
                    loss_sum += criterion(outputs, labels).item() * labels.size(0)
            _, predicted = torch.max(outputs, 1)
            n_seen += labels.size(0)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())
    if criterion is None or n_seen == 0:
        return y_true, y_pred, float("nan")
    return y_true, y_pred, loss_sum / n_seen


def _classification_metrics(y_true, y_pred, zero_division=None):
    """Compute accuracy, weighted precision/recall/F1 and the confusion matrix.

    ``zero_division`` is forwarded to scikit-learn only when it is not None so
    callers can keep the library default.
    """
    extra = {} if zero_division is None else {"zero_division": zero_division}
    return {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred, average='weighted', **extra),
        "recall": recall_score(y_true, y_pred, average='weighted', **extra),
        "f1_score": f1_score(y_true, y_pred, average='weighted', **extra),
        "confusion_matrix": confusion_matrix(y_true, y_pred),
    }


def CNNTrain(X_train, X_test, y_train, y_test, BATCH_SIZE, n_epochs, nls, model_path,
             dropout_conv=0.3, dropout_fc=0.5, lr=0.0001):
    """Train ``CNN3Layers`` and save the weights of the best epoch.

    Args:
        X_train, X_test, y_train, y_test: raw features and labels; features
            are standardised via ``ZspPocess``.
        BATCH_SIZE: mini-batch size for both loaders.
        n_epochs: number of epochs (must be >= 1).
        nls: number of classes.
        model_path: file the best ``state_dict`` (by test accuracy) is saved to.
        dropout_conv, dropout_fc: dropout probabilities passed to the model.
        lr: Adam learning rate.

    Returns:
        dict with ``accuracy``, ``precision``, ``recall``, ``f1_score`` and
        ``confusion_matrix`` of the test split after the LAST epoch (which is
        not necessarily the epoch that was checkpointed).  All metrics are
        computed over the whole test split, so they agree with the confusion
        matrix.

    Raises:
        ValueError: if ``n_epochs`` is smaller than 1.
    """
    # Hyper-parameter searches hand over numpy integers, which DataLoader rejects.
    BATCH_SIZE = int(BATCH_SIZE)
    n_epochs = int(n_epochs)
    if n_epochs < 1:
        raise ValueError("n_epochs must be at least 1")

    data_train, data_test = ZspPocess(X_train, X_test, y_train, y_test, need=True)
    train_loader = torch.utils.data.DataLoader(data_train, batch_size=BATCH_SIZE, shuffle=True)
    test_loader = torch.utils.data.DataLoader(data_test, batch_size=BATCH_SIZE, shuffle=False)

    model = CNN3Layers(nls=nls, dropout_conv=dropout_conv, dropout_fc=dropout_fc).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=0.001)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.5, patience=5)
    criterion = nn.CrossEntropyLoss().to(device)

    # Mixed precision only makes sense on CUDA; disabling it elsewhere avoids
    # the warnings torch.cuda.amp emits on CPU-only hosts.
    use_amp = device.type == "cuda"
    scaler = GradScaler(enabled=use_amp)

    # Start below any reachable accuracy so every run writes its own
    # checkpoint instead of leaving a stale file from an earlier run.
    best_acc = -1.0
    model_save_path = model_path
    metrics = None

    for epoch in range(n_epochs):
        model.train()
        loss_sum, n_correct, n_seen = 0.0, 0, 0
        for inputs, labels in train_loader:
            inputs = inputs.to(device).float()
            labels = labels.to(device).long()

            optimizer.zero_grad()
            with autocast(enabled=use_amp):
                outputs = model(inputs)
                loss = criterion(outputs, labels)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            _, predicted = torch.max(outputs.data, 1)
            batch_n = labels.size(0)
            # Weight by batch size so a short final batch does not skew the mean.
            loss_sum += loss.item() * batch_n
            n_correct += (predicted == labels).sum().item()
            n_seen += batch_n

        avg_train_loss = loss_sum / max(n_seen, 1)
        avg_train_acc = n_correct / max(n_seen, 1)
        writer.add_scalar('Loss/train', avg_train_loss, epoch)
        writer.add_scalar('Accuracy/train', avg_train_acc, epoch)

        # Evaluate on the test split.
        y_true, y_pred, avg_test_loss = _predict(model, test_loader, criterion, use_amp)
        metrics = _classification_metrics(y_true, y_pred, zero_division=1)
        avg_test_acc = metrics["accuracy"]
        avg_test_precision = metrics["precision"]
        avg_test_recall = metrics["recall"]
        avg_test_f1 = metrics["f1_score"]

        writer.add_scalar('Loss/test', avg_test_loss, epoch)
        writer.add_scalar('Accuracy/test', avg_test_acc, epoch)
        writer.add_scalar('Precision/test', avg_test_precision, epoch)
        writer.add_scalar('Recall/test', avg_test_recall, epoch)
        writer.add_scalar('F1_Score/test', avg_test_f1, epoch)

        # Per-epoch progress report.
        print(f"Epoch [{epoch + 1}/{n_epochs}]")
        print(f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {avg_train_acc:.4f}")
        print(f"Test Loss: {avg_test_loss:.4f}, Test Accuracy: {avg_test_acc:.4f}")
        print(f"Test Precision: {avg_test_precision:.4f}, Test Recall: {avg_test_recall:.4f}, Test F1: {avg_test_f1:.4f}")

        if avg_test_acc > best_acc:
            best_acc = avg_test_acc
            torch.save(model.state_dict(), model_save_path)

        scheduler.step(avg_test_loss)

    return metrics


def CNNtest(X_test, y_test, BATCH_SIZE, nls, model_path, scaler=None):
    """Evaluate the checkpoint stored at ``model_path``.

    Args:
        X_test: 2-D feature array (samples x features).
        y_test: integer class labels.
        BATCH_SIZE: mini-batch size used for inference.
        nls: number of classes the checkpoint was trained with.
        model_path: path of the saved ``state_dict``.
        scaler: optional fitted scaler exposing ``transform``.  Pass the
            scaler fitted on the training data so the model sees inputs on
            the same scale as during training.  When omitted, a new
            ``StandardScaler`` is fitted on ``X_test`` itself (the historical
            behaviour).

    Returns:
        dict with ``accuracy``, ``precision``, ``recall``, ``f1_score`` and
        ``confusion_matrix``.
    """
    if scaler is None:
        X_test = StandardScaler().fit_transform(X_test)
    else:
        X_test = scaler.transform(X_test)

    data_test = MyDataset(_to_feature_tensor(X_test), _to_label_tensor(y_test), augment=False)
    test_loader = torch.utils.data.DataLoader(data_test, batch_size=int(BATCH_SIZE), shuffle=False)

    # Rebuild the architecture and load the weights; map_location lets a
    # checkpoint written on a GPU be loaded on a CPU-only host.
    model = CNN3Layers(nls=nls).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))

    y_true, y_pred, _ = _predict(model, test_loader)
    return _classification_metrics(y_true, y_pred)


def optimize_CNN(X_train, X_test, y_train, y_test, model_path,
                 nls=21, n_calls=20, random_state=42):
    """Tune batch size, epochs, dropout rates and learning rate, then retrain.

    The search minimises the negative test accuracy with
    ``skopt.gp_minimize``.  Note that the test split therefore drives model
    selection; the reported test metrics are not an unbiased estimate.

    Args:
        X_train, X_test, y_train, y_test: raw features and labels.
        model_path: checkpoint file, overwritten by every trial.
        nls: number of classes.
        n_calls: number of objective evaluations; ``gp_minimize`` requires it
            to be no smaller than its number of initial random points
            (10 by default).
        random_state: seed for the optimiser.

    Returns:
        ``(best_params, train_metrics, test_metrics)`` where ``best_params``
        maps ``batch_size``, ``n_epochs``, ``dropout_conv``, ``dropout_fc``
        and ``lr`` to the best values found, and the metrics come from a final
        training run with those values.
    """
    names = ('batch_size', 'n_epochs', 'dropout_conv', 'dropout_fc', 'lr')
    casts = (int, int, float, float, float)
    search_space = [
        Integer(16, 128),                   # batch size
        Integer(10, 100),                   # number of epochs
        Real(0.1, 0.5, 'uniform'),          # dropout after conv stages
        Real(0.1, 0.5, 'uniform'),          # dropout in the FC head
        Real(1e-5, 1e-2, 'log-uniform'),    # learning rate
    ]

    # Same statistics CNNTrain derives internally, so the checkpoint is
    # evaluated on inputs scaled exactly as during training.
    train_scaler = StandardScaler().fit(X_train)

    def _as_params(values):
        """Pair raw optimiser values with their names as plain Python numbers."""
        return {name: cast(value) for name, cast, value in zip(names, casts, values)}

    def _train_and_evaluate(batch_size, n_epochs, dropout_conv, dropout_fc, lr):
        """Train with the given hyper-parameters and evaluate the checkpoint."""
        train_metrics = CNNTrain(
            X_train, X_test, y_train, y_test,
            BATCH_SIZE=batch_size,
            n_epochs=n_epochs,
            nls=nls,
            model_path=model_path,
            dropout_conv=dropout_conv,
            dropout_fc=dropout_fc,
            lr=lr,
        )
        test_metrics = CNNtest(
            X_test, y_test, batch_size, nls=nls, model_path=model_path,
            scaler=train_scaler,
        )
        return train_metrics, test_metrics

    def objective(params):
        _, test_metrics = _train_and_evaluate(**_as_params(params))
        # gp_minimize minimises, so return the negated accuracy.
        return -float(test_metrics["accuracy"])

    result = gp_minimize(
        objective,
        search_space,
        n_calls=n_calls,
        random_state=random_state,
        verbose=True,
    )

    best_params = _as_params(result.x)
    print("Best hyperparameters:", best_params)

    # Retrain with the best hyper-parameters and report the final metrics.
    train_metrics, test_metrics = _train_and_evaluate(**best_params)
    return best_params, train_metrics, test_metrics