Files
WQ_GUI/map_beifeng.py
2025-12-05 10:31:03 +08:00

2562 lines
113 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import numpy as np
import geopandas as gpd
from pyproj import CRS, Transformer
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.ticker import FuncFormatter
from matplotlib_scalebar.scalebar import ScaleBar
from scipy.interpolate import griddata
from scipy import ndimage
from scipy.spatial.distance import cdist
from scipy.spatial import ConvexHull
from shapely.geometry import Point, Polygon
import rasterio
from rasterio.features import geometry_mask
import warnings
import math
import os
import random
import glob
# Optional dependency: pykrige enables Kriging-based uncertainty estimation;
# without it the code falls back to IDW (see _calculate_kriging_uncertainty).
try:
    from pykrige.ok import OrdinaryKriging
    PYKRIGE_AVAILABLE = True
except ImportError:
    PYKRIGE_AVAILABLE = False
    print("警告: pykrige未安装Kriging不确定性计算将不可用")
warnings.filterwarnings('ignore')
# Configure matplotlib to use CJK-capable fonts for Chinese labels, and keep
# the minus sign renderable with those fonts.
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei']
plt.rcParams['axes.unicode_minus'] = False
# Mapping from water-quality parameter name to its matplotlib colormap name.
# NOTE: insertion order matters — _extract_param_name scans keys in this
# order and returns the first key whose name appears in a filename/column.
PARAMS_CMAP = {
    "Chlorophyll": "YlGnBu_r",
    "COD": "coolwarm",
    "DO": "RdYlBu",
    "pH": "Spectral",
    "Temperature": "turbo",
    "spCond": "cividis",
    "Turbidity": "YlOrBr",
    "TDS": "inferno",
    "Cl-": "RdYlBu_r",
    "NO3-N": "YlOrRd",
    "NH3-N": "magma",
    "BGA": "viridis",
    "TT": "RdYlBu_r"
}
class ContentMapper:
def __init__(self, input_crs='EPSG:32651', output_crs='EPSG:4326'):
    """Set up coordinate transformation and colormap configuration.

    This mapper produces smooth, continuous content-distribution maps
    (dense grid interpolation plus multi-level color mapping) rather
    than plots of discrete sampling points.

    Parameters
    ----------
    input_crs : str
        Source CRS, default 'EPSG:32651' (WGS_1984_UTM_Zone_51N).
    output_crs : str
        Target CRS, default 'EPSG:4326' (WGS84).
    """
    self.input_crs = input_crs
    self.output_crs = output_crs
    # Transformer converting (x, y) pairs from the input to the output CRS.
    src = CRS.from_string(input_crs)
    dst = CRS.from_string(output_crs)
    self.transformer = Transformer.from_crs(src, dst, always_xy=True)
    # Per-parameter colormap table (instance copy so callers may customize).
    self.params_cmap = PARAMS_CMAP.copy()
    # Pool of matplotlib colormaps used when a parameter has no mapping.
    self.available_cmaps = ['viridis', 'plasma', 'inferno', 'magma', 'cividis',
                            'coolwarm', 'RdYlBu', 'Spectral', 'YlGnBu_r', 'YlOrBr',
                            'YlOrRd', 'turbo', 'RdYlBu_r', 'cool', 'hot', 'jet']
    print(f"坐标转换设置: {input_crs} -> {output_crs}")
def _extract_param_name(self, csv_file):
"""
从CSV文件名或内容中提取参数名称
Parameters:
-----------
csv_file : str
CSV文件路径
Returns:
--------
param_name : str or None
提取的参数名称如果未找到则返回None
"""
print(f"[调试] 开始从文件 {csv_file} 中提取参数名称")
print(f"[调试] 字典中的参数键: {list(self.params_cmap.keys())}")
# 从文件名中提取(去除路径和扩展名)
file_name = os.path.basename(csv_file)
file_name_no_ext = os.path.splitext(file_name)[0]
print(f"[调试] 文件名(不含扩展名): {file_name_no_ext}")
# 尝试从文件名中匹配参数名称(不区分大小写)
file_name_upper = file_name_no_ext.upper()
for param in self.params_cmap.keys():
param_upper = param.upper()
if param_upper in file_name_upper:
print(f"从文件名中识别到参数: {param} (匹配到 '{param_upper}''{file_name_upper}' 中)")
return param # 返回字典中的原始键(保持大小写)
# 如果文件名中没有找到尝试从CSV内容中提取检查列名
try:
df = pd.read_csv(csv_file, encoding='utf-8', nrows=0) # 只读取列名
columns = [col.upper() for col in df.columns]
print(f"[调试] CSV列名: {list(df.columns)}")
for param in self.params_cmap.keys():
param_upper = param.upper()
# 检查列名中是否包含参数名称
for col in columns:
if param_upper in col or col in param_upper:
print(f"从CSV列名中识别到参数: {param} (匹配到列名 '{col}')")
return param # 返回字典中的原始键(保持大小写)
except Exception as e:
print(f"读取CSV列名时出错: {e}")
print(f"未能在文件 {csv_file} 中识别参数名称")
print(f"[调试] 可用的参数名: {list(self.params_cmap.keys())}")
return None
def _get_colormap(self, param_name=None):
"""
根据参数名称获取对应的colormap
Parameters:
-----------
param_name : str, optional
参数名称。如果为None或不在映射中则随机选择一个colormap
Returns:
--------
cmap : str
颜色映射名称
"""
# 打印调试信息
print(f"[调试] _get_colormap 被调用param_name={param_name}")
print(f"[调试] 当前字典中的键: {list(self.params_cmap.keys())}")
if param_name:
# 首先尝试精确匹配(区分大小写)
if param_name in self.params_cmap:
cmap = self.params_cmap[param_name]
print(f"使用参数 '{param_name}' 对应的颜色映射: {cmap}")
return cmap
# 如果精确匹配失败,尝试不区分大小写的匹配
param_name_upper = param_name.upper()
for key in self.params_cmap.keys():
if key.upper() == param_name_upper:
cmap = self.params_cmap[key]
print(f"使用参数 '{key}' (不区分大小写匹配 '{param_name}') 对应的颜色映射: {cmap}")
return cmap
# 如果都不匹配,随机选择
cmap = random.choice(self.available_cmaps)
print(f"警告: 参数 '{param_name}' 不在映射中,随机选择颜色映射: {cmap}")
print(f"可用的参数名: {list(self.params_cmap.keys())}")
return cmap
else:
# 随机选择一个colormap
cmap = random.choice(self.available_cmaps)
print(f"未指定参数名称,随机选择颜色映射: {cmap}")
return cmap
def _check_point_distribution(self, points):
"""检查数据点的几何分布"""
print("正在检查数据点分布...")
# 检查是否有重复点
unique_points = np.unique(points, axis=0)
if len(unique_points) < len(points):
print(f"警告:发现 {len(points) - len(unique_points)} 个重复数据点")
# 检查点是否共线
if len(unique_points) >= 3:
# 计算前三个点构成的三角形面积
p1, p2, p3 = unique_points[:3]
area = 0.5 * abs((p2[0] - p1[0]) * (p3[1] - p1[1]) - (p3[0] - p1[0]) * (p2[1] - p1[1]))
if area < 1e-10: # 面积太小,可能共线
print("警告:前三个数据点可能共线")
# 尝试找到不共线的点
for i in range(3, len(unique_points)):
p4 = unique_points[i]
area = 0.5 * abs((p2[0] - p1[0]) * (p4[1] - p1[1]) - (p4[0] - p1[0]) * (p2[1] - p1[1]))
if area > 1e-10:
print(f"找到非共线点,使用点 {i}")
break
else:
print("警告:所有数据点可能都共线,这会导致插值失败")
# 检查坐标范围
x_range = points[:, 0].max() - points[:, 0].min()
y_range = points[:, 1].max() - points[:, 1].min()
if x_range < 1e-6 or y_range < 1e-6:
print(f"警告:坐标范围很小 (X范围: {x_range:.2e}, Y范围: {y_range:.2e})")
print("这可能导致插值数值不稳定")
return unique_points
def _calculate_idw_uncertainty(self, points, grid_xx, grid_yy, power=2, n_neighbors=10):
"""
计算IDW插值的不确定性距离加权和的倒数
Parameters:
-----------
points : np.ndarray
数据点坐标 (n_points, 2)
grid_xx : np.ndarray
网格X坐标
grid_yy : np.ndarray
网格Y坐标
power : float
IDW的幂参数默认为2
n_neighbors : int
使用的最近邻点数默认为10
Returns:
--------
uncertainty : np.ndarray
不确定性网格距离加权和的倒数归一化到0-1
"""
print("正在计算IDW不确定性...")
grid_points = np.column_stack((grid_xx.ravel(), grid_yy.ravel()))
# 使用向量化计算提高性能
# 计算所有网格点到所有数据点的距离矩阵
distances = cdist(grid_points, points)
# 对每个网格点找到最近的n_neighbors个点
n_neighbors = min(n_neighbors, len(points))
nearest_distances = np.partition(distances, n_neighbors-1, axis=1)[:, :n_neighbors]
# 避免除零
nearest_distances = np.maximum(nearest_distances, 1e-10)
# 计算距离加权和(向量化)
weights = 1.0 / (nearest_distances ** power)
weight_sums = np.sum(weights, axis=1)
# 不确定性 = 1 / 距离加权和(加权和越大,不确定性越小)
valid_mask = weight_sums > 0
uncertainty_flat = np.full(len(grid_points), np.nan)
uncertainty_flat[valid_mask] = 1.0 / weight_sums[valid_mask]
uncertainty = uncertainty_flat.reshape(grid_xx.shape)
# 归一化到0-1范围
valid_uncertainty = uncertainty[~np.isnan(uncertainty)]
if len(valid_uncertainty) > 0:
min_unc = valid_uncertainty.min()
max_unc = valid_uncertainty.max()
if max_unc > min_unc:
uncertainty = (uncertainty - min_unc) / (max_unc - min_unc)
print("IDW不确定性计算完成")
return uncertainty
def _calculate_rbf_uncertainty(self, points, grid_xx, grid_yy, n_neighbors=10):
"""
计算RBF插值的不确定性基于距离的倒数
Parameters:
-----------
points : np.ndarray
数据点坐标 (n_points, 2)
grid_xx : np.ndarray
网格X坐标
grid_yy : np.ndarray
网格Y坐标
n_neighbors : int
使用的最近邻点数默认为10
Returns:
--------
uncertainty : np.ndarray
不确定性网格归一化到0-1
"""
print("正在计算RBF不确定性...")
grid_points = np.column_stack((grid_xx.ravel(), grid_yy.ravel()))
# 使用向量化计算提高性能
distances = cdist(grid_points, points)
# 对每个网格点找到最近的n_neighbors个点
n_neighbors = min(n_neighbors, len(points))
nearest_distances = np.partition(distances, n_neighbors-1, axis=1)[:, :n_neighbors]
# 使用平均距离作为不确定性指标(距离越大,不确定性越大)
mean_distances = np.mean(nearest_distances, axis=1)
uncertainty = mean_distances.reshape(grid_xx.shape)
# 归一化到0-1范围
valid_uncertainty = uncertainty[~np.isnan(uncertainty)]
if len(valid_uncertainty) > 0:
min_unc = valid_uncertainty.min()
max_unc = valid_uncertainty.max()
if max_unc > min_unc:
uncertainty = (uncertainty - min_unc) / (max_unc - min_unc)
print("RBF不确定性计算完成")
return uncertainty
def _calculate_kriging_uncertainty(self, points, values, grid_xx, grid_yy):
    """Compute interpolation uncertainty as the Ordinary Kriging variance.

    Falls back to the IDW-based estimate when pykrige is missing or the
    Kriging computation fails. The variance grid is min-max normalized
    to [0, 1].

    Parameters
    ----------
    points : np.ndarray
        Data point coordinates, shape (n_points, 2).
    values : np.ndarray
        Observed values at the data points.
    grid_xx, grid_yy : np.ndarray
        Meshgrid coordinate arrays.

    Returns
    -------
    np.ndarray
        Normalized Kriging variance grid (or the IDW fallback result).
    """
    if not PYKRIGE_AVAILABLE:
        print("警告: pykrige未安装无法计算Kriging不确定性将使用IDW方法")
        return self._calculate_idw_uncertainty(points, grid_xx, grid_yy)
    print("正在计算Kriging不确定性...")
    try:
        ok = OrdinaryKriging(
            points[:, 0], points[:, 1], values,
            variogram_model='linear',  # simple, robust default variogram
            verbose=False,
            enable_plotting=False
        )
        # BUG FIX: the previous code used ok.execute('grid', ...) with the
        # raveled meshgrid coordinates. In pykrige, style='grid' treats the
        # two arrays as independent axis vectors and returns an
        # (n*m) x (n*m) result, which cannot be reshaped to grid_xx.shape.
        # style='points' evaluates each (x, y) pair, matching the grid.
        grid_points = np.column_stack((grid_xx.ravel(), grid_yy.ravel()))
        z, ss = ok.execute('points', grid_points[:, 0], grid_points[:, 1])
        # ss is the Kriging variance at each evaluated point.
        uncertainty = np.asarray(ss).reshape(grid_xx.shape)
        # Min-max normalize to [0, 1].
        valid_uncertainty = uncertainty[~np.isnan(uncertainty)]
        if len(valid_uncertainty) > 0:
            min_unc = valid_uncertainty.min()
            max_unc = valid_uncertainty.max()
            if max_unc > min_unc:
                uncertainty = (uncertainty - min_unc) / (max_unc - min_unc)
        print("Kriging不确定性计算完成")
        return uncertainty
    except Exception as e:
        print(f"Kriging不确定性计算失败: {e}将使用IDW方法")
        return self._calculate_idw_uncertainty(points, grid_xx, grid_yy)
