HSI/cluster_method/cluster.py
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering, SpectralClustering
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score
from sklearn.decomposition import PCA
import os
import argparse
import json
from typing import Tuple, List, Dict, Optional, Union, Any
from dataclasses import dataclass
import warnings
try:
    import skfuzzy as fuzz  # fuzzy clustering
    SKFUZZY_AVAILABLE = True
except ImportError:
    SKFUZZY_AVAILABLE = False
    print("Warning: scikit-fuzzy is not installed; fuzzy C-means clustering will be unavailable")
from sklearn.cluster import MiniBatchKMeans
import itertools
warnings.filterwarnings('ignore')
@dataclass
class ClusteringConfig:
    """Configuration for clustering analysis."""
    input_path: Optional[str] = None
    method: str = 'kmeans'
    n_clusters: int = 5
    output_dir: Optional[str] = None
    use_scaling: bool = True
    scaler_type: str = 'standard'
    use_pca: bool = False
    pca_components: Optional[int] = None
    method_params: Optional[Dict[str, Dict[str, Any]]] = None

    def __post_init__(self):
        """Validate parameters and fill in defaults."""
        if not self.input_path:
            raise ValueError("An input file path (input_path) must be specified")
        valid_methods = ['kmeans', 'fuzzy-cmeans', 'gmm', 'hierarchical', 'dbscan', 'spectral', 'subspace', 'ensemble']
        if self.method not in valid_methods:
            raise ValueError(f"Unsupported clustering method: {self.method}. Supported methods: {valid_methods}")
        if self.n_clusters <= 0:
            raise ValueError("The number of clusters must be greater than 0")
        if not self.output_dir:
            self.output_dir = './results'
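# A minimal usage sketch (hypothetical path): __post_init__ validates the
# settings at construction time, so an invalid method name or cluster count
# raises immediately rather than mid-run.
#   cfg = ClusteringConfig(input_path='data/leaf.hdr', method='gmm',
#                          n_clusters=4, use_pca=True, pca_components=10)
#   manager = ClusteringManager(config=cfg)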
# Import the distance metric helpers and image processor from supervize_cluster.py
from supervize_cluster_method.supervize_cluster import HyperspectralDistanceMetrics, HyperspectralImageProcessor
class DataLoader:
    """Data loader supporting hyperspectral images and CSV files."""

    def __init__(self):
        self.data = None
        self.header = None
        self.wavelengths = None
        self.is_image = False
        self.csv_spectral_start_col = None

    def load_hyperspectral_image(self, hdr_path: str) -> Tuple[np.ndarray, Dict]:
        """Load an ENVI-format hyperspectral image."""
        try:
            processor = HyperspectralImageProcessor()
            data, header = processor.load_image(hdr_path)
            self.data = data
            self.header = header
            self.is_image = True
            # Extract wavelength information
            if 'wavelength' in header:
                self.wavelengths = np.array([float(w) for w in header['wavelength']])
            return data, header
        except Exception as e:
            raise IOError(f"Failed to load hyperspectral image: {e}")

    def load_csv_file(self, csv_path: str, spectral_start_col: str) -> Tuple[np.ndarray, List[str]]:
        """Load a CSV file and extract the spectral data."""
        try:
            df = pd.read_csv(csv_path)
            if spectral_start_col not in df.columns:
                raise ValueError(f"The specified spectral start column '{spectral_start_col}' does not exist in the CSV file")
            # Locate the index of the spectral start column
            start_idx = df.columns.get_loc(spectral_start_col)
            # Extract the spectral data (from the start column to the end)
            spectral_data = df.iloc[:, start_idx:].values
            column_names = df.columns[start_idx:].tolist()
            # Sanity-check the data
            if spectral_data.shape[1] == 0:
                raise ValueError("No spectral data columns were found")
            # Handle NaN values
            if np.isnan(spectral_data).any():
                print("Warning: the data contains NaN values; they will be filled with column means")
                col_means = np.nanmean(spectral_data, axis=0)
                nan_mask = np.isnan(spectral_data)
                spectral_data[nan_mask] = col_means[np.where(nan_mask)[1]]
            self.data = spectral_data
            self.is_image = False
            self.csv_spectral_start_col = spectral_start_col
            print(f"Successfully loaded CSV file: samples={spectral_data.shape[0]}, bands={spectral_data.shape[1]}")
            return spectral_data, column_names
        except Exception as e:
            raise IOError(f"Failed to load CSV file: {e}")

    def get_data_info(self) -> Dict[str, Any]:
        """Return a summary of the loaded data."""
        info = {
            'is_image': self.is_image,
            'data_shape': self.data.shape if self.data is not None else None,
            'data_type': str(self.data.dtype) if self.data is not None else None,
        }
        if self.is_image:
            info.update({
                'wavelengths': self.wavelengths,
                'header_info': self.header
            })
        else:
            info.update({
                'spectral_start_col': self.csv_spectral_start_col
            })
        return info
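# A usage sketch (hypothetical file name): for a CSV whose spectral bands
# start at column 'wavelength_400', the loader returns an (n_samples, n_bands)
# array plus the band column names.
#   loader = DataLoader()
#   X, band_cols = loader.load_csv_file('samples.csv', 'wavelength_400')
#   print(loader.get_data_info()['data_shape'])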
class ClusteringManager:
    """Manager for the clustering algorithms."""

    def __init__(self, config: Optional[ClusteringConfig] = None, n_clusters: int = 5, random_state: int = 42,
                 enable_preprocessing: bool = True, max_samples: Optional[int] = None,
                 method_params: Optional[Dict[str, Dict[str, Any]]] = None):
        if config is not None:
            # Use the configuration object
            self.config = config
            self.n_clusters = config.n_clusters
            self.random_state = getattr(config, 'random_state', 42)  # default: 42
            # Read the preprocessing parameters from the configuration object
            self.enable_preprocessing = getattr(config, 'use_scaling', True)
            self.max_samples = None  # default
            # Read the remaining configuration parameters
            self.use_pca = getattr(config, 'use_pca', False)
            self.pca_components = getattr(config, 'pca_components', None)
            self.scaler_type = getattr(config, 'scaler_type', 'standard')
            # Use the method_params from the configuration
            self.method_params = getattr(config, 'method_params', {}) or {}
        else:
            # Backwards compatibility: use the individual parameters
            self.config = None
            self.n_clusters = n_clusters
            self.random_state = random_state
            self.enable_preprocessing = enable_preprocessing
            self.max_samples = max_samples
            self.use_pca = False
            self.pca_components = None
            self.scaler_type = 'standard'
            self.method_params = method_params or {}
        self.cluster_centers_ = {}
        self.labels_ = {}
        self.scores_ = {}
        # Set the default hyperparameters
        self._set_default_method_params()
        # Preprocessing state
        self.scaler = None
        self.X_preprocessed = None
        self.sample_indices = None
        self.n_original_samples = None
        # Mapping from method name to clustering routine
        self.algorithms = {
            'kmeans': self._kmeans_clustering,
            'gmm': self._gmm_clustering,
            'fuzzy-cmeans': self._fuzzy_cmeans_clustering,
            'hierarchical': self._hierarchical_clustering,
            'dbscan': self._dbscan_clustering,
            'spectral': self._spectral_clustering,
            'subspace': self._subspace_clustering,
            'ensemble': self._ensemble_clustering
        }
    def _set_default_method_params(self) -> None:
        """Set the default hyperparameters for each method."""
        default_params = {
            'kmeans': {
                'n_init': 10,
                'max_iter': 300,
                'batch_size': 1000,
                'use_minibatch_threshold': 10000
            },
            'gmm': {
                'covariance_type': 'full',
                'max_iter': 200,
                'n_init_attempts': 10
            },
            'fuzzy-cmeans': {
                'fuzziness': 2.0,
                'max_iter': 1000,
                'error': 0.005
            },
            'hierarchical': {
                'linkage': 'ward',
                'sampling_threshold': 5000,
                'n_neighbors': 5
            },
            'dbscan': {
                'eps_percentile': 50,
                'min_samples_factor': 0.1,
                'n_neighbors': 20
            },
            'spectral': {
                'affinity': 'nearest_neighbors',
                'n_neighbors': 10,
                'large_dataset_threshold': 2000
            },
            'subspace': {
                'n_components_factor': 0.33,
                'max_iter': 300
            }
        }
        # Merge user-provided parameters over the defaults
        for method, params in default_params.items():
            if method in self.method_params:
                params.update(self.method_params[method])
            self.method_params[method] = params
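    # Example of the merge above: passing method_params={'kmeans': {'n_init': 20}}
    # yields kmeans params {'n_init': 20, 'max_iter': 300, 'batch_size': 1000,
    # 'use_minibatch_threshold': 10000} -- user values override the defaults,
    # and untouched keys keep theirs.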
    def fit_predict(self, X: np.ndarray, method: str, preprocessed: bool = False) -> np.ndarray:
        """Cluster the data with the specified method."""
        if method not in self.algorithms:
            raise ValueError(f"Unsupported clustering method: {method}")
        print(f"Clustering with the {method} method...")
        try:
            # Preprocessing step: scaling
            if not preprocessed and self.enable_preprocessing:
                if self.scaler_type == 'standard':
                    self.scaler = StandardScaler()
                elif self.scaler_type == 'minmax':
                    self.scaler = MinMaxScaler()
                else:
                    self.scaler = StandardScaler()  # fall back to standard scaling
                X = self.scaler.fit_transform(X)
            # Preprocessing step: PCA dimensionality reduction
            if not preprocessed and self.use_pca:
                if self.pca_components is None:
                    # Automatically choose the number of PCA components
                    n_components = min(X.shape[1], max(2, int(X.shape[1] * 0.95)))
                else:
                    n_components = min(self.pca_components, X.shape[1])
                self.pca = PCA(n_components=n_components, random_state=self.random_state)
                X = self.pca.fit_transform(X)
            # Fetch the hyperparameters for this method
            method_params = self.method_params.get(method, {})
            labels = self.algorithms[method](X, preprocessed=preprocessed, **method_params)
            self.labels_[method] = labels
            # Compute clustering evaluation scores on the unscaled data when possible
            # (when sampling was used, the scores describe the sampled subset)
            if len(np.unique(labels)) > 1:
                try:
                    if self.scaler is not None and not self.use_pca:
                        X_for_scores = self.scaler.inverse_transform(X)
                    else:
                        X_for_scores = X
                    self.scores_[method] = self._calculate_clustering_scores(X_for_scores, labels)
                except Exception:
                    self.scores_[method] = None
            print(f"{method} clustering finished")
            return labels
        except Exception as e:
            print(f"{method} clustering failed: {e}")
            return None
    def _preprocess_data(self, X: np.ndarray) -> np.ndarray:
        """Preprocess the data: scaling and sampling."""
        # Subsample large datasets
        self.n_original_samples = X.shape[0]
        if self.max_samples is not None and X.shape[0] > self.max_samples:
            np.random.seed(self.random_state)
            self.sample_indices = np.random.choice(X.shape[0], self.max_samples, replace=False)
            X = X[self.sample_indices]
            print(f"Subsampled the data: {X.shape[0]} samples (original: {self.n_original_samples} samples)")
        # Scale the data
        if self.enable_preprocessing:
            if self.scaler is None:
                self.scaler = StandardScaler()
            self.X_preprocessed = self.scaler.fit_transform(X)
            X_processed = self.X_preprocessed
        else:
            X_processed = X
        return X_processed
    def fit_predict_all(self, X: np.ndarray) -> Dict[str, np.ndarray]:
        """Cluster the data with every available method."""
        results = {}
        methods = list(self.algorithms.keys())
        print(f"Starting analysis with {len(methods)} clustering methods...")
        # Preprocess the data (only once)
        X_processed = self._preprocess_data(X)
        # Run the methods sequentially (a simplified version; joblib could parallelize this)
        for method in methods:
            results[method] = self.fit_predict(X_processed, method, preprocessed=True)
        return results
    def _kmeans_clustering(self, X: np.ndarray, preprocessed: bool = False,
                           n_init: int = 10, max_iter: int = 300,
                           batch_size: int = 1000, use_minibatch_threshold: int = 10000) -> np.ndarray:
        """K-means clustering."""
        # Use MiniBatchKMeans for large datasets
        use_minibatch = X.shape[0] > use_minibatch_threshold
        if use_minibatch:
            kmeans = MiniBatchKMeans(n_clusters=self.n_clusters, random_state=self.random_state,
                                     batch_size=batch_size, max_iter=max_iter)
        else:
            kmeans = KMeans(n_clusters=self.n_clusters, random_state=self.random_state,
                            n_init=n_init, max_iter=max_iter)
        labels = kmeans.fit_predict(X)
        # Save the cluster centers (undoing the scaling when necessary)
        if preprocessed and self.scaler is not None:
            self.cluster_centers_['kmeans'] = self.scaler.inverse_transform(kmeans.cluster_centers_)
        else:
            self.cluster_centers_['kmeans'] = kmeans.cluster_centers_
        return labels
    def _fuzzy_cmeans_clustering(self, X: np.ndarray, preprocessed: bool = False,
                                 fuzziness: float = 2.0, max_iter: int = 1000,
                                 error: float = 0.005) -> np.ndarray:
        """Fuzzy C-means clustering."""
        if not SKFUZZY_AVAILABLE:
            raise ImportError("scikit-fuzzy is not installed; fuzzy C-means clustering is unavailable")
        # For large datasets, reduce the iteration count and loosen the tolerance
        if X.shape[0] > 5000:
            maxiter = 500
            error_val = 0.01
        else:
            maxiter = max_iter
            error_val = error
        # Prepare the data: scikit-fuzzy's cmeans expects a transposed layout (n_features, n_samples)
        if preprocessed:
            # The data has already been preprocessed; just transpose
            X_for_cmeans = X.T
        else:
            # Otherwise scale, then transpose
            scaler = MinMaxScaler()
            X_scaled = scaler.fit_transform(X)
            X_for_cmeans = X_scaled.T
        # Run fuzzy C-means
        cntr, u, _, _, _, _, _ = fuzz.cluster.cmeans(
            X_for_cmeans, self.n_clusters, fuzziness, error=error_val, maxiter=maxiter
        )
        # Harden the labels (maximum membership)
        labels = np.argmax(u, axis=0)
        # Save the cluster centers:
        # cntr already has shape (n_clusters, n_features), so store it directly
        if preprocessed and self.scaler is not None:
            # Undo the external scaling of the cluster centers
            self.cluster_centers_['fuzzy-cmeans'] = self.scaler.inverse_transform(cntr)
        else:
            # Use as-is (these may still reflect the internal MinMax scaling above)
            self.cluster_centers_['fuzzy-cmeans'] = cntr
        return labels
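    # Note on the membership matrix above: fuzz.cluster.cmeans returns u with
    # shape (n_clusters, n_samples); u[:, i] holds sample i's membership
    # degrees, which sum to 1, and np.argmax(u, axis=0) hardens them into one
    # label per sample. E.g. (hypothetical values) u[:, 0] = [0.7, 0.2, 0.1]
    # assigns sample 0 to cluster 0.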
    def _gmm_clustering(self, X: np.ndarray, preprocessed: bool = False,
                        covariance_type: str = 'full', max_iter: int = 200,
                        n_init_attempts: int = 10) -> np.ndarray:
        """Gaussian mixture model clustering."""
        from sklearn.mixture import GaussianMixture
        # For large datasets, use diagonal covariance matrices for efficiency
        if X.shape[0] > 10000:
            covariance_type = 'diag'
            max_iter_val = 100
        else:
            max_iter_val = max_iter
        # Try several initializations and keep the best model (lowest BIC)
        best_gmm = None
        best_labels = None
        best_score = np.inf
        n_init_attempts = min(n_init_attempts, max(1, X.shape[0] // 1000))
        for init_attempt in range(n_init_attempts):
            gmm = GaussianMixture(
                n_components=self.n_clusters,
                covariance_type=covariance_type,
                random_state=self.random_state + init_attempt,
                max_iter=max_iter_val,
                n_init=1
            )
            try:
                labels = gmm.fit_predict(X)
                # Select the best model by BIC; lower BIC is better
                bic_score = gmm.bic(X)
                if bic_score < best_score:
                    best_score = bic_score
                    best_gmm = gmm
                    best_labels = labels
            except Exception:
                continue
        if best_gmm is None:
            # If every attempt failed, fall back to the default settings
            gmm = GaussianMixture(
                n_components=self.n_clusters,
                covariance_type='diag',
                random_state=self.random_state,
                max_iter=100
            )
            best_labels = gmm.fit_predict(X)
            best_gmm = gmm
        # Save the cluster centers (the component means)
        if hasattr(best_gmm, 'means_'):
            self.cluster_centers_['gmm'] = best_gmm.means_
        return best_labels
    def _hierarchical_clustering(self, X: np.ndarray, preprocessed: bool = False,
                                 linkage: str = 'ward', sampling_threshold: int = 5000,
                                 n_neighbors: int = 5) -> np.ndarray:
        """Hierarchical (agglomerative) clustering."""
        # For large datasets, cluster a sample and propagate the labels
        if X.shape[0] > sampling_threshold:
            n_samples = min(sampling_threshold, X.shape[0])
            np.random.seed(self.random_state)
            sample_indices = np.random.choice(X.shape[0], n_samples, replace=False)
            X_sampled = X[sample_indices]
            hierarchical = AgglomerativeClustering(
                n_clusters=self.n_clusters, linkage=linkage
            )
            sample_labels = hierarchical.fit_predict(X_sampled)
            # Propagate the labels to every point with a KNN classifier
            from sklearn.neighbors import KNeighborsClassifier
            knn = KNeighborsClassifier(n_neighbors=n_neighbors)
            knn.fit(X_sampled, sample_labels)
            labels = knn.predict(X)
        else:
            hierarchical = AgglomerativeClustering(
                n_clusters=self.n_clusters, linkage=linkage
            )
            labels = hierarchical.fit_predict(X)
        return labels
    def _dbscan_clustering(self, X: np.ndarray, preprocessed: bool = False,
                           eps_percentile: int = 50, min_samples_factor: float = 0.1,
                           n_neighbors: int = 20) -> np.ndarray:
        """DBSCAN clustering."""
        # Choose eps adaptively (optimized version)
        if X.shape[0] > 1000:
            # For large datasets, estimate eps from a sample
            n_samples_for_eps = min(1000, X.shape[0])
            np.random.seed(self.random_state)
            sample_indices = np.random.choice(X.shape[0], n_samples_for_eps, replace=False)
            X_sample = X[sample_indices]
        else:
            X_sample = X
        # Use a modest neighbor count to keep the computation fast
        from sklearn.neighbors import NearestNeighbors
        neigh = NearestNeighbors(n_neighbors=min(n_neighbors, len(X_sample) - 1))
        nbrs = neigh.fit(X_sample)
        distances, indices = nbrs.kneighbors(X_sample)
        # Take a point on the sorted k-distance curve as eps
        k_distances = np.sort(distances[:, -1])
        # A conservative percentile
        eps = np.percentile(k_distances, eps_percentile)
        # Increase min_samples for large datasets
        min_samples = max(self.n_clusters, int(np.sqrt(X.shape[0]) * min_samples_factor))
        dbscan = DBSCAN(eps=eps, min_samples=min_samples, n_jobs=-1)  # run in parallel
        labels = dbscan.fit_predict(X)
        # DBSCAN may produce noise points (label -1); relabel them
        unique_labels = np.unique(labels)
        if -1 in unique_labels:
            # Assign each noise point to its nearest cluster
            noise_mask = labels == -1
            if np.any(noise_mask):
                # Distance from each noise point to each cluster mean
                valid_labels = unique_labels[unique_labels != -1]
                if len(valid_labels) > 0:
                    for i in np.where(noise_mask)[0]:
                        distances_to_centers = []
                        for label in valid_labels:
                            center = np.mean(X[labels == label], axis=0)
                            dist = np.linalg.norm(X[i] - center)
                            distances_to_centers.append(dist)
                        labels[i] = valid_labels[np.argmin(distances_to_centers)]
        return labels
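    # Worked sketch of the eps heuristic above (hypothetical values): with
    # n_neighbors=20, each sample's distance to its 20th neighbor is collected
    # and sorted, and the eps_percentile-th percentile is taken. If those
    # k-distances were [0.1, 0.2, ..., 1.0], eps_percentile=50 would give
    # eps = 0.55, i.e. roughly half the samples have their 20th neighbor
    # within eps.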
    def _spectral_clustering(self, X: np.ndarray, preprocessed: bool = False,
                             affinity: str = 'nearest_neighbors', n_neighbors: int = 10,
                             large_dataset_threshold: int = 2000) -> np.ndarray:
        """Spectral clustering."""
        # For large datasets, reduce the neighbor count
        if X.shape[0] > large_dataset_threshold:
            n_neighbors_actual = max(1, min(n_neighbors // 2, X.shape[0] // 100))
            spectral = SpectralClustering(
                n_clusters=self.n_clusters, random_state=self.random_state,
                affinity=affinity, n_neighbors=n_neighbors_actual,
                n_jobs=-1  # parallel computation
            )
        else:
            spectral = SpectralClustering(
                n_clusters=self.n_clusters, random_state=self.random_state,
                affinity=affinity, n_neighbors=n_neighbors
            )
        labels = spectral.fit_predict(X)
        return labels
    def _subspace_clustering(self, X: np.ndarray, preprocessed: bool = False,
                             n_components_factor: float = 0.33, max_iter: int = 300) -> np.ndarray:
        """Subspace clustering: PCA projection followed by K-means."""
        # Choose the number of PCA components dynamically
        n_components = min(X.shape[1], max(self.n_clusters, int(X.shape[1] * n_components_factor)))
        # Learn the subspace with PCA
        pca = PCA(n_components=n_components, random_state=self.random_state)
        X_pca = pca.fit_transform(X)
        # Cluster in the subspace with (mini-batch) K-means
        if X.shape[0] > 10000:
            kmeans = MiniBatchKMeans(n_clusters=self.n_clusters, random_state=self.random_state,
                                     batch_size=1000, max_iter=max_iter)
        else:
            kmeans = KMeans(n_clusters=self.n_clusters, random_state=self.random_state,
                            n_init=10, max_iter=max_iter)
        labels = kmeans.fit_predict(X_pca)
        # Note: these centers live in the PCA subspace, not the original band space
        self.cluster_centers_['subspace'] = kmeans.cluster_centers_
        return labels
    def _ensemble_clustering(self, X: np.ndarray, preprocessed: bool = False) -> np.ndarray:
        """Ensemble clustering (combine the results of several algorithms)."""
        # Pick the base methods according to the dataset size
        if X.shape[0] > 10000:
            # Large dataset: use the computationally cheaper methods
            base_methods = ['kmeans', 'subspace']
        elif X.shape[0] > 2000:
            # Medium dataset
            base_methods = ['kmeans', 'gmm', 'subspace']
        else:
            # Small dataset: every method is affordable
            base_methods = ['kmeans', 'gmm', 'hierarchical', 'spectral']
        base_labels = {}
        method_weights = {}
        for method in base_methods:
            try:
                labels = self.algorithms[method](X, preprocessed=preprocessed)
                base_labels[method] = labels
                # Weight each method by its confidence (silhouette-based)
                if len(np.unique(labels)) > 1:
                    try:
                        silhouette = silhouette_score(X, labels)
                        method_weights[method] = max(0.1, silhouette + 1)  # keep weights positive
                    except Exception:
                        method_weights[method] = 1.0
                else:
                    method_weights[method] = 0.5
            except Exception as e:
                print(f"Ensemble base method {method} failed: {e}")
                continue
        if not base_labels:
            # Fall back to K-means if every base method failed
            return self._kmeans_clustering(X, preprocessed=preprocessed)
        # Combine the results by weighted voting
        final_labels = np.zeros(X.shape[0], dtype=int)
        for i in range(X.shape[0]):
            # Collect every method's prediction for this sample
            predictions = []
            weights = []
            for method, labels in base_labels.items():
                predictions.append(labels[i])
                weights.append(method_weights.get(method, 1.0))
            # Weighted vote
            unique_preds = np.unique(predictions)
            weighted_votes = np.zeros(len(unique_preds))
            for j, pred in enumerate(unique_preds):
                pred_mask = np.array(predictions) == pred
                weighted_votes[j] = np.sum(np.array(weights)[pred_mask])
            final_labels[i] = unique_preds[np.argmax(weighted_votes)]
        # Remap the labels so they are contiguous
        unique_final = np.unique(final_labels)
        if len(unique_final) > self.n_clusters:
            # Too many clusters emerged; post-process with hierarchical clustering on the label IDs
            agg = AgglomerativeClustering(n_clusters=self.n_clusters, linkage='ward')
            final_labels = agg.fit_predict(final_labels.reshape(-1, 1))
        else:
            # Plain remapping
            label_mapping = {old: new for new, old in enumerate(unique_final)}
            final_labels = np.array([label_mapping[label] for label in final_labels])
        return final_labels
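    # Worked example of the weighted vote above (hypothetical values): if three
    # base methods predict [0, 0, 1] for a sample with silhouette-derived
    # weights [1.2, 0.8, 1.5], label 0 collects 2.0 votes and label 1 collects
    # 1.5, so the sample is assigned label 0 despite the strongest single
    # method disagreeing.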
    def _calculate_clustering_scores(self, X: np.ndarray, labels: np.ndarray) -> Dict[str, float]:
        """Compute the clustering evaluation scores."""
        scores = {}
        try:
            # Silhouette coefficient
            scores['silhouette'] = silhouette_score(X, labels)
        except Exception:
            scores['silhouette'] = None
        try:
            # Calinski-Harabasz index
            scores['calinski_harabasz'] = calinski_harabasz_score(X, labels)
        except Exception:
            scores['calinski_harabasz'] = None
        try:
            # Davies-Bouldin index
            scores['davies_bouldin'] = davies_bouldin_score(X, labels)
        except Exception:
            scores['davies_bouldin'] = None
        return scores
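    # Interpretation note: silhouette lies in [-1, 1] (higher is better),
    # Calinski-Harabasz is unbounded above (higher is better), and
    # Davies-Bouldin is >= 0 (lower is better), so the three scores should
    # not be compared on a common "bigger is better" scale.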
class OutputManager:
    """Output manager."""

    def __init__(self):
        self.processor = HyperspectralImageProcessor()

    def save_clustering_results(self, results: Dict[str, np.ndarray],
                                output_dir: str, data_loader: DataLoader) -> List[Tuple[str, str]]:
        """Save the clustering results."""
        saved_files = []
        os.makedirs(output_dir, exist_ok=True)
        if data_loader.is_image:
            # Save as .dat files (image format)
            for method, result in results.items():
                if result is not None:
                    output_file = os.path.join(output_dir, f'clusters_{method}.dat')
                    # Pass the original header so an .hdr file can be created for the result
                    self.processor.save_single_band_image(result, output_file, method, data_loader.header)
                    saved_files.append((method, output_file))
        else:
            # Save as CSV files with an added label column
            for method, result in results.items():
                if result is not None:
                    output_file = os.path.join(output_dir, f'clustered_{method}.csv')
                    self._save_csv_with_labels(data_loader.data, result, output_file, method)
                    saved_files.append((method, output_file))
        return saved_files

    def _save_csv_with_labels(self, original_data: np.ndarray, labels: np.ndarray,
                              output_file: str, method: str) -> None:
        """Save a CSV file with the cluster labels attached."""
        try:
            # Build a DataFrame
            df = pd.DataFrame(original_data)
            # Append the cluster label column
            df['cluster_label'] = labels
            # Write to CSV
            df.to_csv(output_file, index=False)
            print(f"Clustering results saved to: {output_file}")
            print(f"Data shape: {df.shape}, clusters: {len(np.unique(labels))}")
        except Exception as e:
            raise IOError(f"Failed to save CSV file: {e}")
    def print_clustering_summary(self, results: Dict[str, np.ndarray],
                                 scores: Dict[str, Dict[str, float]]) -> None:
        """Print a summary of the clustering run."""
        print("\n=== Clustering analysis summary ===")
        successful_methods = [method for method, result in results.items() if result is not None]
        failed_methods = [method for method, result in results.items() if result is None]
        print(f"Successful clustering methods: {len(successful_methods)}/{len(results)}")
        if successful_methods:
            print("\nClustering result statistics:")
            for method in successful_methods:
                result = results[method]
                unique_labels = np.unique(result)
                n_clusters_found = len(unique_labels)
                print(f"  {method}: found {n_clusters_found} clusters")
                # Show the evaluation scores
                if method in scores and scores[method] is not None:
                    method_scores = scores[method]
                    print("    Evaluation scores:")
                    if method_scores.get('silhouette') is not None:
                        print(f"      silhouette: {method_scores['silhouette']:.3f}")
                    if method_scores.get('calinski_harabasz') is not None:
                        print(f"      calinski_harabasz: {method_scores['calinski_harabasz']:.3f}")
                    if method_scores.get('davies_bouldin') is not None:
                        print(f"      davies_bouldin: {method_scores['davies_bouldin']:.3f}")
        if failed_methods:
            print(f"\nFailed methods: {', '.join(failed_methods)}")
    def visualize_clusters_if_image(self, results: Dict[str, np.ndarray],
                                    data_loader: DataLoader, output_dir: str) -> None:
        """Generate a visualization when the input is an image."""
        if not data_loader.is_image:
            return
        try:
            vis_path = os.path.join(output_dir, 'cluster_visualization.png')
            # Reshape the flattened clustering results back to the original image shape
            reshaped_results = {}
            for method, result in results.items():
                if result is not None and data_loader.data is not None:
                    # The 2-D spatial shape of the original image (excluding the band axis)
                    original_shape = data_loader.data.shape[:-1]
                    # Reshape the 1-D clustering result into a 2-D image
                    reshaped_result = result.reshape(original_shape)
                    reshaped_results[method] = reshaped_result
                else:
                    reshaped_results[method] = result
            self.processor.visualize_clusters(reshaped_results, vis_path)
        except Exception as e:
            print(f"Visualization failed: {e}")
def main():
    """Entry point: multi-algorithm clustering analysis."""
    parser = argparse.ArgumentParser(description='Multi-algorithm clustering analysis tool')
    parser.add_argument('input_file', help='Input file path (.hdr hyperspectral image or .csv file)')
    # Data parameters
    parser.add_argument('--csv_spectral_col', '-c', default='wavelength_400',
                        help='Name of the first spectral column in the CSV file (default: wavelength_400)')
    # Clustering parameters
    parser.add_argument('--n_clusters', '-n', type=int, default=5,
                        help='Number of clusters (default: 5)')
    parser.add_argument('--methods', '-m', nargs='+',
                        choices=['kmeans', 'fuzzy-cmeans', 'gmm', 'hierarchical', 'dbscan',
                                 'spectral', 'subspace', 'ensemble', 'all'],
                        default=['all'],
                        help='Clustering methods (default: all, meaning every method)')
    parser.add_argument('--output_dir', '-o', default='output',
                        help='Output directory (default: output)')
    # Optional parameters
    parser.add_argument('--visualize', '-v', action='store_true',
                        help='Generate a visualization of the results (images only)')
    parser.add_argument('--random_state', '-r', type=int, default=42,
                        help='Random seed (default: 42)')
    parser.add_argument('--method_params', '-p', type=str, default=None,
                        help='Per-method hyperparameters as a JSON string, e.g.: {"kmeans": {"n_init": 20}}')
    args = parser.parse_args()
    try:
        # Create the output directory
        os.makedirs(args.output_dir, exist_ok=True)
        # Initialize the components
        data_loader = DataLoader()
        # Parse the hyperparameters
        method_params = None
        if args.method_params:
            try:
                method_params = json.loads(args.method_params)
            except json.JSONDecodeError as e:
                raise ValueError(f"Malformed hyperparameter configuration: {e}")
        cluster_manager = ClusteringManager(n_clusters=args.n_clusters,
                                            random_state=args.random_state,
                                            method_params=method_params)
        output_manager = OutputManager()
        # Load the data
        print(f"Loading input file: {args.input_file}")
        file_ext = os.path.splitext(args.input_file)[1].lower()
        if file_ext == '.hdr':
            # Load a hyperspectral image
            data, header = data_loader.load_hyperspectral_image(args.input_file)
            data_to_cluster = data.reshape(-1, data.shape[-1])  # flatten to 2-D
        elif file_ext == '.csv':
            # Load a CSV file
            data, column_names = data_loader.load_csv_file(args.input_file, args.csv_spectral_col)
            data_to_cluster = data
        else:
            raise ValueError(f"Unsupported file format: {file_ext}. Please use an .hdr or .csv file")
        # Determine which clustering methods to run
        if 'all' in args.methods:
            methods_to_use = list(cluster_manager.algorithms.keys())
        else:
            methods_to_use = args.methods
        # Run the clustering
        print(f"\nStarting clustering analysis (clusters: {args.n_clusters}, methods: {methods_to_use})...")
        if len(methods_to_use) == 1:
            # Single method
            method = methods_to_use[0]
            result = cluster_manager.fit_predict(data_to_cluster, method)
            results = {method: result} if result is not None else {}
        else:
            # Multiple methods
            results = {}
            for method in methods_to_use:
                result = cluster_manager.fit_predict(data_to_cluster, method)
                if result is not None:
                    results[method] = result
        if not results:
            raise RuntimeError("Every clustering method failed")
        # Save the results
        print("\nSaving clustering results...")
        saved_files = output_manager.save_clustering_results(results, args.output_dir, data_loader)
        # Print the statistics
        output_manager.print_clustering_summary(results, cluster_manager.scores_)
        # Visualization (images only)
        if args.visualize and data_loader.is_image:
            print("\nGenerating visualization...")
            output_manager.visualize_clusters_if_image(results, data_loader, args.output_dir)
        print("\n✓ Clustering analysis finished!")
        print(f"Output directory: {args.output_dir}")
        print(f"Files generated: {len(saved_files)}")
        for method, filepath in saved_files:
            print(f"  - {method}: {filepath}")
    except Exception as e:
        print(f"✗ Processing failed: {e}")
        import traceback
        traceback.print_exc()
        return 1
    return 0
def run_clustering(input_file: str, n_clusters: int = 5, methods: List[str] = ['all'],
                   output_dir: str = 'output', csv_spectral_col: str = 'wavelength_400',
                   visualize: bool = False, random_state: int = 42,
                   method_params: Optional[Dict[str, Dict[str, Any]]] = None) -> int:
    """
    Programmatic interface for the clustering analysis.

    Parameters:
        input_file: input file path (.hdr or .csv)
        n_clusters: number of clusters (default: 5)
        methods: list of clustering methods (default: ['all'])
        output_dir: output directory (default: 'output')
        csv_spectral_col: name of the first spectral column in the CSV file (default: 'wavelength_400')
        visualize: whether to generate a visualization (default: False)
        random_state: random seed (default: 42)
        method_params: per-method hyperparameters (default: None)

    Returns:
        0 on success, 1 on failure
    """
    try:
        # Create the output directory
        os.makedirs(output_dir, exist_ok=True)
        # Initialize the components
        data_loader = DataLoader()
        cluster_manager = ClusteringManager(n_clusters=n_clusters, random_state=random_state,
                                            method_params=method_params)
        output_manager = OutputManager()
        # Load the data
        print(f"Loading input file: {input_file}")
        file_ext = os.path.splitext(input_file)[1].lower()
        if file_ext == '.hdr':
            data, header = data_loader.load_hyperspectral_image(input_file)
            data_to_cluster = data.reshape(-1, data.shape[-1])
        elif file_ext == '.csv':
            data, column_names = data_loader.load_csv_file(input_file, csv_spectral_col)
            data_to_cluster = data
        else:
            raise ValueError(f"Unsupported file format: {file_ext}")
        # Determine the clustering methods
        if 'all' in methods:
            methods_to_use = list(cluster_manager.algorithms.keys())
        else:
            methods_to_use = methods
        # Run the clustering
        print(f"\nStarting clustering analysis (clusters: {n_clusters}, methods: {methods_to_use})...")
        if len(methods_to_use) == 1:
            method = methods_to_use[0]
            result = cluster_manager.fit_predict(data_to_cluster, method)
            results = {method: result} if result is not None else {}
        else:
            results = {}
            for method in methods_to_use:
                result = cluster_manager.fit_predict(data_to_cluster, method)
                if result is not None:
                    results[method] = result
        if not results:
            raise RuntimeError("Every clustering method failed")
        # Save the results
        print("\nSaving clustering results...")
        saved_files = output_manager.save_clustering_results(results, output_dir, data_loader)
        # Print the statistics
        output_manager.print_clustering_summary(results, cluster_manager.scores_)
        # Visualization
        if visualize and data_loader.is_image:
            output_manager.visualize_clusters_if_image(results, data_loader, output_dir)
        print("\n✓ Clustering analysis finished!")
        print(f"Output directory: {output_dir}")
        print(f"Files generated: {len(saved_files)}")
    except Exception as e:
        print(f"✗ Processing failed: {e}")
        return 1
    return 0
def validate_input_file(file_path: str) -> bool:
    """Check that the input file exists and has a supported format."""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Input file does not exist: {file_path}")
    file_ext = os.path.splitext(file_path)[1].lower()
    if file_ext not in ['.hdr', '.csv']:
        raise ValueError(f"Unsupported file format: {file_ext}. Please use an .hdr or .csv file")
    return True

def validate_clustering_parameters(n_clusters: int, methods: List[str]) -> bool:
    """Validate the clustering parameters."""
    if n_clusters < 2:
        raise ValueError(f"The number of clusters must be at least 2; got: {n_clusters}")
    if n_clusters > 50:
        print(f"Warning: the number of clusters is large ({n_clusters}) and may hurt performance")
    valid_methods = ['kmeans', 'fuzzy-cmeans', 'gmm', 'hierarchical', 'dbscan',
                     'spectral', 'subspace', 'ensemble', 'all']
    for method in methods:
        if method not in valid_methods:
            raise ValueError(f"Unsupported clustering method: {method}. Valid methods: {valid_methods}")
    return True
def validate_csv_file(csv_path: str, spectral_start_col: str) -> bool:
    """Validate the CSV file and the spectral start column."""
    try:
        df = pd.read_csv(csv_path, nrows=5)  # read only a few rows for validation
        if spectral_start_col not in df.columns:
            available_cols = list(df.columns[:10])  # show the first 10 columns
            raise ValueError(f"Spectral start column '{spectral_start_col}' does not exist. Available columns: {available_cols}")
        # Check that the spectral data is numeric
        spectral_cols = df.columns[df.columns.get_loc(spectral_start_col):]
        for col in spectral_cols[:5]:  # check the first 5 spectral columns
            if not pd.api.types.is_numeric_dtype(df[col]):
                print(f"Warning: column '{col}' is not numeric")
        return True
    except Exception as e:
        raise ValueError(f"CSV file validation failed: {e}")
def check_dependencies():
    """Check that the required optional dependencies are installed."""
    missing_deps = []
    try:
        import skfuzzy
    except ImportError:
        missing_deps.append('scikit-fuzzy (for fuzzy C-means clustering)')
    try:
        import spectral
    except ImportError:
        missing_deps.append('spectral (for hyperspectral data handling)')
    if missing_deps:
        print("Warning: the following dependencies are missing, which may limit some features:")
        for dep in missing_deps:
            print(f"  - {dep}")
        print("Install them with 'pip install <package_name>'")
    return len(missing_deps) == 0
if __name__ == '__main__':
    # Check the dependencies
    check_dependencies()
    # Run the CLI entry point
    exit(main())

# Programmatic alternative:
# result = run_clustering(
#     input_file=r"C:\Program Files\Spectronon3\_internal\examples\leaf_small.bip.hdr",
#     n_clusters=6,
#     methods=['all'],  # use every available method
#     output_dir=r'E:\code\spectronon\single_classsfication\tsst',
#     csv_spectral_col='wavelength_400',  # only used for CSV inputs
#     visualize=True,
#     random_state=43  # fixed seed for reproducibility
# )