import numpy as np
import pandas as pd
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering, SpectralClustering
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score
from sklearn.decomposition import PCA
import os
import argparse
import json
from typing import Tuple, List, Dict, Optional, Union, Any
from dataclasses import dataclass
import warnings

try:
    import skfuzzy as fuzz  # fuzzy clustering
    SKFUZZY_AVAILABLE = True
except ImportError:
    SKFUZZY_AVAILABLE = False
    print("Warning: scikit-fuzzy is not installed; fuzzy C-means clustering will be unavailable")

from sklearn.cluster import MiniBatchKMeans
import itertools

warnings.filterwarnings('ignore')

@dataclass
class ClusteringConfig:
    """Configuration for the clustering analysis."""
    input_path: Optional[str] = None
    method: str = 'kmeans'
    n_clusters: int = 5
    output_dir: Optional[str] = None
    use_scaling: bool = True
    scaler_type: str = 'standard'
    use_pca: bool = False
    pca_components: Optional[int] = None
    method_params: Optional[Dict[str, Dict[str, Any]]] = None

    def __post_init__(self):
        """Validate the parameters and fill in defaults."""
        if not self.input_path:
            raise ValueError("An input file path (input_path) must be specified")

        valid_methods = ['kmeans', 'fuzzy-cmeans', 'gmm', 'hierarchical', 'dbscan', 'spectral', 'subspace', 'ensemble']
        if self.method not in valid_methods:
            raise ValueError(f"Unsupported clustering method: {self.method}. Supported methods: {valid_methods}")

        if self.n_clusters <= 0:
            raise ValueError("The number of clusters must be greater than 0")

        if not self.output_dir:
            self.output_dir = './results'

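# Illustrative sketch (not executed): constructing a config for K-Means on a
# hypothetical CSV file. Field names match the dataclass above; the file path
# is a placeholder.
#
#   config = ClusteringConfig(
#       input_path='samples.csv',  # placeholder path
#       method='kmeans',
#       n_clusters=4,
#       use_scaling=True,
#       scaler_type='minmax',
#   )
#   manager = ClusteringManager(config=config)  # ClusteringManager is defined below
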
# Import the distance metrics and image processor from supervize_cluster.py
from supervize_cluster_method.supervize_cluster import HyperspectralDistanceMetrics, HyperspectralImageProcessor

class DataLoader:
    """Data loader supporting hyperspectral images and CSV files."""

    def __init__(self):
        self.data = None
        self.header = None
        self.wavelengths = None
        self.is_image = False
        self.csv_spectral_start_col = None

    def load_hyperspectral_image(self, hdr_path: str) -> Tuple[np.ndarray, Dict]:
        """Load an ENVI-format hyperspectral image."""
        try:
            processor = HyperspectralImageProcessor()
            data, header = processor.load_image(hdr_path)
            self.data = data
            self.header = header
            self.is_image = True

            # Extract the wavelength information
            if 'wavelength' in header:
                self.wavelengths = np.array([float(w) for w in header['wavelength']])

            return data, header

        except Exception as e:
            raise IOError(f"Failed to load hyperspectral image: {e}")

    def load_csv_file(self, csv_path: str, spectral_start_col: str) -> Tuple[np.ndarray, List[str]]:
        """Load a CSV file and extract the spectral data."""
        try:
            df = pd.read_csv(csv_path)

            if spectral_start_col not in df.columns:
                raise ValueError(f"The specified spectral start column '{spectral_start_col}' does not exist in the CSV file")

            # Find the index of the spectral start column
            start_idx = df.columns.get_loc(spectral_start_col)

            # Extract the spectral data (from the start column to the end)
            spectral_data = df.iloc[:, start_idx:].values
            column_names = df.columns[start_idx:].tolist()

            # Check data validity
            if spectral_data.shape[1] == 0:
                raise ValueError("No spectral data columns found")

            # Handle NaN values
            if np.isnan(spectral_data).any():
                print("Warning: data contains NaN values; they will be filled with column means")
                col_means = np.nanmean(spectral_data, axis=0)
                nan_mask = np.isnan(spectral_data)
                spectral_data[nan_mask] = col_means[np.where(nan_mask)[1]]

            self.data = spectral_data
            self.is_image = False
            self.csv_spectral_start_col = spectral_start_col

            print(f"Loaded CSV file: samples={spectral_data.shape[0]}, bands={spectral_data.shape[1]}")

            return spectral_data, column_names

        except Exception as e:
            raise IOError(f"Failed to load CSV file: {e}")

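    # Illustrative sketch (assumed layout): a CSV whose metadata columns precede
    # the spectral bands, e.g.
    #
    #   sample_id, treatment, wavelength_400, wavelength_410, ..., wavelength_1000
    #
    # Passing spectral_start_col='wavelength_400' selects every column from
    # 'wavelength_400' to the end as spectral data.
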
    def get_data_info(self) -> Dict[str, Any]:
        """Return information about the loaded data."""
        info = {
            'is_image': self.is_image,
            'data_shape': self.data.shape if self.data is not None else None,
            'data_type': str(self.data.dtype) if self.data is not None else None,
        }

        if self.is_image:
            info.update({
                'wavelengths': self.wavelengths,
                'header_info': self.header
            })
        else:
            info.update({
                'spectral_start_col': self.csv_spectral_start_col
            })

        return info


class ClusteringManager:
    """Manager for the clustering algorithms."""

    def __init__(self, config: Optional[ClusteringConfig] = None, n_clusters: int = 5, random_state: int = 42,
                 enable_preprocessing: bool = True, max_samples: int = None,
                 method_params: Dict[str, Dict[str, Any]] = None):
        if config is not None:
            # Use the configuration object
            self.config = config
            self.n_clusters = config.n_clusters
            self.random_state = 42  # default; ClusteringConfig does not carry a random seed
            # Read the preprocessing parameters from the config
            self.enable_preprocessing = getattr(config, 'use_scaling', True)
            self.max_samples = None  # default
            # Read the remaining config parameters
            self.use_pca = getattr(config, 'use_pca', False)
            self.pca_components = getattr(config, 'pca_components', None)
            self.scaler_type = getattr(config, 'scaler_type', 'standard')
            # Use the method_params from the config
            self.method_params = getattr(config, 'method_params', {}) or {}
        else:
            # Backward compatibility: use the individual parameters
            self.config = None
            self.n_clusters = n_clusters
            self.random_state = random_state
            self.enable_preprocessing = enable_preprocessing
            self.max_samples = max_samples
            self.use_pca = False
            self.pca_components = None
            self.scaler_type = 'standard'
            self.method_params = method_params or {}
        self.cluster_centers_ = {}
        self.labels_ = {}
        self.scores_ = {}

        # Set the default hyperparameters
        self._set_default_method_params()

        # Preprocessing objects
        self.scaler = None
        self.X_preprocessed = None
        self.sample_indices = None

        # Mapping from method name to clustering routine
        self.algorithms = {
            'kmeans': self._kmeans_clustering,
            'gmm': self._gmm_clustering,
            'fuzzy-cmeans': self._fuzzy_cmeans_clustering,
            'hierarchical': self._hierarchical_clustering,
            'dbscan': self._dbscan_clustering,
            'spectral': self._spectral_clustering,
            'subspace': self._subspace_clustering,
            'ensemble': self._ensemble_clustering
        }

    def _set_default_method_params(self) -> None:
        """Set the default hyperparameters for each method."""
        default_params = {
            'kmeans': {
                'n_init': 10,
                'max_iter': 300,
                'batch_size': 1000,
                'use_minibatch_threshold': 10000
            },
            'gmm': {
                'covariance_type': 'full',
                'max_iter': 200,
                'n_init_attempts': 10
            },
            'fuzzy-cmeans': {
                'fuzziness': 2.0,
                'max_iter': 1000,
                'error': 0.005
            },
            'hierarchical': {
                'linkage': 'ward',
                'sampling_threshold': 5000,
                'n_neighbors': 5
            },
            'dbscan': {
                'eps_percentile': 50,
                'min_samples_factor': 0.1,
                'n_neighbors': 20
            },
            'spectral': {
                'affinity': 'nearest_neighbors',
                'n_neighbors': 10,
                'large_dataset_threshold': 2000
            },
            'subspace': {
                'n_components_factor': 0.33,
                'max_iter': 300
            }
        }

        # Merge the user-supplied parameters over the defaults
        for method, params in default_params.items():
            if method in self.method_params:
                params.update(self.method_params[method])
            self.method_params[method] = params

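    # Illustrative sketch: user-supplied overrides are merged on top of the
    # defaults above, per method. For example (hypothetical values):
    #
    #   manager = ClusteringManager(n_clusters=5,
    #                               method_params={'kmeans': {'n_init': 20},
    #                                              'dbscan': {'eps_percentile': 75}})
    #
    # keeps every other kmeans/dbscan default and replaces only those two keys.
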
    def fit_predict(self, X: np.ndarray, method: str, preprocessed: bool = False) -> np.ndarray:
        """Cluster the data with the specified method."""
        if method not in self.algorithms:
            raise ValueError(f"Unsupported clustering method: {method}")

        print(f"Clustering with the {method} method...")
        try:
            # Preprocessing step: scaling
            if not preprocessed and self.enable_preprocessing:
                if self.scaler_type == 'standard':
                    self.scaler = StandardScaler()
                elif self.scaler_type == 'minmax':
                    self.scaler = MinMaxScaler()
                else:
                    self.scaler = StandardScaler()  # default to standard scaling
                X = self.scaler.fit_transform(X)

            # Preprocessing step: PCA dimensionality reduction
            if not preprocessed and self.use_pca:
                if self.pca_components is None:
                    # Automatically determine the number of PCA components
                    n_components = min(X.shape[1], max(2, int(X.shape[1] * 0.95)))
                else:
                    n_components = min(self.pca_components, X.shape[1])
                self.pca = PCA(n_components=n_components, random_state=self.random_state)
                X = self.pca.fit_transform(X)

            # Fetch the hyperparameters for this method
            method_params = self.method_params.get(method, {})
            labels = self.algorithms[method](X, preprocessed=preprocessed, **method_params)
            self.labels_[method] = labels

            # Compute the clustering evaluation scores. X and labels refer to the
            # same (possibly sampled) set of points, so they can be scored
            # directly; if PCA was applied, the scores are computed in PCA space.
            if len(np.unique(labels)) > 1:
                try:
                    if self.scaler is not None and not self.use_pca:
                        # Undo the scaling so the scores reflect the original feature scale
                        X_for_scores = self.scaler.inverse_transform(X)
                    else:
                        X_for_scores = X
                    self.scores_[method] = self._calculate_clustering_scores(X_for_scores, labels)
                except Exception:
                    self.scores_[method] = None

            print(f"✓ {method} clustering finished")
            return labels

        except Exception as e:
            print(f"✗ {method} clustering failed: {e}")
            return None

    def _preprocess_data(self, X: np.ndarray) -> np.ndarray:
        """Preprocess the data: sampling and scaling."""
        # Subsample large datasets
        if self.max_samples is not None and X.shape[0] > self.max_samples:
            n_original = X.shape[0]
            np.random.seed(self.random_state)
            self.sample_indices = np.random.choice(X.shape[0], self.max_samples, replace=False)
            X = X[self.sample_indices]
            print(f"Data sampling: {X.shape[0]} samples (original: {n_original})")

        # Scale the data
        if self.enable_preprocessing:
            if self.scaler is None:
                self.scaler = StandardScaler()
            self.X_preprocessed = self.scaler.fit_transform(X)
            X_processed = self.X_preprocessed
        else:
            X_processed = X

        return X_processed

    def fit_predict_all(self, X: np.ndarray) -> Dict[str, np.ndarray]:
        """Cluster the data with every available method."""
        results = {}
        methods = list(self.algorithms.keys())

        print(f"Running the analysis with {len(methods)} clustering methods...")

        # Preprocess the data once
        X_processed = self._preprocess_data(X)

        # Run the methods sequentially (parallel execution, e.g. via joblib, is an option)
        for method in methods:
            results[method] = self.fit_predict(X_processed, method, preprocessed=True)

        return results

    def _kmeans_clustering(self, X: np.ndarray, preprocessed: bool = False,
                           n_init: int = 10, max_iter: int = 300,
                           batch_size: int = 1000, use_minibatch_threshold: int = 10000) -> np.ndarray:
        """K-Means clustering."""
        # Use MiniBatchKMeans for large datasets
        use_minibatch = X.shape[0] > use_minibatch_threshold

        if use_minibatch:
            kmeans = MiniBatchKMeans(n_clusters=self.n_clusters, random_state=self.random_state,
                                     batch_size=batch_size, max_iter=max_iter)
        else:
            kmeans = KMeans(n_clusters=self.n_clusters, random_state=self.random_state,
                            n_init=n_init, max_iter=max_iter)

        labels = kmeans.fit_predict(X)

        # Save the cluster centers (undoing the scaling if necessary)
        if preprocessed and self.scaler is not None:
            self.cluster_centers_['kmeans'] = self.scaler.inverse_transform(kmeans.cluster_centers_)
        else:
            self.cluster_centers_['kmeans'] = kmeans.cluster_centers_

        return labels

    def _fuzzy_cmeans_clustering(self, X: np.ndarray, preprocessed: bool = False,
                                 fuzziness: float = 2.0, max_iter: int = 1000,
                                 error: float = 0.005) -> np.ndarray:
        """Fuzzy C-means clustering."""
        if not SKFUZZY_AVAILABLE:
            raise ImportError("scikit-fuzzy is not installed; fuzzy C-means clustering is unavailable")

        # For large datasets, reduce the iteration count and loosen the tolerance
        if X.shape[0] > 5000:
            maxiter = 500
            error_val = 0.01
        else:
            maxiter = max_iter
            error_val = error

        # Prepare the data: scikit-fuzzy's cmeans expects a transposed layout (n_features, n_samples)
        if preprocessed:
            # Data is already preprocessed; just transpose
            X_for_cmeans = X.T
        else:
            # Otherwise scale, then transpose
            scaler = MinMaxScaler()
            X_scaled = scaler.fit_transform(X)
            X_for_cmeans = X_scaled.T

        # Run fuzzy C-means
        cntr, u, _, _, _, _, _ = fuzz.cluster.cmeans(
            X_for_cmeans, self.n_clusters, fuzziness, error=error_val, maxiter=maxiter
        )

        # Hard labels from the maximum membership
        labels = np.argmax(u, axis=0)

        # Save the cluster centers.
        # cntr from scikit-fuzzy already has shape (n_clusters, n_features).
        if preprocessed and self.scaler is not None:
            # Undo the external scaling of the cluster centers
            self.cluster_centers_['fuzzy-cmeans'] = self.scaler.inverse_transform(cntr)
        else:
            # Otherwise store as-is (the centers live in the internally MinMax-scaled space)
            self.cluster_centers_['fuzzy-cmeans'] = cntr

        return labels

    def _gmm_clustering(self, X: np.ndarray, preprocessed: bool = False,
                        covariance_type: str = 'full', max_iter: int = 200,
                        n_init_attempts: int = 10) -> np.ndarray:
        """Gaussian mixture model clustering."""
        from sklearn.mixture import GaussianMixture

        # For large datasets, use diagonal covariance matrices for efficiency
        if X.shape[0] > 10000:
            covariance_type = 'diag'
            max_iter_val = 100
        else:
            max_iter_val = max_iter

        # Try several random initialisations
        best_gmm = None
        best_labels = None
        best_score = np.inf

        n_init_attempts = min(n_init_attempts, max(1, X.shape[0] // 1000))

        for init_attempt in range(n_init_attempts):
            gmm = GaussianMixture(
                n_components=self.n_clusters,
                covariance_type=covariance_type,
                random_state=self.random_state + init_attempt,
                max_iter=max_iter_val,
                n_init=1
            )

            try:
                labels = gmm.fit_predict(X)

                # Keep the model with the lowest BIC (lower is better)
                bic_score = gmm.bic(X)
                if bic_score < best_score:
                    best_score = bic_score
                    best_gmm = gmm
                    best_labels = labels
            except Exception:
                continue

        if best_gmm is None:
            # If every attempt failed, fall back to default settings
            gmm = GaussianMixture(
                n_components=self.n_clusters,
                covariance_type='diag',
                random_state=self.random_state,
                max_iter=100
            )
            best_labels = gmm.fit_predict(X)
            best_gmm = gmm

        # Save the cluster centers (the component means)
        if hasattr(best_gmm, 'means_'):
            self.cluster_centers_['gmm'] = best_gmm.means_

        return best_labels

    def _hierarchical_clustering(self, X: np.ndarray, preprocessed: bool = False,
                                 linkage: str = 'ward', sampling_threshold: int = 5000,
                                 n_neighbors: int = 5) -> np.ndarray:
        """Hierarchical clustering."""
        # For large datasets, cluster a sample and propagate the labels
        if X.shape[0] > sampling_threshold:
            n_samples = min(sampling_threshold, X.shape[0])
            np.random.seed(self.random_state)
            sample_indices = np.random.choice(X.shape[0], n_samples, replace=False)
            X_sampled = X[sample_indices]

            hierarchical = AgglomerativeClustering(
                n_clusters=self.n_clusters, linkage=linkage
            )
            sample_labels = hierarchical.fit_predict(X_sampled)

            # Propagate the labels to all points with KNN
            from sklearn.neighbors import KNeighborsClassifier
            knn = KNeighborsClassifier(n_neighbors=n_neighbors)
            knn.fit(X_sampled, sample_labels)
            labels = knn.predict(X)
        else:
            hierarchical = AgglomerativeClustering(
                n_clusters=self.n_clusters, linkage=linkage
            )
            labels = hierarchical.fit_predict(X)

        return labels

    def _dbscan_clustering(self, X: np.ndarray, preprocessed: bool = False,
                           eps_percentile: int = 50, min_samples_factor: float = 0.1,
                           n_neighbors: int = 20) -> np.ndarray:
        """DBSCAN clustering."""
        # Adaptively choose eps
        if X.shape[0] > 1000:
            # For large datasets, estimate eps from a sample
            n_samples_for_eps = min(1000, X.shape[0])
            np.random.seed(self.random_state)
            sample_indices = np.random.choice(X.shape[0], n_samples_for_eps, replace=False)
            X_sample = X[sample_indices]
        else:
            X_sample = X

        # Use a modest neighbour count to keep the computation fast
        from sklearn.neighbors import NearestNeighbors
        neigh = NearestNeighbors(n_neighbors=min(n_neighbors, len(X_sample) - 1))
        nbrs = neigh.fit(X_sample)
        distances, _ = nbrs.kneighbors(X_sample)

        # Use a percentile of the k-distance curve as eps
        k_distances = np.sort(distances[:, -1])
        eps = np.percentile(k_distances, eps_percentile)

        # Scale min_samples up with the dataset size
        min_samples = max(self.n_clusters, int(np.sqrt(X.shape[0]) * min_samples_factor))

        dbscan = DBSCAN(eps=eps, min_samples=min_samples, n_jobs=-1)  # run in parallel
        labels = dbscan.fit_predict(X)

        # DBSCAN may produce noise points (label -1); relabel them
        unique_labels = np.unique(labels)
        if -1 in unique_labels:
            # Assign each noise point to its nearest cluster
            noise_mask = labels == -1
            if np.any(noise_mask):
                valid_labels = unique_labels[unique_labels != -1]
                if len(valid_labels) > 0:
                    # Compute each cluster mean once, from the non-noise points
                    centers = {label: np.mean(X[labels == label], axis=0) for label in valid_labels}
                    for i in np.where(noise_mask)[0]:
                        distances_to_centers = [np.linalg.norm(X[i] - centers[label])
                                                for label in valid_labels]
                        labels[i] = valid_labels[np.argmin(distances_to_centers)]

        return labels

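    # Worked example of the eps heuristic above (toy numbers): if the sorted
    # k-distances are [0.1, 0.2, 0.3, 0.4], then eps_percentile=50 gives
    # eps = np.percentile([0.1, 0.2, 0.3, 0.4], 50) = 0.25.
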
    def _spectral_clustering(self, X: np.ndarray, preprocessed: bool = False,
                             affinity: str = 'nearest_neighbors', n_neighbors: int = 10,
                             large_dataset_threshold: int = 2000) -> np.ndarray:
        """Spectral clustering."""
        # For large datasets, shrink the neighbourhood size
        if X.shape[0] > large_dataset_threshold:
            n_neighbors_actual = min(n_neighbors // 2, X.shape[0] // 100)
            spectral = SpectralClustering(
                n_clusters=self.n_clusters, random_state=self.random_state,
                affinity=affinity, n_neighbors=n_neighbors_actual,
                n_jobs=-1  # parallel computation
            )
        else:
            spectral = SpectralClustering(
                n_clusters=self.n_clusters, random_state=self.random_state,
                affinity=affinity, n_neighbors=n_neighbors
            )

        labels = spectral.fit_predict(X)
        return labels

    def _subspace_clustering(self, X: np.ndarray, preprocessed: bool = False,
                             n_components_factor: float = 0.33, max_iter: int = 300) -> np.ndarray:
        """Subspace clustering (PCA projection followed by K-means)."""
        # Choose the number of PCA components dynamically
        n_components = min(X.shape[1], max(self.n_clusters, int(X.shape[1] * n_components_factor)))

        # Learn the subspace with PCA
        pca = PCA(n_components=n_components, random_state=self.random_state)
        X_pca = pca.fit_transform(X)

        # Cluster within the subspace (with the tuned K-means)
        if X.shape[0] > 10000:
            kmeans = MiniBatchKMeans(n_clusters=self.n_clusters, random_state=self.random_state,
                                     batch_size=1000, max_iter=max_iter)
        else:
            kmeans = KMeans(n_clusters=self.n_clusters, random_state=self.random_state,
                            n_init=10, max_iter=max_iter)

        labels = kmeans.fit_predict(X_pca)
        # Note: these centers live in the PCA subspace, not the original feature space
        self.cluster_centers_['subspace'] = kmeans.cluster_centers_

        return labels

    def _ensemble_clustering(self, X: np.ndarray, preprocessed: bool = False) -> np.ndarray:
        """Ensemble clustering (combining the results of several algorithms)."""
        # Pick the base methods according to the dataset size
        if X.shape[0] > 10000:
            # Large dataset: use the computationally cheaper methods
            base_methods = ['kmeans', 'subspace']
        elif X.shape[0] > 2000:
            # Medium dataset
            base_methods = ['kmeans', 'gmm', 'subspace']
        else:
            # Small dataset: every method is affordable
            base_methods = ['kmeans', 'gmm', 'hierarchical', 'spectral']

        base_labels = {}
        method_weights = {}

        for method in base_methods:
            try:
                labels = self.algorithms[method](X, preprocessed=preprocessed)
                base_labels[method] = labels

                # Weight each method by its confidence (silhouette score)
                if len(np.unique(labels)) > 1:
                    try:
                        silhouette = silhouette_score(X, labels)
                        method_weights[method] = max(0.1, silhouette + 1)  # keep the weights positive
                    except Exception:
                        method_weights[method] = 1.0
                else:
                    method_weights[method] = 0.5
            except Exception as e:
                print(f"Ensemble clustering: method {method} failed: {e}")
                continue

        if not base_labels:
            # If every base method failed, fall back to K-means
            return self._kmeans_clustering(X, preprocessed=preprocessed)

        # Combine the results by weighted voting
        final_labels = np.zeros(X.shape[0], dtype=int)

        for i in range(X.shape[0]):
            # Collect the prediction of every method
            predictions = []
            weights = []

            for method, labels in base_labels.items():
                predictions.append(labels[i])
                weights.append(method_weights.get(method, 1.0))

            # Weighted vote
            unique_preds = np.unique(predictions)
            weighted_votes = np.zeros(len(unique_preds))

            for j, pred in enumerate(unique_preds):
                pred_mask = np.array(predictions) == pred
                weighted_votes[j] = np.sum(np.array(weights)[pred_mask])

            final_labels[i] = unique_preds[np.argmax(weighted_votes)]

        # Remap the labels so that they are contiguous
        unique_final = np.unique(final_labels)
        if len(unique_final) > self.n_clusters:
            # Too many clusters found: merge them by hierarchically clustering the label IDs
            agg = AgglomerativeClustering(n_clusters=self.n_clusters, linkage='ward')
            final_labels = agg.fit_predict(final_labels.reshape(-1, 1))
        else:
            # Plain remapping
            label_mapping = {old: new for new, old in enumerate(unique_final)}
            final_labels = np.array([label_mapping[label] for label in final_labels])

        return final_labels

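    # Worked example of the weighted vote above (toy numbers): if, for one
    # sample, the base methods predict labels [0, 0, 1] with weights
    # [1.2, 0.8, 1.5], label 0 collects 2.0 and label 1 collects 1.5, so the
    # sample is assigned label 0.
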
    def _calculate_clustering_scores(self, X: np.ndarray, labels: np.ndarray) -> Dict[str, float]:
        """Compute the clustering evaluation scores."""
        scores = {}

        try:
            # Silhouette coefficient
            scores['silhouette'] = silhouette_score(X, labels)
        except Exception:
            scores['silhouette'] = None

        try:
            # Calinski-Harabasz index
            scores['calinski_harabasz'] = calinski_harabasz_score(X, labels)
        except Exception:
            scores['calinski_harabasz'] = None

        try:
            # Davies-Bouldin index
            scores['davies_bouldin'] = davies_bouldin_score(X, labels)
        except Exception:
            scores['davies_bouldin'] = None

        return scores

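    # Score interpretation (standard definitions): silhouette lies in [-1, 1],
    # higher is better; Calinski-Harabasz is unbounded above, higher is better;
    # Davies-Bouldin is non-negative, lower is better.

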
class OutputManager:
    """Output manager."""

    def __init__(self):
        self.processor = HyperspectralImageProcessor()

    def save_clustering_results(self, results: Dict[str, np.ndarray],
                                output_dir: str, data_loader: DataLoader) -> List[str]:
        """Save the clustering results."""
        saved_files = []
        os.makedirs(output_dir, exist_ok=True)

        if data_loader.is_image:
            # Save as .dat files (image format)
            for method, result in results.items():
                if result is not None:
                    output_file = os.path.join(output_dir, f'clusters_{method}.dat')
                    # Pass the original header so an .hdr file can be written for the result
                    self.processor.save_single_band_image(result, output_file, method, data_loader.header)
                    saved_files.append((method, output_file))
        else:
            # Save as CSV files (with an extra label column)
            for method, result in results.items():
                if result is not None:
                    output_file = os.path.join(output_dir, f'clustered_{method}.csv')
                    self._save_csv_with_labels(data_loader.data, result, output_file, method)
                    saved_files.append((method, output_file))

        return saved_files

    def _save_csv_with_labels(self, original_data: np.ndarray, labels: np.ndarray,
                              output_file: str, method: str) -> None:
        """Save a CSV file with the cluster labels appended."""
        try:
            # Build the DataFrame
            df = pd.DataFrame(original_data)

            # Append the cluster label column
            df['cluster_label'] = labels

            # Write the CSV file
            df.to_csv(output_file, index=False)
            print(f"Clustering result saved to: {output_file}")
            print(f"Data shape: {df.shape}, clusters: {len(np.unique(labels))}")

        except Exception as e:
            raise IOError(f"Failed to save CSV file: {e}")

    def print_clustering_summary(self, results: Dict[str, np.ndarray],
                                 scores: Dict[str, Dict[str, float]]) -> None:
        """Print a summary of the clustering run."""
        print("\n=== Clustering Analysis Summary ===")

        successful_methods = [method for method, result in results.items() if result is not None]
        failed_methods = [method for method, result in results.items() if result is None]

        print(f"Successful clustering methods: {len(successful_methods)}/{len(results)}")

        if successful_methods:
            print("\nClustering statistics:")
            for method in successful_methods:
                result = results[method]
                unique_labels = np.unique(result)
                n_clusters_found = len(unique_labels)
                print(f"  {method}: found {n_clusters_found} clusters")

                # Show the evaluation scores
                if method in scores and scores[method] is not None:
                    method_scores = scores[method]
                    print("  Evaluation scores:")
                    if method_scores.get('silhouette') is not None:
                        print(f"    silhouette: {method_scores['silhouette']:.3f}")
                    if method_scores.get('calinski_harabasz') is not None:
                        print(f"    calinski_harabasz: {method_scores['calinski_harabasz']:.3f}")
                    if method_scores.get('davies_bouldin') is not None:
                        print(f"    davies_bouldin: {method_scores['davies_bouldin']:.3f}")

        if failed_methods:
            print(f"\nFailed methods: {', '.join(failed_methods)}")

    def visualize_clusters_if_image(self, results: Dict[str, np.ndarray],
                                    data_loader: DataLoader, output_dir: str) -> None:
        """Generate a visualization when the input is an image."""
        if not data_loader.is_image:
            return

        try:
            vis_path = os.path.join(output_dir, 'cluster_visualization.png')

            # Reshape the flattened clustering results back to the image shape
            reshaped_results = {}
            for method, result in results.items():
                if result is not None and data_loader.data is not None:
                    # The 2-D spatial shape of the original image (excluding the band axis)
                    original_shape = data_loader.data.shape[:-1]
                    # Reshape the 1-D labels into a 2-D image
                    reshaped_result = result.reshape(original_shape)
                    reshaped_results[method] = reshaped_result
                else:
                    reshaped_results[method] = result

            self.processor.visualize_clusters(reshaped_results, vis_path)
        except Exception as e:
            print(f"Visualization failed: {e}")


def main():
    """Entry point: multi-algorithm clustering analysis."""
    parser = argparse.ArgumentParser(description='Multi-algorithm clustering analysis tool')
    parser.add_argument('input_file', help='Input file path (.hdr hyperspectral image or .csv file)')

    # Data parameters
    parser.add_argument('--csv_spectral_col', '-c', default='wavelength_400',
                        help='Name of the first spectral column in the CSV file (default: wavelength_400)')

    # Clustering parameters
    parser.add_argument('--n_clusters', '-n', type=int, default=5,
                        help='Number of clusters (default: 5)')
    parser.add_argument('--methods', '-m', nargs='+',
                        choices=['kmeans', 'fuzzy-cmeans', 'gmm', 'hierarchical', 'dbscan',
                                 'spectral', 'subspace', 'ensemble', 'all'],
                        default=['all'],
                        help='Clustering methods (default: all, i.e. use every method)')
    parser.add_argument('--output_dir', '-o', default='output',
                        help='Output directory (default: output)')

    # Optional parameters
    parser.add_argument('--visualize', '-v', action='store_true',
                        help='Generate a visualization (images only)')
    parser.add_argument('--random_state', '-r', type=int, default=42,
                        help='Random seed (default: 42)')
    parser.add_argument('--method_params', '-p', type=str, default=None,
                        help='Per-method hyperparameters as a JSON string, e.g.: {"kmeans": {"n_init": 20}}')

    args = parser.parse_args()

    try:
        # Create the output directory
        os.makedirs(args.output_dir, exist_ok=True)

        # Initialise the components
        data_loader = DataLoader()

        # Parse the hyperparameters
        method_params = None
        if args.method_params:
            try:
                method_params = json.loads(args.method_params)
            except json.JSONDecodeError as e:
                raise ValueError(f"Invalid hyperparameter configuration: {e}")

        cluster_manager = ClusteringManager(n_clusters=args.n_clusters,
                                            random_state=args.random_state,
                                            method_params=method_params)
        output_manager = OutputManager()

        # Load the data
        print(f"Loading input file: {args.input_file}")
        file_ext = os.path.splitext(args.input_file)[1].lower()

        if file_ext == '.hdr':
            # Load a hyperspectral image
            data, header = data_loader.load_hyperspectral_image(args.input_file)
            data_to_cluster = data.reshape(-1, data.shape[-1])  # flatten to 2-D
        elif file_ext == '.csv':
            # Load a CSV file
            data, column_names = data_loader.load_csv_file(args.input_file, args.csv_spectral_col)
            data_to_cluster = data
        else:
            raise ValueError(f"Unsupported file format: {file_ext}. Please use a .hdr or .csv file")

        # Determine which clustering methods to run
        if 'all' in args.methods:
            methods_to_use = list(cluster_manager.algorithms.keys())
        else:
            methods_to_use = args.methods

        # Run the clustering
        print(f"\nStarting clustering analysis (clusters: {args.n_clusters}, methods: {methods_to_use})...")

        if len(methods_to_use) == 1:
            # Single method
            method = methods_to_use[0]
            result = cluster_manager.fit_predict(data_to_cluster, method)
            results = {method: result} if result is not None else {}
        else:
            # Multiple methods
            results = {}
            for method in methods_to_use:
                result = cluster_manager.fit_predict(data_to_cluster, method)
                if result is not None:
                    results[method] = result

        if not results:
            raise RuntimeError("Every clustering method failed")

        # Save the results
        print("\nSaving clustering results...")
        saved_files = output_manager.save_clustering_results(results, args.output_dir, data_loader)

        # Print the statistics
        output_manager.print_clustering_summary(results, cluster_manager.scores_)

        # Visualization (images only)
        if args.visualize and data_loader.is_image:
            print("\nGenerating visualization...")
            output_manager.visualize_clusters_if_image(results, data_loader, args.output_dir)

        print("\n✓ Clustering analysis finished!")
        print(f"Output directory: {args.output_dir}")
        print(f"Files generated: {len(saved_files)}")
        for method, filepath in saved_files:
            print(f"  - {method}: {filepath}")

    except Exception as e:
        print(f"✗ Processing failed: {e}")
        import traceback
        traceback.print_exc()
        return 1

    return 0

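# Illustrative CLI invocation (the module filename is assumed; adjust as needed):
#
#   python multi_cluster.py data.csv -c wavelength_400 -n 5 -m kmeans gmm \
#       -o output -p '{"kmeans": {"n_init": 20}}'
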
def run_clustering(input_file: str, n_clusters: int = 5, methods: List[str] = ['all'],
                   output_dir: str = 'output', csv_spectral_col: str = 'wavelength_400',
                   visualize: bool = False, random_state: int = 42,
                   method_params: Dict[str, Dict[str, Any]] = None) -> int:
    """
    Function interface for running the clustering analysis.

    Parameters:
        input_file: input file path (.hdr or .csv)
        n_clusters: number of clusters (default: 5)
        methods: list of clustering methods (default: ['all'])
        output_dir: output directory (default: 'output')
        csv_spectral_col: name of the first spectral column in the CSV file (default: 'wavelength_400')
        visualize: whether to visualize (default: False)
        random_state: random seed (default: 42)
        method_params: per-method hyperparameters (default: None)

    Returns:
        0 on success, 1 on failure
    """
    try:
        # Create the output directory
        os.makedirs(output_dir, exist_ok=True)

        # Initialise the components
        data_loader = DataLoader()
        cluster_manager = ClusteringManager(n_clusters=n_clusters, random_state=random_state,
                                            method_params=method_params)
        output_manager = OutputManager()

        # Load the data
        print(f"Loading input file: {input_file}")
        file_ext = os.path.splitext(input_file)[1].lower()

        if file_ext == '.hdr':
            data, header = data_loader.load_hyperspectral_image(input_file)
            data_to_cluster = data.reshape(-1, data.shape[-1])
        elif file_ext == '.csv':
            data, column_names = data_loader.load_csv_file(input_file, csv_spectral_col)
            data_to_cluster = data
        else:
            raise ValueError(f"Unsupported file format: {file_ext}")

        # Determine the clustering methods
        if 'all' in methods:
            methods_to_use = list(cluster_manager.algorithms.keys())
        else:
            methods_to_use = methods

        # Run the clustering
        print(f"\nStarting clustering analysis (clusters: {n_clusters}, methods: {methods_to_use})...")

        if len(methods_to_use) == 1:
            method = methods_to_use[0]
            result = cluster_manager.fit_predict(data_to_cluster, method)
            results = {method: result} if result is not None else {}
        else:
            results = {}
            for method in methods_to_use:
                result = cluster_manager.fit_predict(data_to_cluster, method)
                if result is not None:
                    results[method] = result

        if not results:
            raise RuntimeError("Every clustering method failed")

        # Save the results
        print("\nSaving clustering results...")
        saved_files = output_manager.save_clustering_results(results, output_dir, data_loader)

        # Print the statistics
        output_manager.print_clustering_summary(results, cluster_manager.scores_)

        # Visualization
        if visualize and data_loader.is_image:
            output_manager.visualize_clusters_if_image(results, data_loader, output_dir)

        print("\n✓ Clustering analysis finished!")
        print(f"Output directory: {output_dir}")
        print(f"Files generated: {len(saved_files)}")

    except Exception as e:
        print(f"✗ Processing failed: {e}")
        return 1

    return 0

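# Illustrative sketch (the paths are placeholders): calling the function
# interface directly instead of the CLI.
#
#   status = run_clustering(
#       input_file='samples.csv',
#       n_clusters=4,
#       methods=['kmeans', 'gmm'],
#       output_dir='output',
#       csv_spectral_col='wavelength_400',
#   )
#   # status == 0 on success, 1 on failure
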
def validate_input_file(file_path: str) -> bool:
    """Validate that the input file exists and has a supported format."""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Input file does not exist: {file_path}")

    file_ext = os.path.splitext(file_path)[1].lower()
    if file_ext not in ['.hdr', '.csv']:
        raise ValueError(f"Unsupported file format: {file_ext}. Please use a .hdr or .csv file")

    return True


def validate_clustering_parameters(n_clusters: int, methods: List[str]) -> bool:
    """Validate the clustering parameters."""
    if n_clusters < 2:
        raise ValueError(f"The number of clusters must be at least 2; got: {n_clusters}")

    if n_clusters > 50:
        print(f"Warning: the number of clusters is large ({n_clusters}); this may hurt performance")

    valid_methods = ['kmeans', 'fuzzy-cmeans', 'gmm', 'hierarchical', 'dbscan',
                     'spectral', 'subspace', 'ensemble', 'all']

    for method in methods:
        if method not in valid_methods:
            raise ValueError(f"Unsupported clustering method: {method}. Valid methods: {valid_methods}")

    return True


def validate_csv_file(csv_path: str, spectral_start_col: str) -> bool:
    """Validate the CSV file and its spectral columns."""
    try:
        df = pd.read_csv(csv_path, nrows=5)  # read only a few rows for validation

        if spectral_start_col not in df.columns:
            available_cols = list(df.columns[:10])  # show the first 10 columns
            raise ValueError(f"Spectral start column '{spectral_start_col}' does not exist. Available columns: {available_cols}")

        # Check that the spectral data is numeric
        spectral_cols = df.columns[df.columns.get_loc(spectral_start_col):]
        for col in spectral_cols[:5]:  # check the first 5 spectral columns
            if not pd.api.types.is_numeric_dtype(df[col]):
                print(f"Warning: column '{col}' is not numeric")

        return True

    except Exception as e:
        raise ValueError(f"CSV file validation failed: {e}")


def check_dependencies():
    """Check whether the required dependencies are installed."""
    missing_deps = []

    try:
        import skfuzzy
    except ImportError:
        missing_deps.append('scikit-fuzzy (for fuzzy C-means clustering)')

    try:
        import spectral
    except ImportError:
        missing_deps.append('spectral (for hyperspectral data handling)')

    if missing_deps:
        print("Warning: the following packages are missing and may limit some features:")
        for dep in missing_deps:
            print(f"  - {dep}")
        print("Install them with 'pip install <package_name>'")

    return len(missing_deps) == 0


# if __name__ == '__main__':
#     # Check the dependencies
#     check_dependencies()
#
#     # Run the CLI entry point
#     exit(main())
#     result = run_clustering(
#         input_file=r"C:\Program Files\Spectronon3\_internal\examples\leaf_small.bip.hdr",
#         n_clusters=6,
#         methods=['all'],  # use all available methods
#         output_dir=r'E:\code\spectronon\single_classsfication\tsst',
#         csv_spectral_col='wavelength_400',  # only used for CSV files
#         visualize=True,
#         random_state=43  # fixed seed for reproducible results
#     )