def _fill_boundary_blanks_with_distance_diffusion(self, grid_content, grid_xx, grid_yy, mask,
                                                  boundary_gdf, max_diffusion_distance=None,
                                                  power=2, n_neighbors=15):
    """Fill NaN grid cells inside the boundary by distance-weighted diffusion.

    Works in place on grid_content: each blank in-boundary cell receives
    an IDW average of nearby valid in-boundary cells, optionally limited
    to a maximum diffusion radius.

    Parameters
    ----------
    grid_content : np.ndarray
        Interpolated grid (modified in place and returned).
    grid_xx, grid_yy : np.ndarray
        Meshgrid coordinate arrays.
    mask : np.ndarray
        Boolean boundary mask (True = inside the boundary).
    boundary_gdf : gpd.GeoDataFrame
        Boundary geometry.  NOTE(review): not referenced by the body;
        presumably kept for interface symmetry — confirm before removal.
    max_diffusion_distance : float, optional
        Maximum diffusion radius (map units); None means 5x the grid
        resolution.
    power : float, default=2
        IDW distance-decay exponent.
    n_neighbors : int, default=15
        Number of nearest valid cells used per blank cell.

    Returns
    -------
    np.ndarray
        The same grid_content array with blanks filled.
    """
    print("正在使用距离扩散方法填充边界空白区域...")
    # Locate NaN cells that lie inside the boundary mask.
    nan_mask = np.isnan(grid_content)
    within_boundary_nan = nan_mask & mask
    if not np.any(within_boundary_nan):
        print("边界内没有空白区域需要填充")
        return grid_content
    blank_count = np.sum(within_boundary_nan)
    print(f"发现 {blank_count} 个边界内的空白点,开始距离扩散填充...")
    # Cells inside the boundary that already hold interpolated values.
    valid_mask = ~nan_mask & mask
    if np.sum(valid_mask) == 0:
        print("警告:边界内没有有效值,无法进行距离扩散")
        return grid_content
    # Derive the diffusion radius from the grid resolution when not given.
    if max_diffusion_distance is None:
        # NOTE(review): the dy guard tests grid_xx.shape[0]; for meshgrid
        # output grid_xx and grid_yy share a shape, so this is equivalent.
        dx = np.abs(grid_xx[0, 1] - grid_xx[0, 0]) if grid_xx.shape[1] > 1 else 1.0
        dy = np.abs(grid_yy[1, 0] - grid_yy[0, 0]) if grid_xx.shape[0] > 1 else 1.0
        avg_resolution = (dx + dy) / 2.0
        max_diffusion_distance = avg_resolution * 5.0  # 5x grid resolution
        print(f"自动计算最大扩散距离: {max_diffusion_distance:.6f}")
    # Source points (valid cells) and their values.
    valid_points = np.column_stack((grid_xx[valid_mask], grid_yy[valid_mask]))
    valid_values = grid_content[valid_mask]
    # Target points (blank cells to fill).
    blank_points = np.column_stack((grid_xx[within_boundary_nan], grid_yy[within_boundary_nan]))
    print(f"使用 {len(valid_points)} 个有效点填充 {len(blank_points)} 个空白点...")
    # Pairwise distances: every blank cell vs every valid cell.
    distances = cdist(blank_points, valid_points)
    n_neighbors = min(n_neighbors, len(valid_points))
    if max_diffusion_distance > 0:
        # Distance-limited branch: per blank cell, use only valid cells
        # within the diffusion radius (IDW weighting); cells with no
        # neighbour in range fall back to the global mean.
        filled_values = np.full(len(blank_points), np.nan)
        global_mean = np.nanmean(valid_values)
        for i in range(len(blank_points)):
            point_distances = distances[i, :]
            valid_idx = point_distances <= max_diffusion_distance
            if np.any(valid_idx):
                valid_dist = point_distances[valid_idx]
                valid_vals = valid_values[valid_idx]
                # Keep only the n_neighbors closest in-range cells.
                if len(valid_dist) > n_neighbors:
                    nearest_idx = np.argpartition(valid_dist, n_neighbors-1)[:n_neighbors]
                    valid_dist = valid_dist[nearest_idx]
                    valid_vals = valid_vals[nearest_idx]
                # Clamp distances to avoid division by zero.
                valid_dist = np.maximum(valid_dist, 1e-10)
                # IDW weights and weighted average.
                weights = 1.0 / (valid_dist ** power)
                weight_sum = np.sum(weights)
                if weight_sum > 0:
                    filled_values[i] = np.sum(weights * valid_vals) / weight_sum
                else:
                    filled_values[i] = global_mean
            else:
                # No valid cell within the diffusion radius.
                filled_values[i] = global_mean
    else:
        # Unlimited branch: vectorized IDW over the n_neighbors nearest
        # valid cells for all blank cells at once.
        nearest_indices = np.argpartition(distances, n_neighbors-1, axis=1)[:, :n_neighbors]
        nearest_dists = np.take_along_axis(distances, nearest_indices, axis=1)
        nearest_vals = valid_values[nearest_indices]
        # Clamp distances to avoid division by zero.
        nearest_dists = np.maximum(nearest_dists, 1e-10)
        weights = 1.0 / (nearest_dists ** power)
        weight_sums = np.sum(weights, axis=1)
        filled_values = np.sum(weights * nearest_vals, axis=1) / weight_sums
        # Guard against zero weight sums producing NaN.
        nan_mask = np.isnan(filled_values) | (weight_sums == 0)
        if np.any(nan_mask):
            filled_values[nan_mask] = np.nanmean(valid_values)
    # Write the diffusion results back into the grid (in place).
    grid_content[within_boundary_nan] = filled_values
    filled_count = np.sum(~np.isnan(filled_values))
    print(f"距离扩散填充完成:成功填充 {filled_count} / {blank_count} 个空白点")
    return grid_content
def _calculate_model_variance_uncertainty(self, points, uncertainty_values, grid_xx, grid_yy):
"""
插值模型输出方差MC Dropout等到网格
Parameters:
-----------
points : np.ndarray
数据点坐标 (n_points, 2)
uncertainty_values : np.ndarray
数据点的不确定性值(方差)
grid_xx : np.ndarray
网格X坐标
grid_yy : np.ndarray
网格Y坐标
Returns:
--------
uncertainty : np.ndarray
不确定性网格归一化到0-1
"""
print("正在插值模型方差到网格...")
# 使用线性插值将不确定性值插值到网格
grid_uncertainty = griddata(
points, uncertainty_values, (grid_xx, grid_yy),
method='linear', fill_value=np.nan
)
# 如果有NaN用最近邻填充
if np.any(np.isnan(grid_uncertainty)):
grid_nearest = griddata(
points, uncertainty_values, (grid_xx, grid_yy),
method='nearest'
)
nan_mask = np.isnan(grid_uncertainty)
grid_uncertainty[nan_mask] = grid_nearest[nan_mask]
# 归一化到0-1范围
valid_uncertainty = grid_uncertainty[~np.isnan(grid_uncertainty)]
if len(valid_uncertainty) > 0:
min_unc = valid_uncertainty.min()
max_unc = valid_uncertainty.max()
if max_unc > min_unc:
grid_uncertainty = (grid_uncertainty - min_unc) / (max_unc - min_unc)
print("模型方差插值完成")
return grid_uncertainty
def _perform_interpolation(self, points, values, grid_xx, grid_yy):
    """Interpolate scattered values onto the grid, with layered fallbacks.

    Strategy, in order: linear griddata (NaN holes patched by nearest
    neighbour); plain nearest-neighbour; for EPSG:4326 output only, a
    simple nearest-point assignment limited to 10% of the X extent; and
    finally cubic interpolation. Raises ValueError when every method
    fails.
    """
    print(f"插值输入检查:")
    print(f" - 数据点数量: {len(points)}")
    print(f" - 数据值范围: {values.min():.4f} - {values.max():.4f}")
    print(f" - 网格大小: {grid_xx.shape}")
    print(f" - 坐标系: {self.output_crs}")
    # Drop non-finite values before interpolating.
    finite_mask = np.isfinite(values)
    if not np.all(finite_mask):
        print(f"警告:发现 {np.sum(~finite_mask)} 个无效数据值,将被移除")
        points = points[finite_mask]
        values = values[finite_mask]
    if len(points) < 3:
        raise ValueError(f"有效数据点不足3个当前{len(points)}个)")
    try:
        # Primary method: linear interpolation over the whole grid.
        print("正在尝试线性插值...")
        grid_content = griddata(
            points, values, (grid_xx, grid_yy),
            method='linear', fill_value=np.nan
        )
        valid_linear = ~np.isnan(grid_content)
        valid_count = np.sum(valid_linear)
        print(f"线性插值结果:有效点数 {valid_count} / {grid_content.size}")
        if valid_count > 0:
            print(f"线性插值成功,有效区域覆盖率: {valid_count / grid_content.size * 100:.1f}%")
            # Patch NaN holes (outside the convex hull) with nearest values.
            nan_count = np.sum(np.isnan(grid_content))
            if nan_count > 0:
                print(f"正在用最近邻插值填充 {nan_count} 个缺失值...")
                grid_nearest = griddata(
                    points, values, (grid_xx, grid_yy),
                    method='nearest'
                )
                # Fill only the cells linear interpolation left as NaN.
                nan_mask = np.isnan(grid_content)
                grid_content[nan_mask] = grid_nearest[nan_mask]
                print("缺失值填充完成")
            final_valid = ~np.isnan(grid_content)
            print(f"最终有效点数: {np.sum(final_valid)} / {grid_content.size}")
            return grid_content
        else:
            # Fall through to the nearest-neighbour attempt below.
            print("线性插值失败,尝试最近邻插值...")
    except Exception as e:
        print(f"线性插值失败: {e}")
        print("尝试最近邻插值...")
    try:
        # Fallback 1: nearest-neighbour interpolation.
        print("执行最近邻插值...")
        grid_content = griddata(
            points, values, (grid_xx, grid_yy),
            method='nearest'
        )
        valid_count = np.sum(~np.isnan(grid_content))
        print(f"最近邻插值成功,有效点数: {valid_count}")
        if valid_count == 0:
            raise ValueError("最近邻插值也没有产生有效结果")
        return grid_content
    except Exception as e:
        print(f"最近邻插值也失败: {e}")
        # Fallback 2 (geographic output only): assign each grid cell the
        # value of its nearest data point if that point lies within 10%
        # of the grid's X extent.
        if self.output_crs == 'EPSG:4326':
            print("地理坐标系检测到,尝试简化插值...")
            try:
                grid_content = np.full(grid_xx.shape, np.nan)
                for i in range(grid_xx.shape[0]):
                    for j in range(grid_xx.shape[1]):
                        grid_x, grid_y = grid_xx[i, j], grid_yy[i, j]
                        # Distances from this cell to every data point.
                        distances = np.sqrt((points[:, 0] - grid_x) ** 2 + (points[:, 1] - grid_y) ** 2)
                        nearest_idx = np.argmin(distances)
                        # Accept only reasonably close data points.
                        if distances[nearest_idx] < (grid_xx.max() - grid_xx.min()) * 0.1:
                            grid_content[i, j] = values[nearest_idx]
                valid_count = np.sum(~np.isnan(grid_content))
                print(f"简化插值完成,有效点数: {valid_count}")
                if valid_count > 0:
                    return grid_content
                else:
                    raise ValueError("简化插值也没有产生有效结果")
            except Exception as e3:
                print(f"简化插值失败: {e3}")
        # Fallback 3: cubic interpolation, NaN holes patched by nearest.
        print("尝试立方插值作为最后手段...")
        try:
            grid_content = griddata(
                points, values, (grid_xx, grid_yy),
                method='cubic', fill_value=np.nan
            )
            if np.any(np.isnan(grid_content)):
                print("用最近邻插值填充立方插值的NaN值...")
                grid_nearest = griddata(
                    points, values, (grid_xx, grid_yy),
                    method='nearest'
                )
                nan_mask = np.isnan(grid_content)
                grid_content[nan_mask] = grid_nearest[nan_mask]
            valid_count = np.sum(~np.isnan(grid_content))
            print(f"立方插值成功,有效点数: {valid_count}")
            return grid_content
        except Exception as e4:
            print(f"立方插值也失败: {e4}")
            print(f"所有插值方法都失败")
    # Every strategy failed.
    raise ValueError("无法完成空间插值,请检查数据点的分布和数值")
