micro_plastic/classification_model/Classification/CNN_网格搜索.py

import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.cuda.amp import GradScaler, autocast
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.preprocessing import StandardScaler
from torch.utils.tensorboard import SummaryWriter
# Set up the device and the TensorBoard writer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
writer = SummaryWriter()  # initialize TensorBoard logging
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'  # TensorFlow-specific flag; has no effect on PyTorch
# ---------------------------
# Dataset and preprocessing
# ---------------------------
class MyDataset(Dataset):
"""
自定义数据集,支持数据增强(在训练时添加噪声)
"""
def __init__(self, specs, labels, augment=False):
self.specs = specs
self.labels = labels
self.augment = augment
def __getitem__(self, index):
spec, target = self.specs[index], self.labels[index]
if self.augment:
noise = 0.01 * torch.randn_like(spec)
spec = spec + noise
return spec, target
def __len__(self):
return len(self.specs)
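# Illustrative sketch (assumption, not part of the original pipeline): with augment=True,
# each item is perturbed with zero-mean Gaussian noise of std 0.01, e.g.
#   ds = MyDataset(torch.randn(10, 1, 500), torch.zeros(10, dtype=torch.long), augment=True)
#   spec, target = ds[0]   # spec differs slightly from the stored spectrum on every access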
def ZspProcess(X_train, X_test, y_train, y_test, need=True):
"""
标准化数据并转换为Tensor转换后数据形状为 (样本数, 1, 特征数)
"""
if need:
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
X_train = torch.tensor(X_train[:, np.newaxis, :], dtype=torch.float32)
X_test = torch.tensor(X_test[:, np.newaxis, :], dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
y_test = torch.tensor(y_test, dtype=torch.long)
data_train = MyDataset(X_train, y_train, augment=True)
data_test = MyDataset(X_test, y_test, augment=False)
return data_train, data_test
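# Usage sketch (assumption: X_train / X_test are NumPy arrays of shape (N, n_features)):
#   data_train, data_test = ZspProcess(X_train, X_test, y_train, y_test, need=True)
#   spec, label = data_train[0]   # spec has shape (1, n_features); label is a scalar long tensor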
# ---------------------------
# Model definition
# ---------------------------
class CNN3Layers(nn.Module):
"""
三层1D卷积神经网络支持自定义卷积层后Dropout率以及全连接层Dropout率
"""
def __init__(self, nls, dropout_conv=0.3, dropout_fc=0.5):
super(CNN3Layers, self).__init__()
self.CONV1 = nn.Sequential(
nn.Conv1d(1, 64, kernel_size=5, stride=1, padding=2),
nn.BatchNorm1d(64),
nn.ReLU(),
nn.MaxPool1d(kernel_size=2, stride=2),
nn.Dropout(dropout_conv)
)
self.CONV2 = nn.Sequential(
nn.Conv1d(64, 128, kernel_size=5, stride=1, padding=2),
nn.BatchNorm1d(128),
nn.ReLU(),
nn.MaxPool1d(kernel_size=2, stride=2),
nn.Dropout(dropout_conv)
)
self.CONV3 = nn.Sequential(
nn.Conv1d(128, 256, kernel_size=3, stride=1, padding=1),
nn.BatchNorm1d(256),
nn.ReLU(),
nn.AdaptiveMaxPool1d(1),
nn.Dropout(dropout_conv)
)
self.fc = nn.Sequential(
nn.Linear(256, 128),
nn.ReLU(),
nn.Dropout(dropout_fc),
nn.Linear(128, nls)
)
def forward(self, x):
x = self.CONV1(x)
x = self.CONV2(x)
x = self.CONV3(x)
x = x.view(x.size(0), -1)
out = self.fc(x)
return out
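# Shape walkthrough (sketch, assuming an input length L divisible by 4):
#   (B, 1, L) --CONV1--> (B, 64, L/2) --CONV2--> (B, 128, L/4)
#   --CONV3 + AdaptiveMaxPool1d(1)--> (B, 256, 1) --flatten + fc--> (B, nls)
# e.g. CNN3Layers(nls=4)(torch.randn(8, 1, 200)).shape == torch.Size([8, 4])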
# ---------------------------
# Training and evaluation
# ---------------------------
def CNNTrain(X_train, X_test, y_train, y_test, BATCH_SIZE, n_epochs, nls, model_path, dropout_conv, dropout_fc):
"""
训练过程:训练指定轮次,记录训练与测试指标,并保存测试准确率最高的模型
"""
data_train, data_test = ZspProcess(X_train, X_test, y_train, y_test, need=True)
train_loader = DataLoader(data_train, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(data_test, batch_size=BATCH_SIZE, shuffle=False)
model = CNN3Layers(nls=nls, dropout_conv=dropout_conv, dropout_fc=dropout_fc).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=0.001)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)
criterion = nn.CrossEntropyLoss().to(device)
scaler = GradScaler()
best_acc = 0.0
    # Predictions from the best-accuracy epoch (used for the confusion matrix and related metrics)
    final_y_true, final_y_pred = [], []
for epoch in range(n_epochs):
model.train()
train_acc_list, train_loss_list = [], []
for inputs, labels in train_loader:
inputs = inputs.to(device)
labels = labels.to(device)
optimizer.zero_grad()
with autocast():
outputs = model(inputs)
loss = criterion(outputs, labels)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
_, predicted = torch.max(outputs.data, 1)
acc = accuracy_score(labels.cpu(), predicted.cpu())
train_acc_list.append(acc)
train_loss_list.append(loss.item())
avg_train_loss = np.mean(train_loss_list)
avg_train_acc = np.mean(train_acc_list)
writer.add_scalar('Loss/train', avg_train_loss, epoch)
writer.add_scalar('Accuracy/train', avg_train_acc, epoch)
        # Evaluation on the test set
model.eval()
test_acc_list, test_loss_list = [], []
test_precision_list, test_recall_list, test_f1_list = [], [], []
y_true, y_pred = [], []
with torch.no_grad():
for inputs, labels in test_loader:
inputs = inputs.to(device)
labels = labels.to(device)
with autocast():
outputs = model(inputs)
loss = criterion(outputs, labels)
_, predicted = torch.max(outputs.data, 1)
acc = accuracy_score(labels.cpu(), predicted.cpu())
prec = precision_score(labels.cpu(), predicted.cpu(), average='weighted', zero_division=1)
rec = recall_score(labels.cpu(), predicted.cpu(), average='weighted', zero_division=1)
f1 = f1_score(labels.cpu(), predicted.cpu(), average='weighted', zero_division=1)
y_true.extend(labels.cpu().numpy())
y_pred.extend(predicted.cpu().numpy())
test_acc_list.append(acc)
test_loss_list.append(loss.item())
test_precision_list.append(prec)
test_recall_list.append(rec)
test_f1_list.append(f1)
avg_test_loss = np.mean(test_loss_list)
avg_test_acc = np.mean(test_acc_list)
avg_test_precision = np.mean(test_precision_list)
avg_test_recall = np.mean(test_recall_list)
avg_test_f1 = np.mean(test_f1_list)
writer.add_scalar('Loss/test', avg_test_loss, epoch)
writer.add_scalar('Accuracy/test', avg_test_acc, epoch)
writer.add_scalar('Precision/test', avg_test_precision, epoch)
writer.add_scalar('Recall/test', avg_test_recall, epoch)
writer.add_scalar('F1_Score/test', avg_test_f1, epoch)
print(f"Epoch [{epoch + 1}/{n_epochs}]: Train Loss={avg_train_loss:.4f}, Train Acc={avg_train_acc:.4f} | "
f"Test Loss={avg_test_loss:.4f}, Test Acc={avg_test_acc:.4f}, Precision={avg_test_precision:.4f}, "
f"Recall={avg_test_recall:.4f}, F1={avg_test_f1:.4f}")
        # Save the model if the current test accuracy is the best so far
if avg_test_acc > best_acc:
best_acc = avg_test_acc
torch.save(model.state_dict(), model_path)
final_y_true = y_true.copy()
final_y_pred = y_pred.copy()
scheduler.step(avg_test_loss)
train_metrics = {
"train_loss": avg_train_loss,
"train_accuracy": avg_train_acc
}
test_metrics = {
"test_loss": avg_test_loss,
"test_accuracy": avg_test_acc,
"precision": avg_test_precision,
"recall": avg_test_recall,
"f1_score": avg_test_f1,
"confusion_matrix": confusion_matrix(final_y_true, final_y_pred)
}
return train_metrics, test_metrics
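# Call sketch (hypothetical values; the best checkpoint is written to model_path):
#   train_m, test_m = CNNTrain(X_train, X_test, y_train, y_test, BATCH_SIZE=32,
#                              n_epochs=50, nls=4, model_path="best_model.pth",
#                              dropout_conv=0.3, dropout_fc=0.5)
#   print(test_m["confusion_matrix"])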
def CNNTest(X_test, y_test, BATCH_SIZE, nls, model_path, dropout_conv, dropout_fc):
"""
加载保存的模型,并在测试集上计算各项指标
"""
# 仅对测试集进行标准化处理
scaler = StandardScaler()
X_test = scaler.fit_transform(X_test)
X_test = torch.tensor(X_test[:, np.newaxis, :], dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.long)
data_test = MyDataset(X_test, y_test, augment=False)
test_loader = DataLoader(data_test, batch_size=BATCH_SIZE, shuffle=False)
model = CNN3Layers(nls=nls, dropout_conv=dropout_conv, dropout_fc=dropout_fc).to(device)
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()
y_true, y_pred = [], []
test_loss_list = []
criterion = nn.CrossEntropyLoss().to(device)
with torch.no_grad():
for inputs, labels in test_loader:
inputs = inputs.to(device)
labels = labels.to(device)
outputs = model(inputs)
loss = criterion(outputs, labels)
_, predicted = torch.max(outputs.data, 1)
y_true.extend(labels.cpu().numpy())
y_pred.extend(predicted.cpu().numpy())
test_loss_list.append(loss.item())
avg_loss = np.mean(test_loss_list)
acc = accuracy_score(y_true, y_pred)
prec = precision_score(y_true, y_pred, average='weighted', zero_division=1)
rec = recall_score(y_true, y_pred, average='weighted', zero_division=1)
f1 = f1_score(y_true, y_pred, average='weighted', zero_division=1)
test_metrics = {
"test_loss": avg_loss,
"test_accuracy": acc,
"precision": prec,
"recall": rec,
"f1_score": f1,
"confusion_matrix": confusion_matrix(y_true, y_pred)
}
return test_metrics
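# Call sketch (hypothetical; evaluates a previously saved checkpoint):
#   metrics = CNNTest(X_test, y_test, BATCH_SIZE=32, nls=4,
#                     model_path="best_model.pth", dropout_conv=0.3, dropout_fc=0.5)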
# ---------------------------
# Custom random-search hyperparameter optimization
# ---------------------------
def optimize_hyperparameters(X_train, X_test, y_train, y_test, nls, n_iter=10, BATCH_SIZE=32, n_epochs=10):
"""
随机搜索指定次数,每次随机采样超参数(这里以 dropout_conv 和 dropout_fc 为例),
对模型进行训练和测试,最后返回使测试准确率最高的超参数配置以及对应的训练和测试指标。
"""
best_test_acc = -1.0
best_params = None
best_train_metrics = None
best_test_metrics = None
for i in range(n_iter):
        # Sample hyperparameters from uniform distributions
        dropout_conv = np.random.uniform(0.2, 0.7)  # adjust the range as needed
dropout_fc = np.random.uniform(0.3, 0.8)
print(f"\nIteration {i + 1}/{n_iter}: Testing dropout_conv={dropout_conv:.4f}, dropout_fc={dropout_fc:.4f}")
        # Path for the saved model (overwritten with the best model of each iteration)
model_path = "best_model.pth"
        # Train the model
train_metrics, _ = CNNTrain(X_train, X_test, y_train, y_test, BATCH_SIZE, n_epochs, nls, model_path,
dropout_conv, dropout_fc)
        # Evaluate on the test set (loads the best saved model)
test_metrics = CNNTest(X_test, y_test, BATCH_SIZE, nls, model_path, dropout_conv, dropout_fc)
current_test_acc = test_metrics["test_accuracy"]
print(f"Iteration {i + 1} result: Test Accuracy = {current_test_acc:.4f}")
        # Update the best hyperparameters
if current_test_acc > best_test_acc:
best_test_acc = current_test_acc
best_params = {"dropout_conv": dropout_conv, "dropout_fc": dropout_fc}
best_train_metrics = train_metrics
best_test_metrics = test_metrics
return best_params, best_train_metrics, best_test_metrics
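# ---------------------------
# Minimal usage sketch (assumption: synthetic random spectra stand in for real data;
# replace X, y and nls with the actual spectra, labels and number of classes)
# ---------------------------
if __name__ == "__main__":
    from sklearn.model_selection import train_test_split

    rng = np.random.default_rng(0)
    X = rng.normal(size=(200, 500)).astype(np.float32)  # 200 synthetic spectra, 500 points each
    y = rng.integers(0, 4, size=200)                     # 4 hypothetical classes
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, stratify=y, random_state=0)

    best_params, best_train_metrics, best_test_metrics = optimize_hyperparameters(
        X_train, X_test, y_train, y_test, nls=4, n_iter=2, BATCH_SIZE=32, n_epochs=2)
    print("Best hyperparameters:", best_params)
    print("Best test metrics:", best_test_metrics)
    writer.close()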