def read_csv_data(self, csv_file, uncertainty_col=None):
    """Load a CSV of samples and project coordinates to the output CRS.

    The first three columns are interpreted positionally as x/longitude,
    y/latitude and content value.

    Parameters
    ----------
    csv_file : str
        Path to the CSV file.
    uncertainty_col : str, optional
        Name of an uncertainty column. When None, the first column whose
        lower-cased name contains one of 'variance', 'uncertainty',
        'std', 'sigma', 'var' or 'mc_dropout' is used.

    Returns
    -------
    gpd.GeoDataFrame
        Points with 'proj_x', 'proj_y', 'content' and, when an
        uncertainty column was found, 'uncertainty' columns.
    """
    print("正在读取CSV文件...")
    df = pd.read_csv(csv_file, encoding='utf-8')
    if df.shape[1] < 3:
        raise ValueError("CSV文件必须至少包含3列经度、纬度、含量")
    # Positional column layout: lon, lat, content.
    lon_col, lat_col, content_col = df.columns[0], df.columns[1], df.columns[2]
    print(f"检测到列名:经度({lon_col}),纬度({lat_col}),含量({content_col})")
    if uncertainty_col is None:
        # Auto-detect an uncertainty column by keyword substring.
        keywords = ('variance', 'uncertainty', 'std', 'sigma', 'var', 'mc_dropout')
        for col in df.columns:
            lowered = col.lower()
            if any(k in lowered for k in keywords):
                uncertainty_col = col
                print(f"自动检测到不确定性列: {uncertainty_col}")
                break
    # Project the coordinates into the output CRS.
    print(f"正在进行坐标转换: {self.input_crs} -> {self.output_crs}")
    transformed_x, transformed_y = self.transformer.transform(
        df[lon_col].values, df[lat_col].values
    )
    gdf = gpd.GeoDataFrame(
        df,
        geometry=[Point(x, y) for x, y in zip(transformed_x, transformed_y)],
        crs=self.output_crs
    )
    gdf['proj_x'] = transformed_x
    gdf['proj_y'] = transformed_y
    gdf['content'] = df[content_col]
    # Attach the uncertainty column when present.
    if uncertainty_col and uncertainty_col in df.columns:
        gdf['uncertainty'] = df[uncertainty_col].values
        print(f"已加载不确定性数据列: {uncertainty_col}")
        print(f"不确定性值范围: {gdf['uncertainty'].min():.4f} - {gdf['uncertainty'].max():.4f}")
    print(f"成功读取 {len(gdf)} 个数据点")
    return gdf
def read_boundary_shapefile(self, shp_file):
    """Load the boundary shapefile, reprojected to the output CRS.

    Parameters
    ----------
    shp_file : str
        Path to the boundary shapefile.

    Returns
    -------
    gpd.GeoDataFrame
        Boundary features in self.output_crs.
    """
    print("正在读取边界文件...")
    features = gpd.read_file(shp_file)
    # Reproject when the file is not already in the target CRS.
    if features.crs != self.output_crs:
        print(f"正在转换边界文件坐标系到 {self.output_crs}...")
        features = features.to_crs(self.output_crs)
    print(f"边界文件包含 {len(features)} 个要素")
    return features
def _identify_edge_points(self, points_gdf):
"""
识别边缘采样点(使用凸包方法)
Parameters:
-----------
points_gdf : gpd.GeoDataFrame
采样点GeoDataFrame
Returns:
--------
edge_indices : np.ndarray
边缘点的索引数组
"""
print("正在识别边缘采样点...")
# 获取所有点的坐标
points = np.column_stack((points_gdf['proj_x'].values, points_gdf['proj_y'].values))
if len(points) < 3:
print("警告采样点数量少于3个无法识别边缘点")
return np.array([])
try:
# 使用凸包识别边缘点
hull = ConvexHull(points)
edge_indices = hull.vertices
print(f"识别到 {len(edge_indices)} 个边缘采样点(共 {len(points)} 个点)")
return edge_indices
except Exception as e:
print(f"识别边缘点时出错: {e},将使用所有点作为边缘点")
return np.arange(len(points))
def _expand_edge_points(self, points_gdf, boundary_gdf, resolution=100, expand_ratio=0.05):
    """Synthesize extra sample points from the edge points out to the image border.

    For every convex-hull edge point, a ray is cast from the centre of
    the sample cloud through the edge point, and new points are placed
    along it every `resolution` units until the (expanded) image
    rectangle is reached. Each synthetic point copies the attributes of
    its source edge point.

    Parameters
    ----------
    points_gdf : gpd.GeoDataFrame
        Original sampling points.
    boundary_gdf : gpd.GeoDataFrame
        Water-mask boundary whose total_bounds define the image extent.
    resolution : float, default=100
        Spacing between synthetic points (map units), matching the
        interpolation grid resolution.
    expand_ratio : float, default=0.05
        Padding ratio; must match create_interpolation_grid so the
        expansion reaches the full rendered image.

    Returns
    -------
    gpd.GeoDataFrame
        Original points plus the synthetic expansion points.
    """
    print(f"正在对边缘采样点进行外扩处理(按照 {resolution} 的间距外扩到整个图像边界)...")
    # Edge points of the sample cloud (convex hull vertices).
    edge_indices = self._identify_edge_points(points_gdf)
    if len(edge_indices) == 0:
        print("未识别到边缘点,跳过外扩处理")
        return points_gdf.copy()
    # Water-mask extent: [minx, miny, maxx, maxy].
    boundary_bounds = boundary_gdf.total_bounds
    mask_minx, mask_miny, mask_maxx, mask_maxy = boundary_bounds
    width = mask_maxx - mask_minx
    height = mask_maxy - mask_miny
    # Pad the extent exactly like create_interpolation_grid does, so the
    # synthetic points can reach the full image, padding included.
    expand_x = width * expand_ratio
    expand_y = height * expand_ratio
    image_minx = mask_minx - expand_x
    image_maxx = mask_maxx + expand_x
    image_miny = mask_miny - expand_y
    image_maxy = mask_maxy + expand_y
    points = np.column_stack((points_gdf['proj_x'].values, points_gdf['proj_y'].values))
    # Centre of the sample cloud's bounding box (origin of ray directions).
    x_min, x_max = points[:, 0].min(), points[:, 0].max()
    y_min, y_max = points[:, 1].min(), points[:, 1].max()
    center = np.array([(x_min + x_max) / 2, (y_min + y_max) / 2])
    # Accumulators for the synthetic points.
    new_points_list = []
    new_data_list = []
    # Cast a ray from the centre through each edge point and march outward.
    for edge_idx in edge_indices:
        edge_point = points[edge_idx]
        direction = edge_point - center
        distance_to_center = np.linalg.norm(direction)
        if distance_to_center < 1e-10:
            # Edge point coincides with the centre: no usable direction.
            continue
        direction_unit = direction / distance_to_center
        # Find where the ray leaves the (expanded) image rectangle by
        # intersecting it with each of the four sides.
        max_distance = 0
        # Top side (y = image_maxy)
        if direction_unit[1] > 1e-10:
            t = (image_maxy - edge_point[1]) / direction_unit[1]
            if t > 0:
                intersect_x = edge_point[0] + direction_unit[0] * t
                if image_minx <= intersect_x <= image_maxx:
                    max_distance = max(max_distance, t)
        # Bottom side (y = image_miny)
        if direction_unit[1] < -1e-10:
            t = (image_miny - edge_point[1]) / direction_unit[1]
            if t > 0:
                intersect_x = edge_point[0] + direction_unit[0] * t
                if image_minx <= intersect_x <= image_maxx:
                    max_distance = max(max_distance, t)
        # Right side (x = image_maxx)
        if direction_unit[0] > 1e-10:
            t = (image_maxx - edge_point[0]) / direction_unit[0]
            if t > 0:
                intersect_y = edge_point[1] + direction_unit[1] * t
                if image_miny <= intersect_y <= image_maxy:
                    max_distance = max(max_distance, t)
        # Left side (x = image_minx)
        if direction_unit[0] < -1e-10:
            t = (image_minx - edge_point[0]) / direction_unit[0]
            if t > 0:
                intersect_y = edge_point[1] + direction_unit[1] * t
                if image_miny <= intersect_y <= image_maxy:
                    max_distance = max(max_distance, t)
        # March from the edge point to the border, one point every
        # `resolution` units; the final step is clamped onto the border.
        if max_distance > 1e-10:
            distance_to_boundary = max_distance
            # ceil ensures the march reaches the border.
            n_points = int(np.ceil(distance_to_boundary / resolution))
            for i in range(1, n_points + 1):
                expand_distance = i * resolution
                if expand_distance >= distance_to_boundary:
                    expand_distance = distance_to_boundary
                new_point = edge_point + direction_unit * expand_distance
                # Clamp inside the expanded image rectangle.
                new_point[0] = np.clip(new_point[0], image_minx, image_maxx)
                new_point[1] = np.clip(new_point[1], image_miny, image_maxy)
                # Clone the source edge point's attributes.
                new_row = points_gdf.iloc[edge_idx].copy()
                new_row['proj_x'] = new_point[0]
                new_row['proj_y'] = new_point[1]
                new_row['geometry'] = Point(new_point[0], new_point[1])
                new_points_list.append(new_point)
                new_data_list.append(new_row)
                # Stop once the border has been reached.
                if expand_distance >= distance_to_boundary:
                    break
    # Merge originals with the synthetic points.
    if len(new_data_list) > 0:
        expanded_gdf = gpd.GeoDataFrame(new_data_list, crs=points_gdf.crs)
        # Concatenate via pandas, then rewrap so the geometry column survives.
        result_gdf = gpd.GeoDataFrame(pd.concat([points_gdf, expanded_gdf], ignore_index=True), crs=points_gdf.crs)
        print(f"外扩完成:原始点 {len(points_gdf)} 个,边缘点 {len(edge_indices)} 个,"
              f"新增外扩点 {len(new_data_list)} 个(间距 {resolution}),总计 {len(result_gdf)} 个点")
        print(f"水域掩膜范围: X[{mask_minx:.2f}, {mask_maxx:.2f}], Y[{mask_miny:.2f}, {mask_maxy:.2f}]")
        print(f"图像范围(含外扩): X[{image_minx:.2f}, {image_maxx:.2f}], Y[{image_miny:.2f}, {image_maxy:.2f}]")
        return result_gdf
    else:
        print("未生成外扩点,返回原始点集")
        return points_gdf.copy()
def create_interpolation_grid(self, points_gdf, boundary_gdf, resolution=100, expand_ratio=0.05,
                              uncertainty_method='auto', calculate_uncertainty=True,
                              use_distance_diffusion=True, max_diffusion_distance=None,
                              diffusion_power=2, diffusion_n_neighbors=15):
    """Build the interpolation grid, fill it, and optionally add uncertainty.

    Pipeline: derive the padded boundary extent, build a dense grid,
    interpolate the sample values over the whole grid, fill gaps inside
    the boundary (distance diffusion or nearest neighbour), extend edge
    values outward so the whole canvas is covered, then optionally
    compute a normalized uncertainty grid.

    Parameters
    ----------
    points_gdf : gpd.GeoDataFrame
        Sample points with 'proj_x', 'proj_y', 'content' (and optionally
        'uncertainty') columns.
    boundary_gdf : gpd.GeoDataFrame
        Water-area boundary used for masking.
    resolution : float, default=100
        Target grid spacing in metres (converted to degrees for
        EPSG:4326 output).
    expand_ratio : float, default=0.05
        Fractional padding (5%) of the boundary extent so the image does
        not touch the map edge.
    uncertainty_method : str, default='auto'
        One of 'auto', 'kriging', 'idw', 'rbf', 'model_variance'.
    calculate_uncertainty : bool, default=True
        Whether to compute the uncertainty grid.
    use_distance_diffusion : bool, default=True
        Fill in-boundary gaps with distance diffusion instead of plain
        nearest-neighbour interpolation.
    max_diffusion_distance : float, optional
        Diffusion radius (map units); None means 5x the grid resolution.
    diffusion_power : float, default=2
        IDW decay exponent for the diffusion fill.
    diffusion_n_neighbors : int, default=15
        Neighbour count for the diffusion fill.

    Returns
    -------
    tuple
        (grid_xx, grid_yy, grid_content, bounds), plus grid_uncertainty
        as a fifth element when an uncertainty grid was computed.
    """
    print("正在创建插值网格...")
    # Boundary extent before padding.
    bounds = boundary_gdf.total_bounds
    minx, miny, maxx, maxy = bounds
    print(f"原始边界范围: X({minx:.6f} - {maxx:.6f}), Y({miny:.6f} - {maxy:.6f})")
    width = maxx - minx
    height = maxy - miny
    # Pad the extent so the rendered image does not touch the map edge.
    expand_x = width * expand_ratio
    expand_y = height * expand_ratio
    minx -= expand_x
    maxx += expand_x
    miny -= expand_y
    maxy += expand_y
    print(f"外扩后边界范围: X({minx:.6f} - {maxx:.6f}), Y({miny:.6f} - {maxy:.6f})")
    print(f"外扩比例: {expand_ratio * 100:.1f}%")
    if self.output_crs == 'EPSG:4326':
        print(f"区域尺寸: 宽度={width:.6f}°, 高度={height:.6f}°")
        # Geographic CRS: convert the metre resolution to degrees
        # (1 degree is roughly 111 km).
        resolution_deg = resolution / 111000.0
        print(f"网格分辨率: {resolution}m ≈ {resolution_deg:.6f}°")
    else:
        print(f"区域尺寸: 宽度={width:.2f}m, 高度={height:.2f}m")
        resolution_deg = resolution
    # Enforce a minimum grid density for a smooth interpolated look.
    min_grid_points = 50
    if self.output_crs == 'EPSG:4326':
        # Grid cell counts for a geographic CRS.
        grid_points_x = max(int(width / resolution_deg), min_grid_points)
        grid_points_y = max(int(height / resolution_deg), min_grid_points)
    else:
        # Grid cell counts for a projected CRS.
        grid_points_x = max(int(width / resolution), min_grid_points)
        grid_points_y = max(int(height / resolution), min_grid_points)
    # At least 100 cells per axis for smooth rendering.
    grid_points_x = max(grid_points_x, 100)
    grid_points_y = max(grid_points_y, 100)
    # Build the interpolation mesh.
    grid_x = np.linspace(minx, maxx, grid_points_x)
    grid_y = np.linspace(miny, maxy, grid_points_y)
    grid_xx, grid_yy = np.meshgrid(grid_x, grid_y)
    print(f"网格大小: {grid_xx.shape[1]} x {grid_xx.shape[0]} (宽 x 高)")
    if grid_xx.shape[0] < 2 or grid_xx.shape[1] < 2:
        raise ValueError(f"网格尺寸太小 {grid_xx.shape},无法进行插值。请检查数据范围和分辨率设置。")
    # Interpolation inputs (original samples plus any expansion points).
    points = np.column_stack((points_gdf['proj_x'], points_gdf['proj_y']))
    values = points_gdf['content'].values
    print(f"插值数据点数量: {len(points)}(包含原始采样点和外扩点)")
    print(f"数据点范围: X({points[:, 0].min():.6f} - {points[:, 0].max():.6f}), "
          f"Y({points[:, 1].min():.6f} - {points[:, 1].max():.6f})")
    print(f"含量值范围: {values.min():.4f} - {values.max():.4f}")
    print(f"含量值统计: 平均={values.mean():.4f}, 标准差={values.std():.4f}")
    if len(points) < 3:
        raise ValueError("插值需要至少3个数据点")
    # Warn about duplicates/collinearity (does not modify `points`).
    self._check_point_distribution(points)
    # Interpolate over the full grid, including outside the boundary.
    print("正在执行空间插值(整个网格,包括边界外)...")
    grid_content = self._perform_interpolation(points, values, grid_xx, grid_yy)
    # Boolean mask of grid cells lying inside the boundary polygon(s).
    print("正在识别边界区域...")
    mask_points = np.column_stack((grid_xx.ravel(), grid_yy.ravel()))
    mask_geometry = [Point(x, y) for x, y in mask_points]
    mask_gdf = gpd.GeoDataFrame(geometry=mask_geometry, crs=self.output_crs)
    within_boundary = mask_gdf.within(boundary_gdf.unary_union)
    mask = within_boundary.values.reshape(grid_xx.shape)
    print("正在提取边界边缘值并填充边界外区域...")
    # Step 1: fill NaN cells inside the boundary.
    nan_mask = np.isnan(grid_content)
    within_boundary_nan = nan_mask & mask
    if np.any(within_boundary_nan):
        if use_distance_diffusion:
            # Distance-diffusion fill of in-boundary blanks.
            grid_content = self._fill_boundary_blanks_with_distance_diffusion(
                grid_content, grid_xx, grid_yy, mask, boundary_gdf,
                max_diffusion_distance=max_diffusion_distance,
                power=diffusion_power,
                n_neighbors=diffusion_n_neighbors
            )
        else:
            # Legacy behaviour: plain nearest-neighbour fill.
            print(f"填充边界内的 {np.sum(within_boundary_nan)} 个NaN点使用最近邻插值...")
            valid_mask = ~nan_mask & mask
            if np.sum(valid_mask) > 0:
                valid_points = np.column_stack((grid_xx[valid_mask], grid_yy[valid_mask]))
                valid_values = grid_content[valid_mask]
                nan_points = np.column_stack((grid_xx[within_boundary_nan], grid_yy[within_boundary_nan]))
                filled_values = griddata(
                    valid_points, valid_values, nan_points,
                    method='nearest'
                )
                grid_content[within_boundary_nan] = filled_values
                print(f"边界内填充完成")
    # Step 2: locate in-boundary cells adjacent to the outside, via a
    # morphological dilation of the outside region.
    boundary_mask_binary = mask.astype(int)  # NOTE(review): appears unused below
    outside_mask = ~mask
    kernel = np.ones((3, 3), dtype=bool)
    dilated_outside = ndimage.binary_dilation(outside_mask, structure=kernel)
    edge_mask = mask & dilated_outside
    # Step 3: propagate edge values outward so the whole canvas has data.
    if np.any(edge_mask):
        edge_values = grid_content[edge_mask]
        edge_valid = ~np.isnan(edge_values)
        if np.any(edge_valid):
            edge_mean = np.nanmean(edge_values)
            print(f"边界边缘平均值: {edge_mean:.4f}")
            outside_nan = outside_mask & np.isnan(grid_content)
            if np.any(outside_nan):
                # Nearest-neighbour fill of the outside using edge cells.
                edge_points = np.column_stack((grid_xx[edge_mask & ~np.isnan(grid_content)],
                                               grid_yy[edge_mask & ~np.isnan(grid_content)]))
                if len(edge_points) > 0:
                    edge_vals = grid_content[edge_mask & ~np.isnan(grid_content)]
                    outside_points = np.column_stack((grid_xx[outside_nan], grid_yy[outside_nan]))
                    outside_filled = griddata(
                        edge_points, edge_vals, outside_points,
                        method='nearest'
                    )
                    grid_content[outside_nan] = outside_filled
                    print(f"已填充边界外的 {np.sum(~np.isnan(outside_filled))} 个点")
                else:
                    # No usable edge cells: fall back to the edge mean.
                    grid_content[outside_nan] = edge_mean
                    print(f"使用边缘平均值填充边界外的 {np.sum(outside_nan)} 个点")
            else:
                print("边界外区域已全部填充")
        else:
            # Edge has no valid values: use the in-boundary mean.
            global_mean = np.nanmean(grid_content[mask])
            if not np.isnan(global_mean):
                grid_content[outside_mask & np.isnan(grid_content)] = global_mean
                print(f"使用全局平均值 {global_mean:.4f} 填充边界外")
    else:
        # No edge found at all: fill the outside with the in-boundary mean.
        mean_in_boundary = np.nanmean(grid_content[mask])
        if not np.isnan(mean_in_boundary):
            grid_content[outside_mask & np.isnan(grid_content)] = mean_in_boundary
            print(f"使用边界内平均值 {mean_in_boundary:.4f} 填充边界外")
    print("整个画面已铺满,边界外区域已用边缘值填充")
    # Final safety net: make sure every in-boundary cell has a value.
    final_check_nan = np.isnan(grid_content) & mask
    if np.any(final_check_nan):
        print(f"警告: 仍有 {np.sum(final_check_nan)} 个边界内的点未填充,使用平均值填充...")
        if np.sum(~np.isnan(grid_content) & mask) > 0:
            mean_value = np.nanmean(grid_content[mask])
            grid_content[final_check_nan] = mean_value
            print(f" 使用平均值 {mean_value:.4f} 填充剩余 {np.sum(final_check_nan)} 个点")
        else:
            # No valid data inside the boundary at all: global mean, else 0.
            global_mean = np.nanmean(grid_content)
            if not np.isnan(global_mean):
                grid_content[final_check_nan] = global_mean
            else:
                grid_content[final_check_nan] = 0
            print(" 使用全局平均值填充")
    else:
        print("边界内所有区域已完全填充")
    # Validate the interpolation result.
    valid_data = ~np.isnan(grid_content)
    valid_count = np.sum(valid_data)
    print(f"有效插值点数量: {valid_count} / {grid_content.size}")
    if valid_count == 0:
        raise ValueError("边界掩膜后没有有效数据点,请检查数据点是否在边界范围内")
    if valid_count < 4:
        print("警告:有效数据点很少,可能影响绘图效果")
    valid_values = grid_content[valid_data]
    print(
        f"插值后数据统计: 最小值={valid_values.min():.4f}, 最大值={valid_values.max():.4f}, 平均值={valid_values.mean():.4f}")
    # Optional uncertainty grid.
    grid_uncertainty = None
    if calculate_uncertainty:
        print("\n开始计算不确定性网格...")
        if uncertainty_method == 'auto':
            # Preference order: model variance if present, else Kriging
            # (when pykrige is available), else IDW.
            if 'uncertainty' in points_gdf.columns:
                uncertainty_method = 'model_variance'
                print("检测到模型方差数据使用model_variance方法")
            elif PYKRIGE_AVAILABLE:
                uncertainty_method = 'kriging'
                print("使用Kriging方法计算不确定性")
            else:
                uncertainty_method = 'idw'
                print("使用IDW方法计算不确定性")
        if uncertainty_method == 'model_variance':
            if 'uncertainty' not in points_gdf.columns:
                print("警告: 未找到不确定性数据列改用IDW方法")
                uncertainty_method = 'idw'
            else:
                uncertainty_values = points_gdf['uncertainty'].values
                # Only finite uncertainty values are usable.
                valid_unc_mask = np.isfinite(uncertainty_values)
                if np.sum(valid_unc_mask) > 0:
                    grid_uncertainty = self._calculate_model_variance_uncertainty(
                        points[valid_unc_mask], uncertainty_values[valid_unc_mask],
                        grid_xx, grid_yy
                    )
                else:
                    print("警告: 不确定性数据无效改用IDW方法")
                    uncertainty_method = 'idw'
        if uncertainty_method == 'kriging':
            grid_uncertainty = self._calculate_kriging_uncertainty(points, values, grid_xx, grid_yy)
        elif uncertainty_method == 'idw':
            grid_uncertainty = self._calculate_idw_uncertainty(points, grid_xx, grid_yy)
        elif uncertainty_method == 'rbf':
            grid_uncertainty = self._calculate_rbf_uncertainty(points, grid_xx, grid_yy)
        if grid_uncertainty is not None:
            # Mask out cells outside the boundary.
            grid_uncertainty[~mask] = np.nan
            valid_unc = grid_uncertainty[~np.isnan(grid_uncertainty)]
            if len(valid_unc) > 0:
                print(f"不确定性统计: 最小值={valid_unc.min():.4f}, 最大值={valid_unc.max():.4f}, 平均值={valid_unc.mean():.4f}")
            else:
                print("警告: 不确定性网格中没有有效数据")
    # Return the padded extent alongside the grids.
    expanded_bounds = np.array([minx, miny, maxx, maxy])
    if calculate_uncertainty and grid_uncertainty is not None:
        return grid_xx, grid_yy, grid_content, expanded_bounds, grid_uncertainty
    else:
        return grid_xx, grid_yy, grid_content, expanded_bounds
def create_content_map(self, points_gdf, boundary_gdf, grid_xx, grid_yy,
                       grid_content, bounds, output_file='content_map.png',
                       show_sample_points=False, base_map_tif=None,
                       grid_uncertainty=None, show_uncertainty=True,
                       uncertainty_alpha=0.5, uncertainty_threshold=0.5,
                       uncertainty_cmap='Reds', cmap='viridis'):
    """Render the interpolated content grid as a smooth colour map and save it.

    Drawing is defensive: contourf first, then pcolormesh, imshow and
    finally a scatter plot as successive fallbacks, so that a figure is
    still produced when one renderer rejects the data.

    Parameters
    ----------
    points_gdf : gpd.GeoDataFrame
        Sample points (used for the CRS and the optional point overlay).
    boundary_gdf : gpd.GeoDataFrame
        Water boundary; grid cells outside it are masked out.
    grid_xx, grid_yy : np.ndarray
        Interpolation grid coordinates.
    grid_content : np.ndarray
        Interpolated values on the grid (NaN where invalid).
    bounds : np.ndarray
        Display extent [minx, miny, maxx, maxy].
    output_file : str, default='content_map.png'
        Path of the saved PNG (300 dpi).
    show_sample_points : bool, default=False
        Whether to overlay the raw sampling points.
    base_map_tif : str, optional
        Orthophoto GeoTIFF shown outside the water mask when given.
    grid_uncertainty : np.ndarray, optional
        Uncertainty grid (accepted for interface compatibility; the
        overlay itself is currently disabled below).
    show_uncertainty : bool, default=True
        Whether to show the uncertainty overlay (currently disabled).
    uncertainty_alpha : float, default=0.5
        Overlay transparency, 0-1 (unused while overlay is disabled).
    uncertainty_threshold : float, default=0.5
        Overlay threshold, 0-1 (unused while overlay is disabled).
    uncertainty_cmap : str, default='Reds'
        Overlay colormap (unused while overlay is disabled).
    cmap : str, default='viridis'
        Colormap for the content field.

    Raises
    ------
    ValueError
        When the grid holds no valid data, or every renderer fails.
    """
    print("正在生成含量图...")
    # Log the grid shape for troubleshooting.
    print(f"网格形状: {grid_content.shape}")
    # Build a boundary mask so only cells inside the water body are drawn.
    print("创建边界掩膜用于绘图...")
    try:
        # One Point per grid cell, in the same CRS as the samples.
        grid_points = gpd.GeoDataFrame(
            geometry=[Point(x, y) for x, y in zip(grid_xx.flatten(), grid_yy.flatten())],
            crs=points_gdf.crs
        )
        # True where the cell centre lies inside the (merged) boundary.
        within_boundary = grid_points.within(boundary_gdf.unary_union)
        mask = within_boundary.values.reshape(grid_xx.shape)
        print(f"边界内点数: {np.sum(mask)} / {mask.size}")
    except Exception as e:
        print(f"创建边界掩膜时出现错误: {e},继续绘图...")
        mask = np.ones_like(grid_content, dtype=bool)  # fall back to showing everything
    valid_data = ~np.isnan(grid_content)
    if np.sum(valid_data) == 0:
        raise ValueError("没有有效的插值数据用于绘图")
    # Summary statistics of the interpolated values.
    valid_values = grid_content[valid_data]
    print(
        f"插值结果统计: 最小值={valid_values.min():.4f}, 最大值={valid_values.max():.4f}, 平均值={valid_values.mean():.4f}")
    print(f"有效数据点数量: {np.sum(valid_data)} / {grid_content.size}")
    # A zero range means every value is identical → single-colour fill.
    data_range = valid_values.max() - valid_values.min()
    print(f"数据范围: {data_range:.6f}")
    if data_range == 0:
        print("警告:所有数据值都相同,将使用单一颜色显示")
    # Create the figure.
    fig, ax = plt.subplots(figsize=(12, 10))
    # Optional orthophoto base map, drawn outside the water mask first
    # so the content field plots on top of it.
    if base_map_tif is not None:
        try:
            print(f"正在加载底图: {base_map_tif}")
            self._add_base_map(ax, base_map_tif, bounds, mask, grid_xx, grid_yy, boundary_gdf)
            print("底图加载成功")
        except Exception as e:
            print(f"加载底图失败: {e},将跳过底图显示")
    # Colour-mapping setup; `im` holds whichever artist succeeds, for the colorbar.
    im = None
    try:
        if data_range > 0:
            vmin = valid_values.min()
            vmax = valid_values.max()
            # Expand a near-zero range slightly so the colormap still
            # shows contrast.
            if data_range < 1e-6:
                center = valid_values.mean()
                expansion = max(abs(center) * 0.01, 1e-6)  # 1% of the mean, floored at 1e-6
                vmin = center - expansion
                vmax = center + expansion
            print(f"颜色映射范围: {vmin:.6f} - {vmax:.6f}")
            # Renderer 1: filled contours (smoothest result).
            try:
                print("尝试使用contourf绘制...")
                # Mask out everything outside the boundary.
                masked_data = np.ma.masked_where(~mask, grid_content)
                # 100 levels give a near-continuous colour transition.
                levels = np.linspace(vmin, vmax, 100)
                im = ax.contourf(grid_xx, grid_yy, masked_data,
                                 levels=levels, cmap=cmap, alpha=0.9,
                                 vmin=vmin, vmax=vmax, extend='both')
                print("contourf绘制成功")
                # NOTE: contour lines are intentionally not drawn, to
                # keep the smooth look; re-add ax.contour/clabel here
                # if labelled isolines are needed.
            except Exception as e:
                print(f"contourf失败: {e}")
                # Renderer 2: pcolormesh with Gouraud shading.
                try:
                    print("尝试使用pcolormesh绘制...")
                    masked_data = np.ma.masked_where(~mask, grid_content)
                    im = ax.pcolormesh(grid_xx, grid_yy, masked_data,
                                       cmap=cmap, alpha=0.9,
                                       vmin=vmin, vmax=vmax, shading='gouraud')
                    print("pcolormesh绘制成功")
                except Exception as e2:
                    print(f"pcolormesh也失败: {e2}")
                    raise e2
        else:
            # All values identical: fill with a single colour band.
            print("使用单一颜色填充(所有值相同)")
            single_value = valid_values[0]
            im = ax.contourf(grid_xx, grid_yy, grid_content,
                             levels=[single_value - 0.001, single_value + 0.001],
                             cmap=cmap, alpha=0.8)
    except Exception as e:
        print(f"主要绘图方法失败,尝试备选方案: {e}")
        # Fallback 1: imshow (no boundary mask; NaNs filled with the mean).
        try:
            print("尝试使用imshow...")
            display_data = grid_content.copy()
            nan_mask = np.isnan(display_data)
            if np.any(nan_mask):
                display_data[nan_mask] = valid_values.mean()
            if data_range > 0:
                vmin = valid_values.min()
                vmax = valid_values.max()
                im = ax.imshow(display_data,
                               extent=[grid_xx.min(), grid_xx.max(),
                                       grid_yy.min(), grid_yy.max()],
                               cmap=cmap, alpha=0.8, origin='lower',
                               vmin=vmin, vmax=vmax, aspect='auto')
            else:
                im = ax.imshow(display_data,
                               extent=[grid_xx.min(), grid_xx.max(),
                                       grid_yy.min(), grid_yy.max()],
                               cmap=cmap, alpha=0.8, origin='lower',
                               aspect='auto')
            print("imshow绘制成功")
        except Exception as e2:
            print(f"imshow也失败: {e2}")
            # Fallback 2: coloured scatter of the valid cells.
            try:
                print("尝试使用散点图...")
                valid_x = grid_xx[valid_data]
                valid_y = grid_yy[valid_data]
                valid_z = grid_content[valid_data]
                if data_range > 0:
                    im = ax.scatter(valid_x, valid_y, c=valid_z,
                                    cmap=cmap, alpha=0.8, s=10,
                                    vmin=valid_values.min(), vmax=valid_values.max())
                else:
                    im = ax.scatter(valid_x, valid_y, c=valid_z,
                                    cmap=cmap, alpha=0.8, s=10)
                print("散点图绘制成功")
            except Exception as e3:
                print(f"所有绘图方法都失败: {e3}")
                raise ValueError("无法生成颜色图,请检查数据")
    # Draw the water boundary outline in black.
    try:
        boundary_gdf.boundary.plot(ax=ax, color='black', linewidth=2, alpha=1.0)
        print("边界绘制成功(黑色)")
    except Exception as e:
        print(f"边界绘制失败: {e}")
    # NOTE: the semi-transparent uncertainty overlay that used to live
    # here is intentionally disabled; grid_uncertainty / show_uncertainty
    # are accepted but not rendered.
    # Sampling points are off by default, to keep the smooth colour field.
    if show_sample_points:
        try:
            points_gdf.plot(ax=ax, color='black', markersize=6, alpha=0.7,
                            marker='+', edgecolors='white', linewidth=1)
            print("采样点绘制成功")
        except Exception as e:
            print(f"采样点绘制失败: {e}")
    # Axis labels: output CRS is geographic, so label as lon/lat degrees.
    ax.set_xlabel('经度 (°)', fontsize=12)
    ax.set_ylabel('纬度 (°)', fontsize=12)
    # Tick formatters: 3 decimal places with a degree sign.
    def lon_formatter(x, p):
        return f'{x:.3f}°'
    def lat_formatter(x, p):
        return f'{x:.3f}°'
    ax.xaxis.set_major_formatter(FuncFormatter(lon_formatter))
    ax.yaxis.set_major_formatter(FuncFormatter(lat_formatter))
    # Grid lines behind the data layers.
    ax.grid(True, linestyle='--', linewidth=0.5, alpha=0.5, color='gray')
    ax.set_axisbelow(True)
    # (Figure title intentionally omitted.)
    # Colorbar for whichever renderer succeeded.
    try:
        if im is not None:
            cbar = plt.colorbar(im, ax=ax, shrink=0.5, aspect=40, pad=0.02)
            cbar.set_label('含量值', fontsize=10)
            # Six evenly spaced ticks over the data range.
            if data_range > 0:
                tick_values = np.linspace(valid_values.min(), valid_values.max(), 6)
                cbar.set_ticks(tick_values)
                cbar.set_ticklabels([f'{val:.3f}' for val in tick_values])
            cbar.ax.tick_params(labelsize=8)  # smaller tick-label font
            print("颜色条添加成功")
        else:
            print("警告无法添加颜色条im对象为None")
    except Exception as e:
        print(f"颜色条添加失败: {e}")
    # Map furniture: north arrow, scale bar, legend.
    try:
        self.add_north_arrow(ax, bounds)
    except Exception as e:
        print(f"指北针添加失败: {e}")
    try:
        self.add_scale_bar(ax)
    except Exception as e:
        print(f"比例尺添加失败: {e}")
    try:
        self.add_legend(ax, show_uncertainty=False)  # uncertainty legend disabled
    except Exception as e:
        print(f"图例添加失败: {e}")
    # Expand the view limits by a further 1% so the boundary does not
    # touch the figure frame.
    try:
        x_range = bounds[2] - bounds[0]
        y_range = bounds[3] - bounds[1]
        display_expand = 0.01
        ax.set_xlim(bounds[0] - x_range * display_expand, bounds[2] + x_range * display_expand)
        ax.set_ylim(bounds[1] - y_range * display_expand, bounds[3] + y_range * display_expand)
    except Exception as e:
        print(f"设置图形边界失败: {e}")
    # Finalise layout, save and show.
    plt.tight_layout()
    try:
        plt.savefig(output_file, dpi=300, bbox_inches='tight',
                    facecolor='white', edgecolor='none')
        print(f"含量图已保存为:{output_file}")
    except Exception as e:
        print(f"图片保存失败: {e}")
    try:
        plt.show()
    except Exception as e:
        print(f"图片显示失败: {e}")
def add_north_arrow(self, ax, bounds):
    """Draw a compass-rose style north arrow in the map's upper-left corner.

    Parameters
    ----------
    ax : matplotlib.axes.Axes
        Axes to draw on.
    bounds : sequence
        (minx, miny, maxx, maxy) in data coordinates, used to position
        and size the rose relative to the map extent.
    """
    minx, miny, maxx, maxy = bounds
    # Rose centre: 10% in from the left and top edges.
    cx = minx + (maxx - minx) * 0.1
    cy = maxy - (maxy - miny) * 0.1
    # Radius scales with the map height so the rose stays proportionate.
    r = (maxy - miny) * 0.04 * 1.0
    # Outer disc: white fill with a heavy black rim.
    ax.add_patch(patches.Circle(
        (cx, cy),
        radius=r,
        facecolor='white',
        edgecolor='black',
        linewidth=2.5,
        zorder=10
    ))
    # Inner dashed ring (decoration only).
    ax.add_patch(patches.Circle(
        (cx, cy),
        radius=r * 0.7,
        facecolor='none',
        edgecolor='gray',
        linewidth=1.5,
        linestyle='--',
        zorder=11
    ))
    tick_width = 1.5
    # Cardinal spokes, drawn N, S, E, W; north is twice as heavy.
    for dx, dy, lw in ((0, 1, tick_width * 2), (0, -1, tick_width),
                       (1, 0, tick_width), (-1, 0, tick_width)):
        ax.plot([cx, cx + dx * r * 0.85], [cy, cy + dy * r * 0.85],
                'k-', linewidth=lw, zorder=12)
    # Thinner, slightly transparent intercardinal spokes.
    for angle in (45, 135, 225, 315):
        theta = math.radians(angle)
        ax.plot([cx, cx + r * 0.7 * math.cos(theta)],
                [cy, cy + r * 0.7 * math.sin(theta)],
                'k-', linewidth=tick_width * 0.5, alpha=0.6, zorder=12)
    # Solid black triangle pointing north.
    north_points = np.array([
        [cx, cy + r * 0.9],                     # apex (north)
        [cx - r * 0.6 * 0.3, cy + r * 0.3],     # lower left
        [cx + r * 0.6 * 0.3, cy + r * 0.3]      # lower right
    ])
    ax.add_patch(patches.Polygon(
        north_points,
        facecolor='black',
        edgecolor='black',
        linewidth=2,
        zorder=13
    ))
    # Smaller hollow triangle pointing south.
    south_points = np.array([
        [cx, cy - r * 0.6],                     # apex (south)
        [cx - r * 0.4 * 0.2, cy - r * 0.2],     # upper left
        [cx + r * 0.4 * 0.2, cy - r * 0.2]      # upper right
    ])
    ax.add_patch(patches.Polygon(
        south_points,
        facecolor='white',
        edgecolor='black',
        linewidth=1.5,
        zorder=13
    ))
    # Direction letters placed just outside the rim; N is largest.
    label_offset = r * 1.15
    font_size = 16 * 0.5
    for letter, lx, ly, fs, ha, va in (
            ('N', cx, cy + label_offset, font_size, 'center', 'bottom'),
            ('S', cx, cy - label_offset, font_size * 0.8, 'center', 'top'),
            ('E', cx + label_offset, cy, font_size * 0.8, 'left', 'center'),
            ('W', cx - label_offset, cy, font_size * 0.8, 'right', 'center')):
        ax.text(lx, ly, letter,
                fontsize=fs, fontweight='bold', ha=ha, va=va,
                color='black', zorder=14)
def add_scale_bar(self, ax):
    """Attach a scale bar to *ax*, falling back to a hand-drawn one.

    For geographic output (EPSG:4326) the ScaleBar is told that one
    axis unit (one degree) is about 111000 m, the approximate metres
    per degree near the equator; for projected output one axis unit is
    taken to be one metre. If matplotlib-scalebar fails for any reason,
    a simple manual scale bar is drawn instead.
    """
    try:
        is_geographic = self.output_crs == 'EPSG:4326'
        if is_geographic:
            bar = ScaleBar(
                111000,  # metres per degree (equatorial approximation)
                units='m',
                location='lower left',
                box_alpha=0.8,
                color='black',
                font_properties={'size': 10},
                label_loc='bottom'
            )
        else:
            bar = ScaleBar(1, units='m', location='lower left',
                           box_alpha=0.8, color='black',
                           font_properties={'size': 10})
        ax.add_artist(bar)
        print("地理坐标系比例尺添加成功" if is_geographic else "投影坐标系比例尺添加成功")
    except Exception as e:
        print(f"比例尺添加失败: {e}")
        # matplotlib-scalebar failed: draw a simple bar by hand.
        try:
            self._add_manual_scale_bar(ax)
            print("手动比例尺添加成功")
        except Exception as e2:
            print(f"手动比例尺也失败: {e2}")
def _add_manual_scale_bar(self, ax):
"""手动添加简单的比例尺"""
# 获取当前坐标轴的范围
xlim = ax.get_xlim()
ylim = ax.get_ylim()
# 计算比例尺的位置和长度
x_range = xlim[1] - xlim[0]
y_range = ylim[1] - ylim[0]
# 比例尺位置(左下角)
scale_x = xlim[0] + x_range * 0.05
scale_y = ylim[0] + y_range * 0.1
if self.output_crs == 'EPSG:4326':
# 地理坐标系:计算合适的比例尺长度(度)
# 选择一个合理的距离比如1公里、5公里或10公里
distance_km = 5 # 5公里
scale_length_deg = distance_km / 111.0 # 转换为度数
# 绘制比例尺线
ax.plot([scale_x, scale_x + scale_length_deg], [scale_y, scale_y],
'k-', linewidth=3)
ax.plot([scale_x, scale_x], [scale_y - y_range * 0.01, scale_y + y_range * 0.01],
'k-', linewidth=2)
ax.plot([scale_x + scale_length_deg, scale_x + scale_length_deg],
[scale_y - y_range * 0.01, scale_y + y_range * 0.01], 'k-', linewidth=2)
# 添加文字标注
ax.text(scale_x + scale_length_deg / 2, scale_y + y_range * 0.02,
f'{distance_km} km', ha='center', va='bottom', fontsize=10,
bbox=dict(boxstyle='round,pad=0.3', facecolor='white', alpha=0.8))
else:
# 投影坐标系:使用米为单位
# 选择合适的比例尺长度
if x_range > 10000: # 大于10km
scale_length = 5000 # 5km
scale_text = '5 km'
elif x_range > 2000: # 大于2km
scale_length = 1000 # 1km
scale_text = '1 km'
else: # 小于2km
scale_length = 500 # 500m
scale_text = '500 m'
# 绘制比例尺线
ax.plot([scale_x, scale_x + scale_length], [scale_y, scale_y],
'k-', linewidth=3)
ax.plot([scale_x, scale_x], [scale_y - y_range * 0.01, scale_y + y_range * 0.01],
'k-', linewidth=2)
ax.plot([scale_x + scale_length, scale_x + scale_length],
[scale_y - y_range * 0.01, scale_y + y_range * 0.01], 'k-', linewidth=2)
# 添加文字标注
ax.text(scale_x + scale_length / 2, scale_y + y_range * 0.02,
scale_text, ha='center', va='bottom', fontsize=10,
bbox=dict(boxstyle='round,pad=0.3', facecolor='white', alpha=0.8))
def _add_base_map(self, ax, base_map_tif, bounds, mask, grid_xx, grid_yy, boundary_gdf):
    """Draw an orthophoto base map, shown only OUTSIDE the water boundary.

    The raster is reprojected to the output CRS when needed, optionally
    downsampled for speed, masked against ``boundary_gdf`` (so the water
    area stays free for the content field), normalised to 0-1 and drawn
    with ``imshow`` at zorder 0.

    Parameters
    ----------
    ax : matplotlib.axes.Axes
        Target axes.
    base_map_tif : str
        Path to the GeoTIFF base map.
    bounds : np.ndarray
        Display extent [minx, miny, maxx, maxy] in the output CRS.
    mask : np.ndarray
        Water-area mask (True inside the water body). NOTE(review):
        not used directly here — the display mask is rebuilt from
        ``boundary_gdf`` at the base map's own resolution.
    grid_xx, grid_yy : np.ndarray
        Interpolation grid coordinates (kept for interface
        compatibility; unused in this implementation).
    boundary_gdf : gpd.GeoDataFrame
        Water boundary used to mask the base map.
    """
    import rasterio
    import rasterio.windows
    from rasterio.warp import calculate_default_transform, reproject, Resampling
    print("正在读取底图文件...")
    with rasterio.open(base_map_tif) as src:
        # Source raster CRS/extent, used to decide whether to reproject.
        tif_crs = src.crs
        tif_bounds = src.bounds
        print(f"底图坐标系: {tif_crs}")
        print(f"底图范围: {tif_bounds}")
        print(f"目标范围: {bounds}")
        # Reproject only when the raster CRS differs from the output CRS.
        target_crs = CRS.from_string(self.output_crs)
        need_reproject = tif_crs != target_crs
        if need_reproject:
            print(f"底图坐标系({tif_crs})与目标坐标系({target_crs})不同,正在转换...")
            # Compute the destination transform/size covering `bounds`.
            transform, width, height = calculate_default_transform(
                tif_crs, target_crs,
                src.width, src.height,
                left=bounds[0], bottom=bounds[1],
                right=bounds[2], top=bounds[3]
            )
            if src.count == 1:
                # Single band.
                base_map_data = np.zeros((height, width), dtype=src.dtypes[0])
                reproject(
                    source=rasterio.band(src, 1),
                    destination=base_map_data,
                    src_transform=src.transform,
                    src_crs=tif_crs,
                    dst_transform=transform,
                    dst_crs=target_crs,
                    resampling=Resampling.bilinear
                )
            else:
                # Multi-band (RGB): take at most the first 3 bands.
                num_bands = min(3, src.count)
                base_map_data = np.zeros((num_bands, height, width), dtype=src.dtypes[0])
                for i in range(num_bands):
                    reproject(
                        source=rasterio.band(src, i + 1),
                        destination=base_map_data[i],
                        src_transform=src.transform,
                        src_crs=tif_crs,
                        dst_transform=transform,
                        dst_crs=target_crs,
                        resampling=Resampling.bilinear
                    )
                # Convert RGB to (height, width, 3) for imshow.
                if num_bands == 3:
                    base_map_data = np.transpose(base_map_data, (1, 2, 0))
            # Display extent equals the requested bounds after reprojection.
            extent = [bounds[0], bounds[2], bounds[1], bounds[3]]
        else:
            # Same CRS: read only the window that covers `bounds`.
            print("底图坐标系与目标坐标系一致,直接读取...")
            row_min, col_min = src.index(bounds[0], bounds[3])  # top-left corner
            row_max, col_max = src.index(bounds[2], bounds[1])  # bottom-right corner
            # Clamp indices to the raster's valid range.
            row_min = max(0, row_min)
            row_max = min(src.height, row_max + 1)
            col_min = max(0, col_min)
            col_max = min(src.width, col_max + 1)
            window = rasterio.windows.Window.from_slices(
                (row_min, row_max), (col_min, col_max)
            )
            if src.count == 1:
                base_map_data = src.read(1, window=window)
            else:
                # Multi-band: at most the first 3 bands.
                num_bands = min(3, src.count)
                base_map_data = src.read(list(range(1, num_bands + 1)), window=window)
                if num_bands == 3:
                    # Convert to (height, width, 3).
                    base_map_data = np.transpose(base_map_data, (1, 2, 0))
            # Derive the display extent from the window's affine transform.
            window_transform = rasterio.windows.transform(window, src.transform)
            left = window_transform[2]
            top = window_transform[5]
            right = left + window_transform[0] * base_map_data.shape[1]
            bottom = top + window_transform[4] * base_map_data.shape[0]
            # Clip the extent so it never exceeds the requested bounds.
            extent = [
                max(bounds[0], left),
                min(bounds[2], right),
                max(bounds[1], bottom),
                min(bounds[3], top)
            ]
    # Build the display mask at the base map's resolution.
    print("正在创建底图显示掩膜...")
    base_map_height, base_map_width = base_map_data.shape[:2]
    # Performance: downsample very large rasters; 2000 px max edge keeps
    # the image sharp while speeding up masking and drawing.
    max_display_size = 2000
    scale_factor = 1.0
    if max(base_map_height, base_map_width) > max_display_size:
        scale_factor = max_display_size / max(base_map_height, base_map_width)
        new_height = int(base_map_height * scale_factor)
        new_width = int(base_map_width * scale_factor)
        print(
            f"底图分辨率较高 ({base_map_width}x{base_map_height}),降采样到 {new_width}x{new_height} 以提高速度")
        # Bilinear (order=1) downsampling via scipy.
        if base_map_data.ndim == 2:
            base_map_data = ndimage.zoom(base_map_data, scale_factor, order=1)
        else:
            base_map_data = ndimage.zoom(base_map_data, (scale_factor, scale_factor, 1), order=1)
        base_map_height, base_map_width = base_map_data.shape[:2]
        # Keep the same geographic extent after resampling.
        extent_width = extent[1] - extent[0]
        extent_height = extent[3] - extent[2]
        extent = [
            extent[0],
            extent[0] + extent_width,
            extent[2],
            extent[2] + extent_height
        ]
    # Use rasterio's geometry_mask to rasterise the boundary — far faster
    # than testing millions of shapely Points individually.
    if need_reproject:
        # Reprojected path: use the computed destination transform.
        base_map_transform = transform
    else:
        # Windowed read: use the window's transform.
        base_map_transform = window_transform
    if scale_factor < 1.0:
        # After downsampling the pixel size changed; rebuild the affine
        # transform with scaled pixel sizes (terms a and e of the
        # 6-parameter affine (a, b, c, d, e, f)).
        try:
            from affine import Affine
        except ImportError:
            # The affine package may be missing; try rasterio's re-export.
            try:
                from rasterio.transform import Affine
            except ImportError:
                # Last resort: fall back to a plain 6-tuple below.
                Affine = None
        if Affine is not None:
            # Extract the 6 affine parameters from either form.
            if hasattr(base_map_transform, '__iter__') and len(base_map_transform) == 6:
                a, b, c, d, e, f = base_map_transform
            else:
                a, b, c, d, e, f = base_map_transform[0], base_map_transform[1], base_map_transform[2], \
                    base_map_transform[3], base_map_transform[4], base_map_transform[5]
            # Scale the pixel-size terms to the new resolution.
            base_map_transform = Affine(a / scale_factor, b, c, d, e / scale_factor, f)
        else:
            # Degraded path: same adjustment, stored as a tuple.
            if hasattr(base_map_transform, '__iter__') and len(base_map_transform) == 6:
                a, b, c, d, e, f = base_map_transform
            else:
                a, b, c, d, e, f = base_map_transform[0], base_map_transform[1], base_map_transform[2], \
                    base_map_transform[3], base_map_transform[4], base_map_transform[5]
            base_map_transform = (a / scale_factor, b, c, d, e / scale_factor, f)
    # Debug: report extents and the transform being used for masking.
    print(f"底图显示范围 (extent): {extent}")
    print(f"底图分辨率: {base_map_width}x{base_map_height}")
    print(f"底图transform: {base_map_transform}")
    if boundary_gdf is not None and len(boundary_gdf) > 0:
        boundary_bounds = boundary_gdf.total_bounds
        print(f"边界数据范围: {boundary_bounds}")
        print(f"边界数据坐标系: {boundary_gdf.crs}")
        print(f"边界要素数量: {len(boundary_gdf)}")
        # Check whether the boundary overlaps the base map's extent.
        overlap_x = not (boundary_bounds[2] < extent[0] or boundary_bounds[0] > extent[1])
        overlap_y = not (boundary_bounds[3] < extent[2] or boundary_bounds[1] > extent[3])
        if not (overlap_x and overlap_y):
            print("警告: 边界数据范围与底图显示范围不重叠!")
            print(" 将不应用掩膜,显示整个底图")
            # No overlap: show the whole base map (all-True mask).
            base_map_mask = np.ones((base_map_height, base_map_width), dtype=bool)
        else:
            # geometry_mask with invert=False marks pixels INSIDE the
            # geometry (the water). We want the pixels OUTSIDE it, so
            # the result is inverted below.
            try:
                within_boundary_mask = geometry_mask(
                    boundary_gdf.geometry,
                    out_shape=(base_map_height, base_map_width),
                    transform=base_map_transform,
                    invert=False
                )
                # True = outside the boundary = area where the base map shows.
                base_map_mask = ~within_boundary_mask
            except Exception as e:
                print(f"生成掩膜时出错: {e}")
                print(" 将不应用掩膜,显示整个底图")
                import traceback
                traceback.print_exc()
                # Masking failed: show the whole base map.
                base_map_mask = np.ones((base_map_height, base_map_width), dtype=bool)
    else:
        print("警告: 边界数据为空,将不应用掩膜,显示整个底图")
        # No boundary provided: show the whole base map.
        base_map_mask = np.ones((base_map_height, base_map_width), dtype=bool)
    # Debug: how much of the raster remains visible after masking.
    mask_ratio = np.sum(base_map_mask) / base_map_mask.size
    print(
        f"底图掩膜状态: 可显示区域占比 {mask_ratio * 100:.2f}% ({np.sum(base_map_mask)}/{base_map_mask.size} 像素)")
    # If masking removed everything, show the whole base map instead.
    if mask_ratio == 0.0:
        print("警告: 掩膜后没有可显示区域,将显示整个底图(不应用掩膜)")
        base_map_mask = np.ones((base_map_height, base_map_width), dtype=bool)
    # Normalise for display. NOTE: the full array is normalised first and
    # the mask applied afterwards, so the 0-1 range is computed correctly.
    if base_map_data.dtype != np.uint8:
        if base_map_data.ndim == 2:
            # Single band: scale the whole dataset (not just the masked
            # part) to 0-1.
            data_min = np.nanmin(base_map_data)
            data_max = np.nanmax(base_map_data)
            print(f"底图数据范围: [{data_min}, {data_max}], dtype: {base_map_data.dtype}")
            if data_max > data_min:
                base_map_normalized = (base_map_data - data_min) / (data_max - data_min)
                # Mask so only the area outside the boundary is shown.
                base_map_display = np.ma.masked_where(~base_map_mask, base_map_normalized)
            else:
                # Degenerate range (constant raster): display zeros.
                print("警告: 底图数据范围无效,所有值相同")
                base_map_display = np.ma.masked_where(~base_map_mask, np.zeros_like(base_map_data))
        else:
            # RGB: normalise each band independently.
            base_map_normalized = base_map_data.copy().astype(np.float32)
            for i in range(base_map_data.shape[2]):
                band_data = base_map_data[:, :, i]
                data_min = np.nanmin(band_data)
                data_max = np.nanmax(band_data)
                print(f"底图波段 {i} 数据范围: [{data_min}, {data_max}]")
                if data_max > data_min:
                    base_map_normalized[:, :, i] = (band_data - data_min) / (data_max - data_min)
                else:
                    print(f"警告: 底图波段 {i} 数据范围无效,所有值相同")
                    base_map_normalized[:, :, i] = np.zeros_like(band_data)
            # Broadcast the 2-D mask across all bands, then mask.
            mask_3d = np.broadcast_to(~base_map_mask[..., np.newaxis], base_map_data.shape)
            base_map_display = np.ma.masked_where(mask_3d, base_map_normalized)
    else:
        # uint8 input: divide by 255 to reach imshow's expected 0-1 range.
        if base_map_data.ndim == 2:
            base_map_normalized = base_map_data.astype(np.float32) / 255.0
            base_map_display = np.ma.masked_where(~base_map_mask, base_map_normalized)
        else:
            base_map_normalized = base_map_data.astype(np.float32) / 255.0
            mask_3d = np.broadcast_to(~base_map_mask[..., np.newaxis], base_map_data.shape)
            base_map_display = np.ma.masked_where(mask_3d, base_map_normalized)
    # Debug: range of the normalised, unmasked pixels.
    if isinstance(base_map_display, np.ma.MaskedArray):
        valid_data = base_map_display[~base_map_display.mask]
        if len(valid_data) > 0:
            print(
                f"归一化后有效数据范围: [{np.nanmin(valid_data):.3f}, {np.nanmax(valid_data):.3f}], 有效像素数: {len(valid_data)}")
        else:
            print("警告: 归一化后没有有效数据显示区域")
    # Draw the base map. extent is [left, right, bottom, top]; for a
    # geographic CRS the y axis increases upwards, hence origin='lower'.
    print("正在绘制底图...")
    try:
        if base_map_data.ndim == 2:
            # Single band: grayscale; clamp to 0-1 first.
            if isinstance(base_map_display, np.ma.MaskedArray):
                if np.ma.max(base_map_display) > 1.0 or np.ma.min(base_map_display) < 0.0:
                    base_map_display = np.ma.clip(base_map_display, 0.0, 1.0)
            else:
                base_map_display = np.clip(base_map_display, 0.0, 1.0)
            im = ax.imshow(base_map_display, extent=extent, origin='lower',
                           cmap='gray', alpha=0.8, zorder=0, interpolation='bilinear',
                           vmin=0.0, vmax=1.0)
        else:
            # RGB: clamp to 0-1 and ensure a float dtype for imshow.
            if isinstance(base_map_display, np.ma.MaskedArray):
                if np.ma.max(base_map_display) > 1.0 or np.ma.min(base_map_display) < 0.0:
                    base_map_display = np.ma.clip(base_map_display, 0.0, 1.0)
            else:
                base_map_display = np.clip(base_map_display, 0.0, 1.0)
            if base_map_display.dtype != np.float32 and base_map_display.dtype != np.float64:
                base_map_display = base_map_display.astype(np.float32)
            im = ax.imshow(base_map_display, extent=extent, origin='lower',
                           alpha=0.8, zorder=0, interpolation='bilinear')
        print(f"底图绘制成功")
    except Exception as e:
        print(f"底图绘制出错: {e}")
        import traceback
        traceback.print_exc()
        # Last-ditch fallback: draw the raw raster without a mask.
        print("尝试使用备用方法绘制底图...")
        try:
            if base_map_data.ndim == 2:
                simple_display = np.clip(base_map_data.astype(np.float32) / np.nanmax(base_map_data), 0, 1)
                ax.imshow(simple_display, extent=extent, origin='lower',
                          cmap='gray', alpha=0.5, zorder=0)
            else:
                simple_display = np.clip(base_map_data.astype(np.float32) / 255.0, 0, 1)
                ax.imshow(simple_display, extent=extent, origin='lower',
                          alpha=0.5, zorder=0)
            print("备用方法绘制成功")
        except Exception as e2:
            print(f"备用方法也失败: {e2}")
    print(f"底图已绘制,显示范围: {extent}")
def add_legend(self, ax, show_uncertainty=False):
    """Attach a legend to *ax* when there are legend entries to show.

    The entry list is intentionally left empty — the boundary,
    sample-point and uncertainty items were removed to keep the smooth
    colour field uncluttered — so no legend is drawn unless entries are
    added back here.

    Parameters
    ----------
    ax : matplotlib.axes.Axes
        Axes that would receive the legend.
    show_uncertainty : bool, default=False
        Reserved flag for a (currently disabled) uncertainty entry.
    """
    handles = []
    # With no handles there is nothing to display, so skip the legend.
    if handles:
        ax.legend(handles=handles, loc='upper left',
                  framealpha=0.9, fontsize=10)
def process_data(self, csv_file, shp_file, output_file='content_map.png',
                 resolution=100, show_sample_points=False, base_map_tif=None,
                 uncertainty_col=None, uncertainty_method='auto',
                 calculate_uncertainty=True, show_uncertainty=True,
                 uncertainty_alpha=0.5, uncertainty_threshold=0.5,
                 use_distance_diffusion=True, max_diffusion_distance=None,
                 diffusion_power=2, diffusion_n_neighbors=15, cmap=None,
                 expand_ratio=0.05):
    """End-to-end pipeline for one CSV file.

    Reads the sample points and boundary, expands edge points, builds
    the interpolation grid (optionally with an uncertainty grid) and
    renders the final map to ``output_file``.

    Parameters
    ----------
    csv_file : str
        Input CSV of sample points.
    shp_file : str
        Water-boundary shapefile.
    output_file : str, default='content_map.png'
        Path of the rendered PNG.
    resolution : int, default=100
        Grid resolution in metres.
    show_sample_points : bool, default=False
        Whether to overlay the raw sampling points.
    base_map_tif : str, optional
        Optional orthophoto base map shown outside the water mask.
    uncertainty_col : str, optional
        Uncertainty column name; auto-detected when None.
    uncertainty_method : str, default='auto'
        One of 'auto', 'kriging', 'idw', 'rbf', 'model_variance'.
    calculate_uncertainty : bool, default=True
        Whether to compute the uncertainty grid.
    show_uncertainty : bool, default=True
        Whether to overlay the uncertainty layer.
    uncertainty_alpha : float, default=0.5
        Overlay transparency (0-1).
    uncertainty_threshold : float, default=0.5
        Only uncertainty above this threshold is drawn (0-1).
    use_distance_diffusion : bool, default=True
        Fill boundary gaps by distance diffusion.
    max_diffusion_distance : float, optional
        Maximum diffusion distance; auto-derived when None.
    diffusion_power : float, default=2
        IDW power for the diffusion step.
    diffusion_n_neighbors : int, default=15
        Nearest-neighbour count for the diffusion step.
    cmap : str, optional
        Colormap; auto-selected from the CSV name/content when None.
    expand_ratio : float, default=0.05
        Boundary expansion ratio (5%) so the map does not touch the frame.

    Raises
    ------
    Exception
        Any failure is logged and re-raised.
    """
    try:
        # Pick a colormap from the detected parameter name unless the
        # caller supplied one explicitly.
        if cmap is None:
            cmap = self._get_colormap(self._extract_param_name(csv_file))
        else:
            print(f"使用指定的颜色映射: {cmap}")
        # Load inputs.
        points_gdf = self.read_csv_data(csv_file, uncertainty_col=uncertainty_col)
        boundary_gdf = self.read_boundary_shapefile(shp_file)
        # Push edge samples outward (spaced by `resolution`) so the
        # interpolation reaches the expanded image bounds.
        points_gdf = self._expand_edge_points(points_gdf, boundary_gdf,
                                              resolution=resolution,
                                              expand_ratio=expand_ratio)
        # Interpolate onto a regular grid (optionally with uncertainty).
        grids = self.create_interpolation_grid(
            points_gdf, boundary_gdf, resolution,
            expand_ratio=expand_ratio,
            uncertainty_method=uncertainty_method,
            calculate_uncertainty=calculate_uncertainty,
            use_distance_diffusion=use_distance_diffusion,
            max_diffusion_distance=max_diffusion_distance,
            diffusion_power=diffusion_power,
            diffusion_n_neighbors=diffusion_n_neighbors
        )
        # The grid builder returns 4 items, plus the uncertainty grid
        # as a fifth item when it was computed.
        grid_uncertainty = grids[4] if len(grids) == 5 else None
        grid_xx, grid_yy, grid_content, bounds = grids[0], grids[1], grids[2], grids[3]
        # Render the final map (uncertainty overlay parameters are
        # forwarded even though the overlay is currently disabled).
        self.create_content_map(
            points_gdf, boundary_gdf, grid_xx, grid_yy,
            grid_content, bounds, output_file, show_sample_points, base_map_tif,
            grid_uncertainty=grid_uncertainty,
            show_uncertainty=show_uncertainty,
            uncertainty_alpha=uncertainty_alpha,
            uncertainty_threshold=uncertainty_threshold,
            cmap=cmap
        )
        print("处理完成!")
        # Summary statistics of the raw sample values.
        print(f"\n统计信息:")
        print(f"数据点数量: {len(points_gdf)}")
        print(f"含量值范围: {points_gdf['content'].min():.2f} - {points_gdf['content'].max():.2f}")
        print(f"含量值平均: {points_gdf['content'].mean():.2f}")
        print(f"含量值标准差: {points_gdf['content'].std():.2f}")
    except Exception as e:
        print(f"处理过程中出现错误: {str(e)}")
        raise
def process_batch(self, csv_folder, shp_file, output_folder=None,
                  resolution=100, show_sample_points=False, base_map_tif=None,
                  uncertainty_col=None, uncertainty_method='auto',
                  calculate_uncertainty=True, show_uncertainty=True,
                  uncertainty_alpha=0.5, uncertainty_threshold=0.5,
                  use_distance_diffusion=True, max_diffusion_distance=None,
                  diffusion_power=2, diffusion_n_neighbors=15):
    """Batch-process every CSV file in a folder into a content map.

    Fixes over the previous version: the CSV list is sorted so the
    processing order is deterministic (glob order depends on the
    filesystem), and the output folder is created with
    ``exist_ok=True`` to avoid the exists-then-create race.

    Parameters
    ----------
    csv_folder : str
        Folder containing the input CSV files.
    shp_file : str
        Boundary shapefile shared by all maps.
    output_folder : str, optional
        Output folder. When None, a 'map_output' subfolder is created
        inside ``csv_folder``.
    resolution : int, default=100
        Grid resolution in metres.
    show_sample_points : bool, default=False
        Whether to draw sampling points on each map.
    base_map_tif : str, optional
        Optional orthophoto GeoTIFF base map.
    uncertainty_col : str, optional
        Uncertainty column name, if known.
    uncertainty_method : str, default='auto'
        Uncertainty method ('auto', 'kriging', 'idw', 'rbf',
        'model_variance').
    calculate_uncertainty : bool, default=True
        Whether to compute the uncertainty grid.
    show_uncertainty : bool, default=True
        Whether to overlay the uncertainty layer.
    uncertainty_alpha : float, default=0.5
        Overlay transparency (0-1).
    uncertainty_threshold : float, default=0.5
        Overlay threshold (0-1).
    use_distance_diffusion : bool, default=True
        Whether to fill boundary gaps by distance diffusion.
    max_diffusion_distance : float, optional
        Maximum diffusion distance; auto-derived when None.
    diffusion_power : float, default=2
        IDW power for the diffusion step.
    diffusion_n_neighbors : int, default=15
        Nearest-neighbour count for the diffusion step.

    Returns
    -------
    dict
        Summary with keys 'total', 'success', 'failed',
        'output_folder' and 'failed_files' (list of
        (filename, error-message) tuples).

    Raises
    ------
    ValueError
        If ``csv_folder`` does not exist or contains no CSV files.
    """
    print("=" * 60)
    print("开始批量处理CSV文件")
    print("=" * 60)
    # Validate the input folder.
    if not os.path.isdir(csv_folder):
        raise ValueError(f"输入文件夹不存在: {csv_folder}")
    # Sort for a deterministic processing order (glob order is
    # filesystem-dependent).
    csv_files = sorted(glob.glob(os.path.join(csv_folder, "*.csv")))
    if len(csv_files) == 0:
        raise ValueError(f"在文件夹 {csv_folder} 中未找到CSV文件")
    print(f"找到 {len(csv_files)} 个CSV文件")
    # Resolve/create the output folder.
    if output_folder is None:
        output_folder = os.path.join(csv_folder, "map_output")
    if not os.path.exists(output_folder):
        # exist_ok guards against a concurrent creation between the
        # exists() check above and this call (TOCTOU race).
        os.makedirs(output_folder, exist_ok=True)
        print(f"创建输出文件夹: {output_folder}")
    else:
        print(f"使用输出文件夹: {output_folder}")
    # Bookkeeping for the final summary.
    success_count = 0
    fail_count = 0
    failed_files = []
    # Process each CSV independently; one failure never aborts the batch.
    for i, csv_file in enumerate(csv_files, 1):
        print("\n" + "=" * 60)
        print(f"处理文件 {i}/{len(csv_files)}: {os.path.basename(csv_file)}")
        print("=" * 60)
        try:
            # Output file keeps the CSV base name with a .png extension.
            csv_basename = os.path.splitext(os.path.basename(csv_file))[0]
            output_file = os.path.join(output_folder, f"{csv_basename}.png")
            # Delegate to the single-file pipeline; cmap=None lets it
            # auto-detect the parameter and pick a matching colormap.
            self.process_data(
                csv_file=csv_file,
                shp_file=shp_file,
                output_file=output_file,
                resolution=resolution,
                show_sample_points=show_sample_points,
                base_map_tif=base_map_tif,
                uncertainty_col=uncertainty_col,
                uncertainty_method=uncertainty_method,
                calculate_uncertainty=calculate_uncertainty,
                show_uncertainty=show_uncertainty,
                uncertainty_alpha=uncertainty_alpha,
                uncertainty_threshold=uncertainty_threshold,
                use_distance_diffusion=use_distance_diffusion,
                max_diffusion_distance=max_diffusion_distance,
                diffusion_power=diffusion_power,
                diffusion_n_neighbors=diffusion_n_neighbors,
                cmap=None
            )
            success_count += 1
            print(f"✓ 成功处理: {csv_basename}.png")
        except Exception as e:
            fail_count += 1
            failed_files.append((os.path.basename(csv_file), str(e)))
            print(f"✗ 处理失败: {os.path.basename(csv_file)}")
            print(f" 错误信息: {e}")
            import traceback
            traceback.print_exc()
    # Final batch summary.
    print("\n" + "=" * 60)
    print("批量处理完成")
    print("=" * 60)
    print(f"总文件数: {len(csv_files)}")
    print(f"成功: {success_count}")
    print(f"失败: {fail_count}")
    print(f"输出文件夹: {output_folder}")
    if failed_files:
        print("\n失败的文件列表:")
        for file_name, error in failed_files:
            print(f" - {file_name}: {error}")
    return {
        'total': len(csv_files),
        'success': success_count,
        'failed': fail_count,
        'output_folder': output_folder,
        'failed_files': failed_files
    }
def main():
    """Example driver: batch-render content maps for a folder of CSVs.

    Edit the three paths below to match the local environment. Each CSV
    has its water-quality parameter auto-detected from the file name /
    content, and a matching colormap is chosen automatically. For a
    single file, call ``mapper.process_data(...)`` with the same kinds
    of arguments instead.
    """
    mapper = ContentMapper()
    # Input/output locations.
    csv_folder = r"D:\BaiduNetdiskDownload\yaobao\prediction"  # folder of prediction CSVs
    shp_file = r"E:\code\WQ\yaobao925\roi\shp\roi.shp"  # water-body boundary shapefile
    output_folder = r"D:\BaiduNetdiskDownload\yaobao\model"  # PNG output folder (None → <csv_folder>/map_output)
    summary = mapper.process_batch(
        csv_folder=csv_folder,
        shp_file=shp_file,
        output_folder=output_folder,
        resolution=30,  # grid resolution in metres; smaller is smoother
        show_sample_points=False,
        base_map_tif=None,  # optional orthophoto base map
        # Uncertainty settings.
        uncertainty_col=None,
        uncertainty_method='auto',
        calculate_uncertainty=True,
        show_uncertainty=True,
        uncertainty_alpha=0.2,
        uncertainty_threshold=0.2
    )
    print(f"\n批量处理结果: {summary}")
if __name__ == "__main__":
    # Script entry point: print a banner, then run the batch example.
    # Expected inputs: a CSV whose first two columns are WGS84 lon/lat
    # and whose third column is the content value, plus a boundary
    # shapefile; see main() for the call pattern.
    print("含量分布图生成器")
    print("=" * 50)
    main()