2562 lines
113 KiB
Python
2562 lines
113 KiB
Python
import pandas as pd
|
||
import numpy as np
|
||
import geopandas as gpd
|
||
from pyproj import CRS, Transformer
|
||
import matplotlib.pyplot as plt
|
||
import matplotlib.patches as patches
|
||
from matplotlib.ticker import FuncFormatter
|
||
from matplotlib_scalebar.scalebar import ScaleBar
|
||
from scipy.interpolate import griddata
|
||
from scipy import ndimage
|
||
from scipy.spatial.distance import cdist
|
||
from scipy.spatial import ConvexHull
|
||
from shapely.geometry import Point, Polygon
|
||
import rasterio
|
||
from rasterio.features import geometry_mask
|
||
import warnings
|
||
import math
|
||
import os
|
||
import random
|
||
import glob
|
||
|
||
# 尝试导入pykrige(可选依赖)
|
||
try:
|
||
from pykrige.ok import OrdinaryKriging
|
||
PYKRIGE_AVAILABLE = True
|
||
except ImportError:
|
||
PYKRIGE_AVAILABLE = False
|
||
print("警告: pykrige未安装,Kriging不确定性计算将不可用")
|
||
|
||
warnings.filterwarnings('ignore')
|
||
|
||
# 设置中文字体
|
||
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei']
|
||
plt.rcParams['axes.unicode_minus'] = False
|
||
|
||
# 参数到颜色映射的字典
|
||
PARAMS_CMAP = {
|
||
"Chlorophyll": "YlGnBu_r",
|
||
"COD": "coolwarm",
|
||
"DO": "RdYlBu",
|
||
"pH": "Spectral",
|
||
"Temperature": "turbo",
|
||
"spCond": "cividis",
|
||
"Turbidity": "YlOrBr",
|
||
"TDS": "inferno",
|
||
"Cl-": "RdYlBu_r",
|
||
"NO3-N": "YlOrRd",
|
||
"NH3-N": "magma",
|
||
"BGA": "viridis",
|
||
"TT": "RdYlBu_r"
|
||
}
|
||
|
||
|
||
class ContentMapper:
|
||
def __init__(self, input_crs='EPSG:32651', output_crs='EPSG:4326'):
|
||
"""
|
||
初始化ContentMapper - 生成平滑的含量分布图
|
||
|
||
本类专门用于生成平滑、均匀的颜色分布图,而不是显示离散的采样点。
|
||
通过高密度网格插值和多级颜色映射,创建连续的颜色过渡效果。
|
||
|
||
Parameters:
|
||
-----------
|
||
input_crs : str
|
||
输入坐标系,默认为'EPSG:32651' (WGS_1984_UTM_Zone_51N)
|
||
output_crs : str
|
||
输出坐标系,默认为'EPSG:4326' (WGS84)
|
||
"""
|
||
# 定义坐标转换器
|
||
self.input_crs = input_crs
|
||
self.output_crs = output_crs
|
||
self.transformer = Transformer.from_crs(
|
||
CRS.from_string(input_crs),
|
||
CRS.from_string(output_crs),
|
||
always_xy=True
|
||
)
|
||
|
||
# 参数到颜色映射的字典
|
||
self.params_cmap = PARAMS_CMAP.copy()
|
||
|
||
# 所有可用的matplotlib colormap列表(用于随机选择)
|
||
self.available_cmaps = ['viridis', 'plasma', 'inferno', 'magma', 'cividis',
|
||
'coolwarm', 'RdYlBu', 'Spectral', 'YlGnBu_r', 'YlOrBr',
|
||
'YlOrRd', 'turbo', 'RdYlBu_r', 'cool', 'hot', 'jet']
|
||
|
||
print(f"坐标转换设置: {input_crs} -> {output_crs}")
|
||
|
||
def _extract_param_name(self, csv_file):
|
||
"""
|
||
从CSV文件名或内容中提取参数名称
|
||
|
||
Parameters:
|
||
-----------
|
||
csv_file : str
|
||
CSV文件路径
|
||
|
||
Returns:
|
||
--------
|
||
param_name : str or None
|
||
提取的参数名称,如果未找到则返回None
|
||
"""
|
||
print(f"[调试] 开始从文件 {csv_file} 中提取参数名称")
|
||
print(f"[调试] 字典中的参数键: {list(self.params_cmap.keys())}")
|
||
|
||
# 从文件名中提取(去除路径和扩展名)
|
||
file_name = os.path.basename(csv_file)
|
||
file_name_no_ext = os.path.splitext(file_name)[0]
|
||
print(f"[调试] 文件名(不含扩展名): {file_name_no_ext}")
|
||
|
||
# 尝试从文件名中匹配参数名称(不区分大小写)
|
||
file_name_upper = file_name_no_ext.upper()
|
||
for param in self.params_cmap.keys():
|
||
param_upper = param.upper()
|
||
if param_upper in file_name_upper:
|
||
print(f"从文件名中识别到参数: {param} (匹配到 '{param_upper}' 在 '{file_name_upper}' 中)")
|
||
return param # 返回字典中的原始键(保持大小写)
|
||
|
||
# 如果文件名中没有找到,尝试从CSV内容中提取(检查列名)
|
||
try:
|
||
df = pd.read_csv(csv_file, encoding='utf-8', nrows=0) # 只读取列名
|
||
columns = [col.upper() for col in df.columns]
|
||
print(f"[调试] CSV列名: {list(df.columns)}")
|
||
|
||
for param in self.params_cmap.keys():
|
||
param_upper = param.upper()
|
||
# 检查列名中是否包含参数名称
|
||
for col in columns:
|
||
if param_upper in col or col in param_upper:
|
||
print(f"从CSV列名中识别到参数: {param} (匹配到列名 '{col}')")
|
||
return param # 返回字典中的原始键(保持大小写)
|
||
except Exception as e:
|
||
print(f"读取CSV列名时出错: {e}")
|
||
|
||
print(f"未能在文件 {csv_file} 中识别参数名称")
|
||
print(f"[调试] 可用的参数名: {list(self.params_cmap.keys())}")
|
||
return None
|
||
|
||
def _get_colormap(self, param_name=None):
|
||
"""
|
||
根据参数名称获取对应的colormap
|
||
|
||
Parameters:
|
||
-----------
|
||
param_name : str, optional
|
||
参数名称。如果为None或不在映射中,则随机选择一个colormap
|
||
|
||
Returns:
|
||
--------
|
||
cmap : str
|
||
颜色映射名称
|
||
"""
|
||
# 打印调试信息
|
||
print(f"[调试] _get_colormap 被调用,param_name={param_name}")
|
||
print(f"[调试] 当前字典中的键: {list(self.params_cmap.keys())}")
|
||
|
||
if param_name:
|
||
# 首先尝试精确匹配(区分大小写)
|
||
if param_name in self.params_cmap:
|
||
cmap = self.params_cmap[param_name]
|
||
print(f"使用参数 '{param_name}' 对应的颜色映射: {cmap}")
|
||
return cmap
|
||
|
||
# 如果精确匹配失败,尝试不区分大小写的匹配
|
||
param_name_upper = param_name.upper()
|
||
for key in self.params_cmap.keys():
|
||
if key.upper() == param_name_upper:
|
||
cmap = self.params_cmap[key]
|
||
print(f"使用参数 '{key}' (不区分大小写匹配 '{param_name}') 对应的颜色映射: {cmap}")
|
||
return cmap
|
||
|
||
# 如果都不匹配,随机选择
|
||
cmap = random.choice(self.available_cmaps)
|
||
print(f"警告: 参数 '{param_name}' 不在映射中,随机选择颜色映射: {cmap}")
|
||
print(f"可用的参数名: {list(self.params_cmap.keys())}")
|
||
return cmap
|
||
else:
|
||
# 随机选择一个colormap
|
||
cmap = random.choice(self.available_cmaps)
|
||
print(f"未指定参数名称,随机选择颜色映射: {cmap}")
|
||
return cmap
|
||
|
||
def _check_point_distribution(self, points):
|
||
"""检查数据点的几何分布"""
|
||
print("正在检查数据点分布...")
|
||
|
||
# 检查是否有重复点
|
||
unique_points = np.unique(points, axis=0)
|
||
if len(unique_points) < len(points):
|
||
print(f"警告:发现 {len(points) - len(unique_points)} 个重复数据点")
|
||
|
||
# 检查点是否共线
|
||
if len(unique_points) >= 3:
|
||
# 计算前三个点构成的三角形面积
|
||
p1, p2, p3 = unique_points[:3]
|
||
area = 0.5 * abs((p2[0] - p1[0]) * (p3[1] - p1[1]) - (p3[0] - p1[0]) * (p2[1] - p1[1]))
|
||
|
||
if area < 1e-10: # 面积太小,可能共线
|
||
print("警告:前三个数据点可能共线")
|
||
|
||
# 尝试找到不共线的点
|
||
for i in range(3, len(unique_points)):
|
||
p4 = unique_points[i]
|
||
area = 0.5 * abs((p2[0] - p1[0]) * (p4[1] - p1[1]) - (p4[0] - p1[0]) * (p2[1] - p1[1]))
|
||
if area > 1e-10:
|
||
print(f"找到非共线点,使用点 {i}")
|
||
break
|
||
else:
|
||
print("警告:所有数据点可能都共线,这会导致插值失败")
|
||
|
||
# 检查坐标范围
|
||
x_range = points[:, 0].max() - points[:, 0].min()
|
||
y_range = points[:, 1].max() - points[:, 1].min()
|
||
|
||
if x_range < 1e-6 or y_range < 1e-6:
|
||
print(f"警告:坐标范围很小 (X范围: {x_range:.2e}, Y范围: {y_range:.2e})")
|
||
print("这可能导致插值数值不稳定")
|
||
|
||
return unique_points
|
||
|
||
def _calculate_idw_uncertainty(self, points, grid_xx, grid_yy, power=2, n_neighbors=10):
|
||
"""
|
||
计算IDW插值的不确定性(距离加权和的倒数)
|
||
|
||
Parameters:
|
||
-----------
|
||
points : np.ndarray
|
||
数据点坐标 (n_points, 2)
|
||
grid_xx : np.ndarray
|
||
网格X坐标
|
||
grid_yy : np.ndarray
|
||
网格Y坐标
|
||
power : float
|
||
IDW的幂参数,默认为2
|
||
n_neighbors : int
|
||
使用的最近邻点数,默认为10
|
||
|
||
Returns:
|
||
--------
|
||
uncertainty : np.ndarray
|
||
不确定性网格(距离加权和的倒数,归一化到0-1)
|
||
"""
|
||
print("正在计算IDW不确定性...")
|
||
grid_points = np.column_stack((grid_xx.ravel(), grid_yy.ravel()))
|
||
|
||
# 使用向量化计算提高性能
|
||
# 计算所有网格点到所有数据点的距离矩阵
|
||
distances = cdist(grid_points, points)
|
||
|
||
# 对每个网格点,找到最近的n_neighbors个点
|
||
n_neighbors = min(n_neighbors, len(points))
|
||
nearest_distances = np.partition(distances, n_neighbors-1, axis=1)[:, :n_neighbors]
|
||
|
||
# 避免除零
|
||
nearest_distances = np.maximum(nearest_distances, 1e-10)
|
||
|
||
# 计算距离加权和(向量化)
|
||
weights = 1.0 / (nearest_distances ** power)
|
||
weight_sums = np.sum(weights, axis=1)
|
||
|
||
# 不确定性 = 1 / 距离加权和(加权和越大,不确定性越小)
|
||
valid_mask = weight_sums > 0
|
||
uncertainty_flat = np.full(len(grid_points), np.nan)
|
||
uncertainty_flat[valid_mask] = 1.0 / weight_sums[valid_mask]
|
||
|
||
uncertainty = uncertainty_flat.reshape(grid_xx.shape)
|
||
|
||
# 归一化到0-1范围
|
||
valid_uncertainty = uncertainty[~np.isnan(uncertainty)]
|
||
if len(valid_uncertainty) > 0:
|
||
min_unc = valid_uncertainty.min()
|
||
max_unc = valid_uncertainty.max()
|
||
if max_unc > min_unc:
|
||
uncertainty = (uncertainty - min_unc) / (max_unc - min_unc)
|
||
|
||
print("IDW不确定性计算完成")
|
||
return uncertainty
|
||
|
||
def _calculate_rbf_uncertainty(self, points, grid_xx, grid_yy, n_neighbors=10):
|
||
"""
|
||
计算RBF插值的不确定性(基于距离的倒数)
|
||
|
||
Parameters:
|
||
-----------
|
||
points : np.ndarray
|
||
数据点坐标 (n_points, 2)
|
||
grid_xx : np.ndarray
|
||
网格X坐标
|
||
grid_yy : np.ndarray
|
||
网格Y坐标
|
||
n_neighbors : int
|
||
使用的最近邻点数,默认为10
|
||
|
||
Returns:
|
||
--------
|
||
uncertainty : np.ndarray
|
||
不确定性网格(归一化到0-1)
|
||
"""
|
||
print("正在计算RBF不确定性...")
|
||
grid_points = np.column_stack((grid_xx.ravel(), grid_yy.ravel()))
|
||
|
||
# 使用向量化计算提高性能
|
||
distances = cdist(grid_points, points)
|
||
|
||
# 对每个网格点,找到最近的n_neighbors个点
|
||
n_neighbors = min(n_neighbors, len(points))
|
||
nearest_distances = np.partition(distances, n_neighbors-1, axis=1)[:, :n_neighbors]
|
||
|
||
# 使用平均距离作为不确定性指标(距离越大,不确定性越大)
|
||
mean_distances = np.mean(nearest_distances, axis=1)
|
||
uncertainty = mean_distances.reshape(grid_xx.shape)
|
||
|
||
# 归一化到0-1范围
|
||
valid_uncertainty = uncertainty[~np.isnan(uncertainty)]
|
||
if len(valid_uncertainty) > 0:
|
||
min_unc = valid_uncertainty.min()
|
||
max_unc = valid_uncertainty.max()
|
||
if max_unc > min_unc:
|
||
uncertainty = (uncertainty - min_unc) / (max_unc - min_unc)
|
||
|
||
print("RBF不确定性计算完成")
|
||
return uncertainty
|
||
|
||
def _calculate_kriging_uncertainty(self, points, values, grid_xx, grid_yy):
|
||
"""
|
||
计算Kriging插值的方差(不确定性)
|
||
|
||
Parameters:
|
||
-----------
|
||
points : np.ndarray
|
||
数据点坐标 (n_points, 2)
|
||
values : np.ndarray
|
||
数据值
|
||
grid_xx : np.ndarray
|
||
网格X坐标
|
||
grid_yy : np.ndarray
|
||
网格Y坐标
|
||
|
||
Returns:
|
||
--------
|
||
uncertainty : np.ndarray
|
||
Kriging方差网格(归一化到0-1)
|
||
"""
|
||
if not PYKRIGE_AVAILABLE:
|
||
print("警告: pykrige未安装,无法计算Kriging不确定性,将使用IDW方法")
|
||
return self._calculate_idw_uncertainty(points, grid_xx, grid_yy)
|
||
|
||
print("正在计算Kriging不确定性...")
|
||
try:
|
||
# 创建Kriging插值器
|
||
ok = OrdinaryKriging(
|
||
points[:, 0], points[:, 1], values,
|
||
variogram_model='linear', # 使用线性变异函数模型
|
||
verbose=False,
|
||
enable_plotting=False
|
||
)
|
||
|
||
# 计算网格点的方差
|
||
grid_points = np.column_stack((grid_xx.ravel(), grid_yy.ravel()))
|
||
z, ss = ok.execute('grid', grid_points[:, 0], grid_points[:, 1])
|
||
|
||
# ss是方差,转换为不确定性网格
|
||
uncertainty = ss.reshape(grid_xx.shape)
|
||
|
||
# 归一化到0-1范围
|
||
valid_uncertainty = uncertainty[~np.isnan(uncertainty)]
|
||
if len(valid_uncertainty) > 0:
|
||
min_unc = valid_uncertainty.min()
|
||
max_unc = valid_uncertainty.max()
|
||
if max_unc > min_unc:
|
||
uncertainty = (uncertainty - min_unc) / (max_unc - min_unc)
|
||
|
||
print("Kriging不确定性计算完成")
|
||
return uncertainty
|
||
|
||
except Exception as e:
|
||
print(f"Kriging不确定性计算失败: {e},将使用IDW方法")
|
||
return self._calculate_idw_uncertainty(points, grid_xx, grid_yy)
|
||
|
||
def _fill_boundary_blanks_with_distance_diffusion(self, grid_content, grid_xx, grid_yy, mask,
|
||
boundary_gdf, max_diffusion_distance=None,
|
||
power=2, n_neighbors=15):
|
||
"""
|
||
使用距离扩散方法填充边界附近的空白区域
|
||
|
||
Parameters:
|
||
-----------
|
||
grid_content : np.ndarray
|
||
插值网格数据
|
||
grid_xx : np.ndarray
|
||
网格X坐标
|
||
grid_yy : np.ndarray
|
||
网格Y坐标
|
||
mask : np.ndarray
|
||
边界掩膜(True表示边界内)
|
||
boundary_gdf : gpd.GeoDataFrame
|
||
边界几何数据
|
||
max_diffusion_distance : float, optional
|
||
最大扩散距离(单位与坐标相同)。如果为None,自动计算为网格分辨率的5倍
|
||
power : float, default=2
|
||
IDW距离衰减幂参数
|
||
n_neighbors : int, default=15
|
||
使用的最近邻点数
|
||
|
||
Returns:
|
||
--------
|
||
grid_content : np.ndarray
|
||
填充后的网格数据
|
||
"""
|
||
print("正在使用距离扩散方法填充边界空白区域...")
|
||
|
||
# 找到边界内的空白区域
|
||
nan_mask = np.isnan(grid_content)
|
||
within_boundary_nan = nan_mask & mask
|
||
|
||
if not np.any(within_boundary_nan):
|
||
print("边界内没有空白区域需要填充")
|
||
return grid_content
|
||
|
||
blank_count = np.sum(within_boundary_nan)
|
||
print(f"发现 {blank_count} 个边界内的空白点,开始距离扩散填充...")
|
||
|
||
# 找到边界内有效值的点
|
||
valid_mask = ~nan_mask & mask
|
||
if np.sum(valid_mask) == 0:
|
||
print("警告:边界内没有有效值,无法进行距离扩散")
|
||
return grid_content
|
||
|
||
# 计算网格分辨率(用于确定最大扩散距离)
|
||
if max_diffusion_distance is None:
|
||
# 自动计算:使用网格点之间的平均距离
|
||
dx = np.abs(grid_xx[0, 1] - grid_xx[0, 0]) if grid_xx.shape[1] > 1 else 1.0
|
||
dy = np.abs(grid_yy[1, 0] - grid_yy[0, 0]) if grid_xx.shape[0] > 1 else 1.0
|
||
avg_resolution = (dx + dy) / 2.0
|
||
max_diffusion_distance = avg_resolution * 5.0 # 5倍网格分辨率
|
||
print(f"自动计算最大扩散距离: {max_diffusion_distance:.6f}")
|
||
|
||
# 准备有效数据点
|
||
valid_points = np.column_stack((grid_xx[valid_mask], grid_yy[valid_mask]))
|
||
valid_values = grid_content[valid_mask]
|
||
|
||
# 准备空白点
|
||
blank_points = np.column_stack((grid_xx[within_boundary_nan], grid_yy[within_boundary_nan]))
|
||
|
||
print(f"使用 {len(valid_points)} 个有效点填充 {len(blank_points)} 个空白点...")
|
||
|
||
# 使用向量化计算距离矩阵
|
||
distances = cdist(blank_points, valid_points)
|
||
|
||
# 对每个空白点,找到最近的n_neighbors个有效点
|
||
n_neighbors = min(n_neighbors, len(valid_points))
|
||
|
||
# 应用最大扩散距离限制
|
||
if max_diffusion_distance > 0:
|
||
# 只考虑在最大扩散距离内的点
|
||
# 对于每个空白点,找到在扩散距离内的最近邻
|
||
filled_values = np.full(len(blank_points), np.nan)
|
||
global_mean = np.nanmean(valid_values)
|
||
|
||
for i in range(len(blank_points)):
|
||
point_distances = distances[i, :]
|
||
valid_idx = point_distances <= max_diffusion_distance
|
||
|
||
if np.any(valid_idx):
|
||
# 找到最近的n_neighbors个点(在扩散距离内)
|
||
valid_dist = point_distances[valid_idx]
|
||
valid_vals = valid_values[valid_idx]
|
||
|
||
# 如果有效点数量超过n_neighbors,只取最近的n_neighbors个
|
||
if len(valid_dist) > n_neighbors:
|
||
nearest_idx = np.argpartition(valid_dist, n_neighbors-1)[:n_neighbors]
|
||
valid_dist = valid_dist[nearest_idx]
|
||
valid_vals = valid_vals[nearest_idx]
|
||
|
||
# 避免除零
|
||
valid_dist = np.maximum(valid_dist, 1e-10)
|
||
|
||
# 计算IDW权重
|
||
weights = 1.0 / (valid_dist ** power)
|
||
weight_sum = np.sum(weights)
|
||
|
||
if weight_sum > 0:
|
||
# 距离加权平均
|
||
filled_values[i] = np.sum(weights * valid_vals) / weight_sum
|
||
else:
|
||
filled_values[i] = global_mean
|
||
else:
|
||
# 如果该点不在任何有效点的扩散距离内,使用全局平均值
|
||
filled_values[i] = global_mean
|
||
else:
|
||
# 不使用距离限制,对所有点进行IDW插值(批量处理以提高效率)
|
||
# 对每个空白点,找到最近的n_neighbors个点
|
||
nearest_indices = np.argpartition(distances, n_neighbors-1, axis=1)[:, :n_neighbors]
|
||
|
||
# 批量提取距离和值
|
||
nearest_dists = np.take_along_axis(distances, nearest_indices, axis=1)
|
||
nearest_vals = valid_values[nearest_indices]
|
||
|
||
# 避免除零
|
||
nearest_dists = np.maximum(nearest_dists, 1e-10)
|
||
|
||
# 批量计算IDW权重
|
||
weights = 1.0 / (nearest_dists ** power)
|
||
weight_sums = np.sum(weights, axis=1)
|
||
|
||
# 批量计算加权平均值
|
||
filled_values = np.sum(weights * nearest_vals, axis=1) / weight_sums
|
||
|
||
# 处理可能的NaN(如果weight_sum为0)
|
||
nan_mask = np.isnan(filled_values) | (weight_sums == 0)
|
||
if np.any(nan_mask):
|
||
filled_values[nan_mask] = np.nanmean(valid_values)
|
||
|
||
# 填充空白点
|
||
grid_content[within_boundary_nan] = filled_values
|
||
|
||
# 检查填充结果
|
||
filled_count = np.sum(~np.isnan(filled_values))
|
||
print(f"距离扩散填充完成:成功填充 {filled_count} / {blank_count} 个空白点")
|
||
|
||
return grid_content
|
||
|
||
def _calculate_model_variance_uncertainty(self, points, uncertainty_values, grid_xx, grid_yy):
|
||
"""
|
||
插值模型输出方差(MC Dropout等)到网格
|
||
|
||
Parameters:
|
||
-----------
|
||
points : np.ndarray
|
||
数据点坐标 (n_points, 2)
|
||
uncertainty_values : np.ndarray
|
||
数据点的不确定性值(方差)
|
||
grid_xx : np.ndarray
|
||
网格X坐标
|
||
grid_yy : np.ndarray
|
||
网格Y坐标
|
||
|
||
Returns:
|
||
--------
|
||
uncertainty : np.ndarray
|
||
不确定性网格(归一化到0-1)
|
||
"""
|
||
print("正在插值模型方差到网格...")
|
||
|
||
# 使用线性插值将不确定性值插值到网格
|
||
grid_uncertainty = griddata(
|
||
points, uncertainty_values, (grid_xx, grid_yy),
|
||
method='linear', fill_value=np.nan
|
||
)
|
||
|
||
# 如果有NaN,用最近邻填充
|
||
if np.any(np.isnan(grid_uncertainty)):
|
||
grid_nearest = griddata(
|
||
points, uncertainty_values, (grid_xx, grid_yy),
|
||
method='nearest'
|
||
)
|
||
nan_mask = np.isnan(grid_uncertainty)
|
||
grid_uncertainty[nan_mask] = grid_nearest[nan_mask]
|
||
|
||
# 归一化到0-1范围
|
||
valid_uncertainty = grid_uncertainty[~np.isnan(grid_uncertainty)]
|
||
if len(valid_uncertainty) > 0:
|
||
min_unc = valid_uncertainty.min()
|
||
max_unc = valid_uncertainty.max()
|
||
if max_unc > min_unc:
|
||
grid_uncertainty = (grid_uncertainty - min_unc) / (max_unc - min_unc)
|
||
|
||
print("模型方差插值完成")
|
||
return grid_uncertainty
|
||
|
||
def _perform_interpolation(self, points, values, grid_xx, grid_yy):
|
||
"""执行空间插值"""
|
||
print(f"插值输入检查:")
|
||
print(f" - 数据点数量: {len(points)}")
|
||
print(f" - 数据值范围: {values.min():.4f} - {values.max():.4f}")
|
||
print(f" - 网格大小: {grid_xx.shape}")
|
||
print(f" - 坐标系: {self.output_crs}")
|
||
|
||
# 检查数据的有效性
|
||
finite_mask = np.isfinite(values)
|
||
if not np.all(finite_mask):
|
||
print(f"警告:发现 {np.sum(~finite_mask)} 个无效数据值,将被移除")
|
||
points = points[finite_mask]
|
||
values = values[finite_mask]
|
||
|
||
if len(points) < 3:
|
||
raise ValueError(f"有效数据点不足3个(当前:{len(points)}个)")
|
||
|
||
try:
|
||
# 首先尝试使用线性插值
|
||
print("正在尝试线性插值...")
|
||
grid_content = griddata(
|
||
points, values, (grid_xx, grid_yy),
|
||
method='linear', fill_value=np.nan
|
||
)
|
||
|
||
# 检查线性插值结果
|
||
valid_linear = ~np.isnan(grid_content)
|
||
valid_count = np.sum(valid_linear)
|
||
print(f"线性插值结果:有效点数 {valid_count} / {grid_content.size}")
|
||
|
||
if valid_count > 0:
|
||
print(f"线性插值成功,有效区域覆盖率: {valid_count / grid_content.size * 100:.1f}%")
|
||
|
||
# 如果有NaN值,用最近邻插值填充
|
||
nan_count = np.sum(np.isnan(grid_content))
|
||
if nan_count > 0:
|
||
print(f"正在用最近邻插值填充 {nan_count} 个缺失值...")
|
||
grid_nearest = griddata(
|
||
points, values, (grid_xx, grid_yy),
|
||
method='nearest'
|
||
)
|
||
# 只填充线性插值的NaN区域
|
||
nan_mask = np.isnan(grid_content)
|
||
grid_content[nan_mask] = grid_nearest[nan_mask]
|
||
print("缺失值填充完成")
|
||
|
||
# 最终检查
|
||
final_valid = ~np.isnan(grid_content)
|
||
print(f"最终有效点数: {np.sum(final_valid)} / {grid_content.size}")
|
||
|
||
return grid_content
|
||
else:
|
||
print("线性插值失败,尝试最近邻插值...")
|
||
|
||
except Exception as e:
|
||
print(f"线性插值失败: {e}")
|
||
print("尝试最近邻插值...")
|
||
|
||
try:
|
||
# 使用最近邻插值作为备选方案
|
||
print("执行最近邻插值...")
|
||
grid_content = griddata(
|
||
points, values, (grid_xx, grid_yy),
|
||
method='nearest'
|
||
)
|
||
|
||
valid_count = np.sum(~np.isnan(grid_content))
|
||
print(f"最近邻插值成功,有效点数: {valid_count}")
|
||
|
||
if valid_count == 0:
|
||
raise ValueError("最近邻插值也没有产生有效结果")
|
||
|
||
return grid_content
|
||
|
||
except Exception as e:
|
||
print(f"最近邻插值也失败: {e}")
|
||
|
||
# 对于地理坐标系,尝试更简单的方法
|
||
if self.output_crs == 'EPSG:4326':
|
||
print("地理坐标系检测到,尝试简化插值...")
|
||
try:
|
||
# 创建一个基于距离的简单插值
|
||
grid_content = np.full(grid_xx.shape, np.nan)
|
||
|
||
# 为每个网格点找到最近的数据点
|
||
for i in range(grid_xx.shape[0]):
|
||
for j in range(grid_xx.shape[1]):
|
||
grid_x, grid_y = grid_xx[i, j], grid_yy[i, j]
|
||
|
||
# 计算到所有数据点的距离
|
||
distances = np.sqrt((points[:, 0] - grid_x) ** 2 + (points[:, 1] - grid_y) ** 2)
|
||
nearest_idx = np.argmin(distances)
|
||
|
||
# 如果距离不是太远,就使用该值
|
||
if distances[nearest_idx] < (grid_xx.max() - grid_xx.min()) * 0.1: # 10%的范围内
|
||
grid_content[i, j] = values[nearest_idx]
|
||
|
||
valid_count = np.sum(~np.isnan(grid_content))
|
||
print(f"简化插值完成,有效点数: {valid_count}")
|
||
|
||
if valid_count > 0:
|
||
return grid_content
|
||
else:
|
||
raise ValueError("简化插值也没有产生有效结果")
|
||
|
||
except Exception as e3:
|
||
print(f"简化插值失败: {e3}")
|
||
|
||
print("尝试立方插值作为最后手段...")
|
||
try:
|
||
# 最后尝试立方插值
|
||
grid_content = griddata(
|
||
points, values, (grid_xx, grid_yy),
|
||
method='cubic', fill_value=np.nan
|
||
)
|
||
|
||
# 如果立方插值有NaN,用最近邻填充
|
||
if np.any(np.isnan(grid_content)):
|
||
print("用最近邻插值填充立方插值的NaN值...")
|
||
grid_nearest = griddata(
|
||
points, values, (grid_xx, grid_yy),
|
||
method='nearest'
|
||
)
|
||
nan_mask = np.isnan(grid_content)
|
||
grid_content[nan_mask] = grid_nearest[nan_mask]
|
||
|
||
valid_count = np.sum(~np.isnan(grid_content))
|
||
print(f"立方插值成功,有效点数: {valid_count}")
|
||
return grid_content
|
||
|
||
except Exception as e4:
|
||
print(f"立方插值也失败: {e4}")
|
||
print(f"所有插值方法都失败")
|
||
raise ValueError("无法完成空间插值,请检查数据点的分布和数值")
|
||
|
||
def read_csv_data(self, csv_file, uncertainty_col=None):
|
||
"""
|
||
读取CSV文件并进行坐标转换
|
||
|
||
Parameters:
|
||
-----------
|
||
csv_file : str
|
||
CSV文件路径
|
||
uncertainty_col : str, optional
|
||
不确定性数据列名。如果为None,将自动检测包含'variance'、'uncertainty'、'std'、'sigma'、'var'、'mc_dropout'的列
|
||
|
||
Returns:
|
||
--------
|
||
gdf : gpd.GeoDataFrame
|
||
包含坐标和含量数据的GeoDataFrame,如果找到不确定性列,会包含'uncertainty'列
|
||
"""
|
||
print("正在读取CSV文件...")
|
||
df = pd.read_csv(csv_file, encoding='utf-8')
|
||
|
||
# 假设前三列分别是经度、纬度、含量
|
||
if df.shape[1] < 3:
|
||
raise ValueError("CSV文件必须至少包含3列:经度、纬度、含量")
|
||
|
||
# 获取列名
|
||
lon_col = df.columns[0]
|
||
lat_col = df.columns[1]
|
||
content_col = df.columns[2]
|
||
|
||
print(f"检测到列名:经度({lon_col}),纬度({lat_col}),含量({content_col})")
|
||
|
||
# 自动检测不确定性列
|
||
if uncertainty_col is None:
|
||
uncertainty_keywords = ['variance', 'uncertainty', 'std', 'sigma', 'var', 'mc_dropout']
|
||
for col in df.columns:
|
||
col_lower = col.lower()
|
||
if any(keyword in col_lower for keyword in uncertainty_keywords):
|
||
uncertainty_col = col
|
||
print(f"自动检测到不确定性列: {uncertainty_col}")
|
||
break
|
||
|
||
# 坐标转换
|
||
print(f"正在进行坐标转换: {self.input_crs} -> {self.output_crs}")
|
||
transformed_x, transformed_y = self.transformer.transform(
|
||
df[lon_col].values,
|
||
df[lat_col].values
|
||
)
|
||
|
||
# 创建GeoDataFrame
|
||
geometry = [Point(x, y) for x, y in zip(transformed_x, transformed_y)]
|
||
gdf = gpd.GeoDataFrame(
|
||
df,
|
||
geometry=geometry,
|
||
crs=self.output_crs
|
||
)
|
||
|
||
gdf['proj_x'] = transformed_x
|
||
gdf['proj_y'] = transformed_y
|
||
gdf['content'] = df[content_col]
|
||
|
||
# 如果找到不确定性列,添加到GeoDataFrame
|
||
if uncertainty_col and uncertainty_col in df.columns:
|
||
gdf['uncertainty'] = df[uncertainty_col].values
|
||
print(f"已加载不确定性数据列: {uncertainty_col}")
|
||
print(f"不确定性值范围: {gdf['uncertainty'].min():.4f} - {gdf['uncertainty'].max():.4f}")
|
||
|
||
print(f"成功读取 {len(gdf)} 个数据点")
|
||
return gdf
|
||
|
||
def read_boundary_shapefile(self, shp_file):
|
||
"""读取边界shapefile"""
|
||
print("正在读取边界文件...")
|
||
boundary = gpd.read_file(shp_file)
|
||
|
||
# 确保边界文件使用目标投影坐标系
|
||
if boundary.crs != self.output_crs:
|
||
print(f"正在转换边界文件坐标系到 {self.output_crs}...")
|
||
boundary = boundary.to_crs(self.output_crs)
|
||
|
||
print(f"边界文件包含 {len(boundary)} 个要素")
|
||
return boundary
|
||
|
||
def _identify_edge_points(self, points_gdf):
|
||
"""
|
||
识别边缘采样点(使用凸包方法)
|
||
|
||
Parameters:
|
||
-----------
|
||
points_gdf : gpd.GeoDataFrame
|
||
采样点GeoDataFrame
|
||
|
||
Returns:
|
||
--------
|
||
edge_indices : np.ndarray
|
||
边缘点的索引数组
|
||
"""
|
||
print("正在识别边缘采样点...")
|
||
|
||
# 获取所有点的坐标
|
||
points = np.column_stack((points_gdf['proj_x'].values, points_gdf['proj_y'].values))
|
||
|
||
if len(points) < 3:
|
||
print("警告:采样点数量少于3个,无法识别边缘点")
|
||
return np.array([])
|
||
|
||
try:
|
||
# 使用凸包识别边缘点
|
||
hull = ConvexHull(points)
|
||
edge_indices = hull.vertices
|
||
|
||
print(f"识别到 {len(edge_indices)} 个边缘采样点(共 {len(points)} 个点)")
|
||
return edge_indices
|
||
except Exception as e:
|
||
print(f"识别边缘点时出错: {e},将使用所有点作为边缘点")
|
||
return np.arange(len(points))
|
||
|
||
def _expand_edge_points(self, points_gdf, boundary_gdf, resolution=100, expand_ratio=0.05):
|
||
"""
|
||
对边缘采样点进行外扩处理,外扩到整个图像的边界(包括外扩后的边界)
|
||
按照指定的间距(resolution)生成外扩点,铺满整个画面
|
||
|
||
Parameters:
|
||
-----------
|
||
points_gdf : gpd.GeoDataFrame
|
||
原始采样点GeoDataFrame
|
||
boundary_gdf : gpd.GeoDataFrame
|
||
水域掩膜边界GeoDataFrame
|
||
resolution : float, default=100
|
||
外扩点的间距(单位与坐标相同),与插值网格分辨率一致
|
||
expand_ratio : float, default=0.05
|
||
边界外扩比例(与create_interpolation_grid中的expand_ratio一致)
|
||
|
||
Returns:
|
||
--------
|
||
expanded_gdf : gpd.GeoDataFrame
|
||
外扩后的采样点GeoDataFrame
|
||
"""
|
||
print(f"正在对边缘采样点进行外扩处理(按照 {resolution} 的间距外扩到整个图像边界)...")
|
||
|
||
# 识别边缘点
|
||
edge_indices = self._identify_edge_points(points_gdf)
|
||
|
||
if len(edge_indices) == 0:
|
||
print("未识别到边缘点,跳过外扩处理")
|
||
return points_gdf.copy()
|
||
|
||
# 获取水域掩膜的边界范围
|
||
boundary_bounds = boundary_gdf.total_bounds # [minx, miny, maxx, maxy]
|
||
mask_minx, mask_miny, mask_maxx, mask_maxy = boundary_bounds
|
||
|
||
# 计算范围大小
|
||
width = mask_maxx - mask_minx
|
||
height = mask_maxy - mask_miny
|
||
|
||
# 外扩边界,与create_interpolation_grid中的逻辑一致,确保外扩到整个图像范围
|
||
expand_x = width * expand_ratio
|
||
expand_y = height * expand_ratio
|
||
image_minx = mask_minx - expand_x
|
||
image_maxx = mask_maxx + expand_x
|
||
image_miny = mask_miny - expand_y
|
||
image_maxy = mask_maxy + expand_y
|
||
|
||
# 获取所有点的坐标
|
||
points = np.column_stack((points_gdf['proj_x'].values, points_gdf['proj_y'].values))
|
||
|
||
# 计算点集的范围和中心
|
||
x_min, x_max = points[:, 0].min(), points[:, 0].max()
|
||
y_min, y_max = points[:, 1].min(), points[:, 1].max()
|
||
center = np.array([(x_min + x_max) / 2, (y_min + y_max) / 2])
|
||
|
||
# 存储新添加的点
|
||
new_points_list = []
|
||
new_data_list = []
|
||
|
||
# 对每个边缘点进行外扩
|
||
for edge_idx in edge_indices:
|
||
edge_point = points[edge_idx]
|
||
|
||
# 计算从中心到边缘点的方向向量
|
||
direction = edge_point - center
|
||
distance_to_center = np.linalg.norm(direction)
|
||
|
||
if distance_to_center < 1e-10:
|
||
# 如果边缘点就是中心点,跳过
|
||
continue
|
||
|
||
# 归一化方向向量
|
||
direction_unit = direction / distance_to_center
|
||
|
||
# 计算该方向与水域掩膜边界的交点
|
||
# 使用射线法:从边缘点沿方向延伸,找到与边界框的交点
|
||
max_distance = 0
|
||
|
||
# 检查与四个边界的交点(使用整个图像的范围,包括外扩后的边界)
|
||
# 上边界 (y = image_maxy)
|
||
if direction_unit[1] > 1e-10: # 向上
|
||
t = (image_maxy - edge_point[1]) / direction_unit[1]
|
||
if t > 0:
|
||
intersect_x = edge_point[0] + direction_unit[0] * t
|
||
if image_minx <= intersect_x <= image_maxx:
|
||
max_distance = max(max_distance, t)
|
||
|
||
# 下边界 (y = image_miny)
|
||
if direction_unit[1] < -1e-10: # 向下
|
||
t = (image_miny - edge_point[1]) / direction_unit[1]
|
||
if t > 0:
|
||
intersect_x = edge_point[0] + direction_unit[0] * t
|
||
if image_minx <= intersect_x <= image_maxx:
|
||
max_distance = max(max_distance, t)
|
||
|
||
# 右边界 (x = image_maxx)
|
||
if direction_unit[0] > 1e-10: # 向右
|
||
t = (image_maxx - edge_point[0]) / direction_unit[0]
|
||
if t > 0:
|
||
intersect_y = edge_point[1] + direction_unit[1] * t
|
||
if image_miny <= intersect_y <= image_maxy:
|
||
max_distance = max(max_distance, t)
|
||
|
||
# 左边界 (x = image_minx)
|
||
if direction_unit[0] < -1e-10: # 向左
|
||
t = (image_minx - edge_point[0]) / direction_unit[0]
|
||
if t > 0:
|
||
intersect_y = edge_point[1] + direction_unit[1] * t
|
||
if image_miny <= intersect_y <= image_maxy:
|
||
max_distance = max(max_distance, t)
|
||
|
||
# 如果找到了边界交点,按照resolution间距创建外扩点
|
||
if max_distance > 1e-10:
|
||
# 从边缘点到边界的距离
|
||
distance_to_boundary = max_distance
|
||
|
||
# 计算需要生成的外扩点数量(按照resolution间距)
|
||
# 使用ceil确保能铺满到边界
|
||
n_points = int(np.ceil(distance_to_boundary / resolution))
|
||
|
||
# 从边缘点开始,按照resolution间距生成点,直到边界
|
||
for i in range(1, n_points + 1):
|
||
# 计算外扩距离(从边缘点开始,按照resolution间距)
|
||
expand_distance = i * resolution
|
||
|
||
# 如果超过边界距离,使用边界距离作为最后一个点
|
||
if expand_distance >= distance_to_boundary:
|
||
expand_distance = distance_to_boundary
|
||
|
||
# 计算新点位置
|
||
new_point = edge_point + direction_unit * expand_distance
|
||
|
||
# 确保新点在图像范围内(包括外扩后的边界)
|
||
new_point[0] = np.clip(new_point[0], image_minx, image_maxx)
|
||
new_point[1] = np.clip(new_point[1], image_miny, image_maxy)
|
||
|
||
# 创建新点的数据(复制边缘点的所有属性)
|
||
new_row = points_gdf.iloc[edge_idx].copy()
|
||
new_row['proj_x'] = new_point[0]
|
||
new_row['proj_y'] = new_point[1]
|
||
|
||
# 更新geometry
|
||
new_row['geometry'] = Point(new_point[0], new_point[1])
|
||
|
||
new_points_list.append(new_point)
|
||
new_data_list.append(new_row)
|
||
|
||
# 如果已经到达边界,停止生成
|
||
if expand_distance >= distance_to_boundary:
|
||
break
|
||
|
||
# 合并原始点和外扩点
|
||
if len(new_data_list) > 0:
|
||
# 创建新点的GeoDataFrame
|
||
expanded_gdf = gpd.GeoDataFrame(new_data_list, crs=points_gdf.crs)
|
||
|
||
# 合并原始点和外扩点(使用gpd.concat以确保geometry列正确处理)
|
||
result_gdf = gpd.GeoDataFrame(pd.concat([points_gdf, expanded_gdf], ignore_index=True), crs=points_gdf.crs)
|
||
|
||
print(f"外扩完成:原始点 {len(points_gdf)} 个,边缘点 {len(edge_indices)} 个,"
|
||
f"新增外扩点 {len(new_data_list)} 个(间距 {resolution}),总计 {len(result_gdf)} 个点")
|
||
print(f"水域掩膜范围: X[{mask_minx:.2f}, {mask_maxx:.2f}], Y[{mask_miny:.2f}, {mask_maxy:.2f}]")
|
||
print(f"图像范围(含外扩): X[{image_minx:.2f}, {image_maxx:.2f}], Y[{image_miny:.2f}, {image_maxy:.2f}]")
|
||
|
||
return result_gdf
|
||
else:
|
||
print("未生成外扩点,返回原始点集")
|
||
return points_gdf.copy()
|
||
|
||
def create_interpolation_grid(self, points_gdf, boundary_gdf, resolution=100, expand_ratio=0.05,
|
||
uncertainty_method='auto', calculate_uncertainty=True,
|
||
use_distance_diffusion=True, max_diffusion_distance=None,
|
||
diffusion_power=2, diffusion_n_neighbors=15):
|
||
"""
|
||
创建插值网格
|
||
|
||
Parameters:
|
||
-----------
|
||
expand_ratio : float, default=0.05
|
||
边界外扩比例(5%),确保图像边界不完全挨着地图
|
||
uncertainty_method : str, default='auto'
|
||
不确定性计算方法:'auto', 'kriging', 'idw', 'rbf', 'model_variance'
|
||
calculate_uncertainty : bool, default=True
|
||
是否计算不确定性网格
|
||
use_distance_diffusion : bool, default=True
|
||
是否使用距离扩散方法填充边界空白区域
|
||
max_diffusion_distance : float, optional
|
||
最大扩散距离(单位与坐标相同)。如果为None,自动计算为网格分辨率的5倍
|
||
diffusion_power : float, default=2
|
||
距离扩散的IDW幂参数,值越大,距离衰减越快
|
||
diffusion_n_neighbors : int, default=15
|
||
距离扩散使用的最近邻点数
|
||
|
||
Returns:
|
||
--------
|
||
grid_xx, grid_yy, grid_content, bounds : tuple
|
||
如果calculate_uncertainty=False
|
||
grid_xx, grid_yy, grid_content, bounds, grid_uncertainty : tuple
|
||
如果calculate_uncertainty=True
|
||
"""
|
||
print("正在创建插值网格...")
|
||
|
||
# 获取边界范围
|
||
bounds = boundary_gdf.total_bounds
|
||
minx, miny, maxx, maxy = bounds
|
||
|
||
print(f"原始边界范围: X({minx:.6f} - {maxx:.6f}), Y({miny:.6f} - {maxy:.6f})")
|
||
|
||
# 计算范围大小
|
||
width = maxx - minx
|
||
height = maxy - miny
|
||
|
||
# 外扩边界,确保图像不完全挨着地图
|
||
expand_x = width * expand_ratio
|
||
expand_y = height * expand_ratio
|
||
minx -= expand_x
|
||
maxx += expand_x
|
||
miny -= expand_y
|
||
maxy += expand_y
|
||
|
||
print(f"外扩后边界范围: X({minx:.6f} - {maxx:.6f}), Y({miny:.6f} - {maxy:.6f})")
|
||
print(f"外扩比例: {expand_ratio * 100:.1f}%")
|
||
|
||
if self.output_crs == 'EPSG:4326':
|
||
print(f"区域尺寸: 宽度={width:.6f}°, 高度={height:.6f}°")
|
||
# 对于地理坐标系,需要调整分辨率单位(度)
|
||
# 1度约等于111公里,所以100米约等于0.0009度
|
||
resolution_deg = resolution / 111000.0 # 将米转换为度
|
||
print(f"网格分辨率: {resolution}m ≈ {resolution_deg:.6f}°")
|
||
else:
|
||
print(f"区域尺寸: 宽度={width:.2f}m, 高度={height:.2f}m")
|
||
resolution_deg = resolution
|
||
|
||
# 检查分辨率是否合理
|
||
min_grid_points = 50 # 增加最少网格点数以获得更平滑的插值效果
|
||
|
||
if self.output_crs == 'EPSG:4326':
|
||
# 地理坐标系的网格点计算
|
||
grid_points_x = max(int(width / resolution_deg), min_grid_points)
|
||
grid_points_y = max(int(height / resolution_deg), min_grid_points)
|
||
else:
|
||
# 投影坐标系的网格点计算
|
||
grid_points_x = max(int(width / resolution), min_grid_points)
|
||
grid_points_y = max(int(height / resolution), min_grid_points)
|
||
|
||
# 确保网格足够密集以获得平滑效果
|
||
grid_points_x = max(grid_points_x, 100)
|
||
grid_points_y = max(grid_points_y, 100)
|
||
|
||
# 创建网格
|
||
grid_x = np.linspace(minx, maxx, grid_points_x)
|
||
grid_y = np.linspace(miny, maxy, grid_points_y)
|
||
grid_xx, grid_yy = np.meshgrid(grid_x, grid_y)
|
||
|
||
print(f"网格大小: {grid_xx.shape[1]} x {grid_xx.shape[0]} (宽 x 高)")
|
||
|
||
# 检查网格大小
|
||
if grid_xx.shape[0] < 2 or grid_xx.shape[1] < 2:
|
||
raise ValueError(f"网格尺寸太小 {grid_xx.shape},无法进行插值。请检查数据范围和分辨率设置。")
|
||
|
||
# 准备插值数据(使用原始点+外扩点的合并数据)
|
||
points = np.column_stack((points_gdf['proj_x'], points_gdf['proj_y']))
|
||
values = points_gdf['content'].values
|
||
|
||
print(f"插值数据点数量: {len(points)}(包含原始采样点和外扩点)")
|
||
print(f"数据点范围: X({points[:, 0].min():.6f} - {points[:, 0].max():.6f}), "
|
||
f"Y({points[:, 1].min():.6f} - {points[:, 1].max():.6f})")
|
||
print(f"含量值范围: {values.min():.4f} - {values.max():.4f}")
|
||
print(f"含量值统计: 平均={values.mean():.4f}, 标准差={values.std():.4f}")
|
||
|
||
# 检查数据点数量
|
||
if len(points) < 3:
|
||
raise ValueError("插值需要至少3个数据点")
|
||
|
||
# 检查数据点的几何分布
|
||
self._check_point_distribution(points)
|
||
|
||
# 执行插值(先对整个网格插值,包括边界外)
|
||
print("正在执行空间插值(整个网格,包括边界外)...")
|
||
grid_content = self._perform_interpolation(points, values, grid_xx, grid_yy)
|
||
|
||
# 创建边界掩膜(用于识别边界内外)
|
||
print("正在识别边界区域...")
|
||
# 创建掩膜
|
||
mask_points = np.column_stack((grid_xx.ravel(), grid_yy.ravel()))
|
||
mask_geometry = [Point(x, y) for x, y in mask_points]
|
||
mask_gdf = gpd.GeoDataFrame(geometry=mask_geometry, crs=self.output_crs)
|
||
|
||
# 检查哪些点在边界内
|
||
within_boundary = mask_gdf.within(boundary_gdf.unary_union)
|
||
mask = within_boundary.values.reshape(grid_xx.shape)
|
||
|
||
# 找到边界边缘上的点(在边界内,但靠近边界)
|
||
print("正在提取边界边缘值并填充边界外区域...")
|
||
|
||
# 方法:找到边界内有效值的边缘点,然后填充到边界外
|
||
# 1. 先填充边界内的NaN(使用距离扩散方法)
|
||
nan_mask = np.isnan(grid_content)
|
||
within_boundary_nan = nan_mask & mask
|
||
|
||
if np.any(within_boundary_nan):
|
||
if use_distance_diffusion:
|
||
# 使用距离扩散方法填充边界内的空白区域
|
||
grid_content = self._fill_boundary_blanks_with_distance_diffusion(
|
||
grid_content, grid_xx, grid_yy, mask, boundary_gdf,
|
||
max_diffusion_distance=max_diffusion_distance,
|
||
power=diffusion_power,
|
||
n_neighbors=diffusion_n_neighbors
|
||
)
|
||
else:
|
||
# 使用传统的最近邻插值方法
|
||
print(f"填充边界内的 {np.sum(within_boundary_nan)} 个NaN点(使用最近邻插值)...")
|
||
valid_mask = ~nan_mask & mask
|
||
if np.sum(valid_mask) > 0:
|
||
valid_points = np.column_stack((grid_xx[valid_mask], grid_yy[valid_mask]))
|
||
valid_values = grid_content[valid_mask]
|
||
nan_points = np.column_stack((grid_xx[within_boundary_nan], grid_yy[within_boundary_nan]))
|
||
|
||
filled_values = griddata(
|
||
valid_points, valid_values, nan_points,
|
||
method='nearest'
|
||
)
|
||
grid_content[within_boundary_nan] = filled_values
|
||
print(f"边界内填充完成")
|
||
|
||
# 2. 找到边界边缘的值(边界内但靠近边界外的点)
|
||
# 使用形态学操作找到边界边缘
|
||
boundary_mask_binary = mask.astype(int)
|
||
# 创建边界外掩膜
|
||
outside_mask = ~mask
|
||
|
||
# 找到边界边缘(在边界内,但相邻有边界外的点)
|
||
# 对边界外区域进行膨胀,找到边界边缘
|
||
kernel = np.ones((3, 3), dtype=bool)
|
||
dilated_outside = ndimage.binary_dilation(outside_mask, structure=kernel)
|
||
edge_mask = mask & dilated_outside # 边界内但靠近边界外的点
|
||
|
||
# 3. 提取边缘值,填充到边界外
|
||
if np.any(edge_mask):
|
||
edge_values = grid_content[edge_mask]
|
||
edge_valid = ~np.isnan(edge_values)
|
||
if np.any(edge_valid):
|
||
# 使用边缘的有效值填充边界外
|
||
edge_mean = np.nanmean(edge_values)
|
||
print(f"边界边缘平均值: {edge_mean:.4f}")
|
||
|
||
# 将边缘值填充到边界外的所有NaN点
|
||
outside_nan = outside_mask & np.isnan(grid_content)
|
||
if np.any(outside_nan):
|
||
# 使用最近邻插值从边缘值填充
|
||
edge_points = np.column_stack((grid_xx[edge_mask & ~np.isnan(grid_content)],
|
||
grid_yy[edge_mask & ~np.isnan(grid_content)]))
|
||
if len(edge_points) > 0:
|
||
edge_vals = grid_content[edge_mask & ~np.isnan(grid_content)]
|
||
outside_points = np.column_stack((grid_xx[outside_nan], grid_yy[outside_nan]))
|
||
|
||
outside_filled = griddata(
|
||
edge_points, edge_vals, outside_points,
|
||
method='nearest'
|
||
)
|
||
grid_content[outside_nan] = outside_filled
|
||
print(f"已填充边界外的 {np.sum(~np.isnan(outside_filled))} 个点")
|
||
else:
|
||
# 如果没有边缘值,使用边缘平均值填充
|
||
grid_content[outside_nan] = edge_mean
|
||
print(f"使用边缘平均值填充边界外的 {np.sum(outside_nan)} 个点")
|
||
else:
|
||
print("边界外区域已全部填充")
|
||
else:
|
||
# 如果边缘没有有效值,使用全局平均值填充边界外
|
||
global_mean = np.nanmean(grid_content[mask])
|
||
if not np.isnan(global_mean):
|
||
grid_content[outside_mask & np.isnan(grid_content)] = global_mean
|
||
print(f"使用全局平均值 {global_mean:.4f} 填充边界外")
|
||
else:
|
||
# 如果没有找到边缘,直接使用边界内的平均值填充边界外
|
||
mean_in_boundary = np.nanmean(grid_content[mask])
|
||
if not np.isnan(mean_in_boundary):
|
||
grid_content[outside_mask & np.isnan(grid_content)] = mean_in_boundary
|
||
print(f"使用边界内平均值 {mean_in_boundary:.4f} 填充边界外")
|
||
|
||
print("整个画面已铺满,边界外区域已用边缘值填充")
|
||
|
||
# 最终检查:确保边界内所有区域都有值
|
||
final_check_nan = np.isnan(grid_content) & mask
|
||
if np.any(final_check_nan):
|
||
print(f"警告: 仍有 {np.sum(final_check_nan)} 个边界内的点未填充,使用平均值填充...")
|
||
if np.sum(~np.isnan(grid_content) & mask) > 0:
|
||
mean_value = np.nanmean(grid_content[mask])
|
||
grid_content[final_check_nan] = mean_value
|
||
print(f" 使用平均值 {mean_value:.4f} 填充剩余 {np.sum(final_check_nan)} 个点")
|
||
else:
|
||
# 如果边界内完全没有有效值,使用全局平均值
|
||
global_mean = np.nanmean(grid_content)
|
||
if not np.isnan(global_mean):
|
||
grid_content[final_check_nan] = global_mean
|
||
else:
|
||
grid_content[final_check_nan] = 0
|
||
print(" 使用全局平均值填充")
|
||
else:
|
||
print("边界内所有区域已完全填充")
|
||
|
||
# 检查插值结果
|
||
valid_data = ~np.isnan(grid_content)
|
||
valid_count = np.sum(valid_data)
|
||
print(f"有效插值点数量: {valid_count} / {grid_content.size}")
|
||
|
||
if valid_count == 0:
|
||
raise ValueError("边界掩膜后没有有效数据点,请检查数据点是否在边界范围内")
|
||
|
||
if valid_count < 4:
|
||
print("警告:有效数据点很少,可能影响绘图效果")
|
||
|
||
# 输出插值结果的统计信息
|
||
valid_values = grid_content[valid_data]
|
||
print(
|
||
f"插值后数据统计: 最小值={valid_values.min():.4f}, 最大值={valid_values.max():.4f}, 平均值={valid_values.mean():.4f}")
|
||
|
||
# 计算不确定性网格
|
||
grid_uncertainty = None
|
||
if calculate_uncertainty:
|
||
print("\n开始计算不确定性网格...")
|
||
|
||
# 确定使用的不确定性方法
|
||
if uncertainty_method == 'auto':
|
||
# 自动选择:如果有模型方差数据,使用model_variance;否则尝试kriging,失败则用idw
|
||
if 'uncertainty' in points_gdf.columns:
|
||
uncertainty_method = 'model_variance'
|
||
print("检测到模型方差数据,使用model_variance方法")
|
||
elif PYKRIGE_AVAILABLE:
|
||
uncertainty_method = 'kriging'
|
||
print("使用Kriging方法计算不确定性")
|
||
else:
|
||
uncertainty_method = 'idw'
|
||
print("使用IDW方法计算不确定性")
|
||
|
||
# 根据方法计算不确定性
|
||
if uncertainty_method == 'model_variance':
|
||
if 'uncertainty' not in points_gdf.columns:
|
||
print("警告: 未找到不确定性数据列,改用IDW方法")
|
||
uncertainty_method = 'idw'
|
||
else:
|
||
uncertainty_values = points_gdf['uncertainty'].values
|
||
# 过滤无效值
|
||
valid_unc_mask = np.isfinite(uncertainty_values)
|
||
if np.sum(valid_unc_mask) > 0:
|
||
grid_uncertainty = self._calculate_model_variance_uncertainty(
|
||
points[valid_unc_mask], uncertainty_values[valid_unc_mask],
|
||
grid_xx, grid_yy
|
||
)
|
||
else:
|
||
print("警告: 不确定性数据无效,改用IDW方法")
|
||
uncertainty_method = 'idw'
|
||
|
||
if uncertainty_method == 'kriging':
|
||
grid_uncertainty = self._calculate_kriging_uncertainty(points, values, grid_xx, grid_yy)
|
||
elif uncertainty_method == 'idw':
|
||
grid_uncertainty = self._calculate_idw_uncertainty(points, grid_xx, grid_yy)
|
||
elif uncertainty_method == 'rbf':
|
||
grid_uncertainty = self._calculate_rbf_uncertainty(points, grid_xx, grid_yy)
|
||
|
||
# 应用边界掩膜到不确定性网格
|
||
if grid_uncertainty is not None:
|
||
# 边界外的区域设为NaN
|
||
grid_uncertainty[~mask] = np.nan
|
||
valid_unc = grid_uncertainty[~np.isnan(grid_uncertainty)]
|
||
if len(valid_unc) > 0:
|
||
print(f"不确定性统计: 最小值={valid_unc.min():.4f}, 最大值={valid_unc.max():.4f}, 平均值={valid_unc.mean():.4f}")
|
||
else:
|
||
print("警告: 不确定性网格中没有有效数据")
|
||
|
||
# 返回外扩后的bounds
|
||
expanded_bounds = np.array([minx, miny, maxx, maxy])
|
||
|
||
if calculate_uncertainty and grid_uncertainty is not None:
|
||
return grid_xx, grid_yy, grid_content, expanded_bounds, grid_uncertainty
|
||
else:
|
||
return grid_xx, grid_yy, grid_content, expanded_bounds
|
||
|
||
def create_content_map(self, points_gdf, boundary_gdf, grid_xx, grid_yy,
|
||
grid_content, bounds, output_file='content_map.png',
|
||
show_sample_points=False, base_map_tif=None,
|
||
grid_uncertainty=None, show_uncertainty=True,
|
||
uncertainty_alpha=0.5, uncertainty_threshold=0.5,
|
||
uncertainty_cmap='Reds', cmap='viridis'):
|
||
"""
|
||
创建含量图
|
||
|
||
Parameters:
|
||
-----------
|
||
base_map_tif : str, optional
|
||
TIF正射底图文件路径。如果提供,将在水域掩膜外显示底图
|
||
grid_uncertainty : np.ndarray, optional
|
||
不确定性网格
|
||
show_uncertainty : bool, default=True
|
||
是否显示不确定性叠加层
|
||
uncertainty_alpha : float, default=0.5
|
||
不确定性叠加层透明度(0-1)
|
||
uncertainty_threshold : float, default=0.5
|
||
不确定性显示阈值(0-1),只显示高于此阈值的不确定性区域
|
||
uncertainty_cmap : str, default='Reds'
|
||
不确定性颜色映射
|
||
cmap : str, default='viridis'
|
||
含量数据的颜色映射
|
||
"""
|
||
print("正在生成含量图...")
|
||
|
||
# 检查网格数据
|
||
print(f"网格形状: {grid_content.shape}")
|
||
|
||
# 创建边界掩膜(用于绘图时只显示边界内)
|
||
print("创建边界掩膜用于绘图...")
|
||
try:
|
||
# 创建网格点的GeoDataFrame
|
||
grid_points = gpd.GeoDataFrame(
|
||
geometry=[Point(x, y) for x, y in zip(grid_xx.flatten(), grid_yy.flatten())],
|
||
crs=points_gdf.crs
|
||
)
|
||
# 检查哪些点在边界内
|
||
within_boundary = grid_points.within(boundary_gdf.unary_union)
|
||
mask = within_boundary.values.reshape(grid_xx.shape)
|
||
print(f"边界内点数: {np.sum(mask)} / {mask.size}")
|
||
except Exception as e:
|
||
print(f"创建边界掩膜时出现错误: {e},继续绘图...")
|
||
mask = np.ones_like(grid_content, dtype=bool) # 如果失败,显示全部
|
||
|
||
valid_data = ~np.isnan(grid_content)
|
||
if np.sum(valid_data) == 0:
|
||
raise ValueError("没有有效的插值数据用于绘图")
|
||
|
||
# 计算数据统计
|
||
valid_values = grid_content[valid_data]
|
||
print(
|
||
f"插值结果统计: 最小值={valid_values.min():.4f}, 最大值={valid_values.max():.4f}, 平均值={valid_values.mean():.4f}")
|
||
print(f"有效数据点数量: {np.sum(valid_data)} / {grid_content.size}")
|
||
|
||
# 检查数据范围
|
||
data_range = valid_values.max() - valid_values.min()
|
||
print(f"数据范围: {data_range:.6f}")
|
||
|
||
if data_range == 0:
|
||
print("警告:所有数据值都相同,将使用单一颜色显示")
|
||
|
||
# 创建图形
|
||
fig, ax = plt.subplots(figsize=(12, 10))
|
||
|
||
# 如果提供了底图,先绘制底图(在水域掩膜外)
|
||
if base_map_tif is not None:
|
||
try:
|
||
print(f"正在加载底图: {base_map_tif}")
|
||
self._add_base_map(ax, base_map_tif, bounds, mask, grid_xx, grid_yy, boundary_gdf)
|
||
print("底图加载成功")
|
||
except Exception as e:
|
||
print(f"加载底图失败: {e},将跳过底图显示")
|
||
|
||
# 设置颜色映射参数
|
||
im = None
|
||
|
||
try:
|
||
if data_range > 0:
|
||
# 设置颜色范围,确保有足够的对比度
|
||
vmin = valid_values.min()
|
||
vmax = valid_values.max()
|
||
|
||
# 如果范围很小,稍微扩展一下以增加对比度
|
||
if data_range < 1e-6:
|
||
center = valid_values.mean()
|
||
expansion = max(abs(center) * 0.01, 1e-6) # 扩展1%或最小值
|
||
vmin = center - expansion
|
||
vmax = center + expansion
|
||
|
||
print(f"颜色映射范围: {vmin:.6f} - {vmax:.6f}")
|
||
|
||
# 方法1:尝试使用contourf
|
||
try:
|
||
print("尝试使用contourf绘制...")
|
||
# 使用掩膜数组:边界外的数据被掩膜掉,只显示边界内
|
||
# mask已经在前面创建好了
|
||
masked_data = np.ma.masked_where(~mask, grid_content)
|
||
|
||
# 创建更多等级数以获得更平滑的颜色过渡
|
||
levels = np.linspace(vmin, vmax, 100) # 创建100个等级以获得平滑效果
|
||
im = ax.contourf(grid_xx, grid_yy, masked_data,
|
||
levels=levels, cmap=cmap, alpha=0.9,
|
||
vmin=vmin, vmax=vmax, extend='both')
|
||
print("contourf绘制成功")
|
||
|
||
# 可选择性添加等值线(默认不添加,以保持平滑效果)
|
||
# 如果需要等值线,可以取消注释下面的代码
|
||
# try:
|
||
# contour_levels = np.linspace(vmin, vmax, 11)
|
||
# contours = ax.contour(grid_xx, grid_yy, grid_content,
|
||
# levels=contour_levels, colors='white',
|
||
# alpha=0.3, linewidths=0.5)
|
||
# ax.clabel(contours, inline=True, fontsize=8, fmt='%.3f')
|
||
# print("等值线添加成功")
|
||
# except Exception as e:
|
||
# print(f"等值线绘制失败: {e}")
|
||
|
||
except Exception as e:
|
||
print(f"contourf失败: {e}")
|
||
# 方法2:使用pcolormesh
|
||
try:
|
||
print("尝试使用pcolormesh绘制...")
|
||
# 使用掩膜数组:边界外的数据被掩膜掉,只显示边界内
|
||
# mask已经在前面创建好了
|
||
masked_data = np.ma.masked_where(~mask, grid_content)
|
||
|
||
im = ax.pcolormesh(grid_xx, grid_yy, masked_data,
|
||
cmap=cmap, alpha=0.9,
|
||
vmin=vmin, vmax=vmax, shading='gouraud') # 使用gouraud平滑着色
|
||
print("pcolormesh绘制成功")
|
||
except Exception as e2:
|
||
print(f"pcolormesh也失败: {e2}")
|
||
raise e2
|
||
|
||
else:
|
||
# 所有值相同的情况
|
||
print("使用单一颜色填充(所有值相同)")
|
||
# 创建一个简单的填充
|
||
single_value = valid_values[0]
|
||
im = ax.contourf(grid_xx, grid_yy, grid_content,
|
||
levels=[single_value - 0.001, single_value + 0.001],
|
||
cmap=cmap, alpha=0.8)
|
||
|
||
except Exception as e:
|
||
print(f"主要绘图方法失败,尝试备选方案: {e}")
|
||
|
||
# 备选方案1:imshow
|
||
try:
|
||
print("尝试使用imshow...")
|
||
# 处理NaN值
|
||
display_data = grid_content.copy()
|
||
nan_mask = np.isnan(display_data)
|
||
if np.any(nan_mask):
|
||
# 用平均值填充NaN
|
||
display_data[nan_mask] = valid_values.mean()
|
||
|
||
if data_range > 0:
|
||
vmin = valid_values.min()
|
||
vmax = valid_values.max()
|
||
im = ax.imshow(display_data,
|
||
extent=[grid_xx.min(), grid_xx.max(),
|
||
grid_yy.min(), grid_yy.max()],
|
||
cmap=cmap, alpha=0.8, origin='lower',
|
||
vmin=vmin, vmax=vmax, aspect='auto')
|
||
else:
|
||
im = ax.imshow(display_data,
|
||
extent=[grid_xx.min(), grid_xx.max(),
|
||
grid_yy.min(), grid_yy.max()],
|
||
cmap=cmap, alpha=0.8, origin='lower',
|
||
aspect='auto')
|
||
print("imshow绘制成功")
|
||
|
||
except Exception as e2:
|
||
print(f"imshow也失败: {e2}")
|
||
|
||
# 备选方案2:散点图
|
||
try:
|
||
print("尝试使用散点图...")
|
||
valid_x = grid_xx[valid_data]
|
||
valid_y = grid_yy[valid_data]
|
||
valid_z = grid_content[valid_data]
|
||
|
||
if data_range > 0:
|
||
im = ax.scatter(valid_x, valid_y, c=valid_z,
|
||
cmap=cmap, alpha=0.8, s=10,
|
||
vmin=valid_values.min(), vmax=valid_values.max())
|
||
else:
|
||
im = ax.scatter(valid_x, valid_y, c=valid_z,
|
||
cmap=cmap, alpha=0.8, s=10)
|
||
print("散点图绘制成功")
|
||
|
||
except Exception as e3:
|
||
print(f"所有绘图方法都失败: {e3}")
|
||
raise ValueError("无法生成颜色图,请检查数据")
|
||
|
||
# 绘制边界(黑色)
|
||
try:
|
||
boundary_gdf.boundary.plot(ax=ax, color='black', linewidth=2, alpha=1.0)
|
||
print("边界绘制成功(黑色)")
|
||
except Exception as e:
|
||
print(f"边界绘制失败: {e}")
|
||
|
||
# 绘制不确定性叠加层(半透明)- 已禁用
|
||
# if show_uncertainty and grid_uncertainty is not None:
|
||
# try:
|
||
# print("正在添加不确定性叠加层...")
|
||
# # 创建不确定性掩膜(只显示高于阈值的不确定性区域)
|
||
# uncertainty_mask = (grid_uncertainty >= uncertainty_threshold) & (~np.isnan(grid_uncertainty))
|
||
#
|
||
# if np.any(uncertainty_mask):
|
||
# # 创建不确定性显示数据(只显示高不确定性区域)
|
||
# uncertainty_display = grid_uncertainty.copy()
|
||
# uncertainty_display[~uncertainty_mask] = np.nan
|
||
#
|
||
# # 使用contourf或pcolormesh绘制不确定性叠加层
|
||
# try:
|
||
# # 使用pcolormesh绘制半透明的不确定性层
|
||
# uncertainty_im = ax.pcolormesh(
|
||
# grid_xx, grid_yy, uncertainty_display,
|
||
# cmap=uncertainty_cmap, alpha=uncertainty_alpha,
|
||
# vmin=uncertainty_threshold, vmax=1.0,
|
||
# shading='gouraud', zorder=10
|
||
# )
|
||
# print(f"不确定性叠加层添加成功(阈值={uncertainty_threshold},透明度={uncertainty_alpha})")
|
||
# except Exception as e:
|
||
# print(f"不确定性叠加层绘制失败: {e}")
|
||
# # 备选方案:使用contourf
|
||
# try:
|
||
# uncertainty_levels = np.linspace(uncertainty_threshold, 1.0, 10)
|
||
# uncertainty_im = ax.contourf(
|
||
# grid_xx, grid_yy, uncertainty_display,
|
||
# levels=uncertainty_levels,
|
||
# cmap=uncertainty_cmap, alpha=uncertainty_alpha,
|
||
# zorder=10
|
||
# )
|
||
# print("不确定性叠加层添加成功(使用contourf)")
|
||
# except Exception as e2:
|
||
# print(f"不确定性叠加层绘制失败: {e2}")
|
||
# else:
|
||
# print(f"警告:没有高于阈值{uncertainty_threshold}的不确定性区域")
|
||
#
|
||
# except Exception as e:
|
||
# print(f"不确定性叠加层添加失败: {e}")
|
||
|
||
# 可选择性绘制采样点(默认不绘制,以显示平滑的颜色分布)
|
||
if show_sample_points:
|
||
try:
|
||
points_gdf.plot(ax=ax, color='black', markersize=6, alpha=0.7,
|
||
marker='+', edgecolors='white', linewidth=1)
|
||
print("采样点绘制成功")
|
||
except Exception as e:
|
||
print(f"采样点绘制失败: {e}")
|
||
|
||
# 设置坐标轴标签和格式
|
||
# 由于输入是投影坐标系,输出是地理坐标系,始终显示为地理坐标
|
||
ax.set_xlabel('经度 (°)', fontsize=12)
|
||
ax.set_ylabel('纬度 (°)', fontsize=12)
|
||
|
||
# 格式化坐标轴刻度为经纬度格式(保留3位小数)
|
||
def lon_formatter(x, p):
|
||
return f'{x:.3f}°'
|
||
|
||
def lat_formatter(x, p):
|
||
return f'{x:.3f}°'
|
||
|
||
ax.xaxis.set_major_formatter(FuncFormatter(lon_formatter))
|
||
ax.yaxis.set_major_formatter(FuncFormatter(lat_formatter))
|
||
|
||
# 添加格网线
|
||
ax.grid(True, linestyle='--', linewidth=0.5, alpha=0.5, color='gray')
|
||
ax.set_axisbelow(True) # 将格网线放在图层下方
|
||
|
||
# ax.set_title('含量分布图', fontsize=16, fontweight='bold', pad=20) # 已去除标题
|
||
|
||
# 添加颜色条
|
||
try:
|
||
if im is not None:
|
||
cbar = plt.colorbar(im, ax=ax, shrink=0.5, aspect=40, pad=0.02)
|
||
cbar.set_label('含量值', fontsize=10)
|
||
|
||
# 设置颜色条的刻度
|
||
if data_range > 0:
|
||
tick_values = np.linspace(valid_values.min(), valid_values.max(), 6)
|
||
cbar.set_ticks(tick_values)
|
||
cbar.set_ticklabels([f'{val:.3f}' for val in tick_values])
|
||
cbar.ax.tick_params(labelsize=8) # 缩小刻度标签字体
|
||
|
||
print("颜色条添加成功")
|
||
else:
|
||
print("警告:无法添加颜色条,im对象为None")
|
||
except Exception as e:
|
||
print(f"颜色条添加失败: {e}")
|
||
|
||
# 添加指北针
|
||
try:
|
||
self.add_north_arrow(ax, bounds)
|
||
except Exception as e:
|
||
print(f"指北针添加失败: {e}")
|
||
|
||
# 添加比例尺
|
||
try:
|
||
self.add_scale_bar(ax)
|
||
except Exception as e:
|
||
print(f"比例尺添加失败: {e}")
|
||
|
||
# 添加图例
|
||
try:
|
||
self.add_legend(ax, show_uncertainty=False) # 已禁用不确定性显示
|
||
except Exception as e:
|
||
print(f"图例添加失败: {e}")
|
||
|
||
# 设置图形边界(进一步外扩1%以确保边界不完全挨着地图)
|
||
try:
|
||
x_range = bounds[2] - bounds[0]
|
||
y_range = bounds[3] - bounds[1]
|
||
display_expand = 0.01 # 显示时再外扩1%
|
||
ax.set_xlim(bounds[0] - x_range * display_expand, bounds[2] + x_range * display_expand)
|
||
ax.set_ylim(bounds[1] - y_range * display_expand, bounds[3] + y_range * display_expand)
|
||
except Exception as e:
|
||
print(f"设置图形边界失败: {e}")
|
||
|
||
# 调整布局
|
||
plt.tight_layout()
|
||
|
||
# 保存图片
|
||
try:
|
||
plt.savefig(output_file, dpi=300, bbox_inches='tight',
|
||
facecolor='white', edgecolor='none')
|
||
print(f"含量图已保存为:{output_file}")
|
||
except Exception as e:
|
||
print(f"图片保存失败: {e}")
|
||
|
||
# 显示图片
|
||
try:
|
||
plt.show()
|
||
except Exception as e:
|
||
print(f"图片显示失败: {e}")
|
||
|
||
def add_north_arrow(self, ax, bounds):
|
||
"""添加指北针(左上角)- 复杂罗盘样式"""
|
||
minx, miny, maxx, maxy = bounds
|
||
|
||
# 计算指北针位置(左上角)
|
||
arrow_x = minx + (maxx - minx) * 0.1
|
||
arrow_y = maxy - (maxy - miny) * 0.1
|
||
|
||
# 缩小指北针尺寸
|
||
size_factor = (maxy - miny) * 0.04 # 缩小尺寸
|
||
radius = size_factor * 1.0 # 罗盘半径
|
||
|
||
# 绘制圆形背景(外圈)
|
||
circle_outer = patches.Circle(
|
||
(arrow_x, arrow_y),
|
||
radius=radius,
|
||
facecolor='white',
|
||
edgecolor='black',
|
||
linewidth=2.5,
|
||
zorder=10
|
||
)
|
||
ax.add_patch(circle_outer)
|
||
|
||
# 绘制内圈(装饰)
|
||
circle_inner = patches.Circle(
|
||
(arrow_x, arrow_y),
|
||
radius=radius * 0.7,
|
||
facecolor='none',
|
||
edgecolor='gray',
|
||
linewidth=1.5,
|
||
linestyle='--',
|
||
zorder=11
|
||
)
|
||
ax.add_patch(circle_inner)
|
||
|
||
# 绘制四个方向的刻度线
|
||
tick_length = radius * 0.3
|
||
tick_width = 1.5
|
||
|
||
# 北方向刻度(主刻度)
|
||
ax.plot([arrow_x, arrow_x], [arrow_y, arrow_y + radius * 0.85],
|
||
'k-', linewidth=tick_width * 2, zorder=12)
|
||
|
||
# 南方向刻度
|
||
ax.plot([arrow_x, arrow_x], [arrow_y, arrow_y - radius * 0.85],
|
||
'k-', linewidth=tick_width, zorder=12)
|
||
|
||
# 东方向刻度
|
||
ax.plot([arrow_x, arrow_x + radius * 0.85], [arrow_y, arrow_y],
|
||
'k-', linewidth=tick_width, zorder=12)
|
||
|
||
# 西方向刻度
|
||
ax.plot([arrow_x, arrow_x - radius * 0.85], [arrow_y, arrow_y],
|
||
'k-', linewidth=tick_width, zorder=12)
|
||
|
||
# 绘制次要刻度(45度方向)
|
||
for angle in [45, 135, 225, 315]:
|
||
angle_rad = math.radians(angle)
|
||
x_end = arrow_x + radius * 0.7 * math.cos(angle_rad)
|
||
y_end = arrow_y + radius * 0.7 * math.sin(angle_rad)
|
||
ax.plot([arrow_x, x_end], [arrow_y, y_end],
|
||
'k-', linewidth=tick_width * 0.5, alpha=0.6, zorder=12)
|
||
|
||
# 绘制指北箭头(三角形,填充)
|
||
arrow_size = radius * 0.6
|
||
arrow_points = np.array([
|
||
[arrow_x, arrow_y + radius * 0.9], # 顶点(北)
|
||
[arrow_x - arrow_size * 0.3, arrow_y + radius * 0.3], # 左下
|
||
[arrow_x + arrow_size * 0.3, arrow_y + radius * 0.3] # 右下
|
||
])
|
||
arrow_poly = patches.Polygon(
|
||
arrow_points,
|
||
facecolor='black',
|
||
edgecolor='black',
|
||
linewidth=2,
|
||
zorder=13
|
||
)
|
||
ax.add_patch(arrow_poly)
|
||
|
||
# 绘制指南箭头(三角形,填充,但较小)
|
||
south_arrow_size = radius * 0.4
|
||
south_arrow_points = np.array([
|
||
[arrow_x, arrow_y - radius * 0.6], # 顶点(南)
|
||
[arrow_x - south_arrow_size * 0.2, arrow_y - radius * 0.2], # 左上
|
||
[arrow_x + south_arrow_size * 0.2, arrow_y - radius * 0.2] # 右上
|
||
])
|
||
south_arrow_poly = patches.Polygon(
|
||
south_arrow_points,
|
||
facecolor='white',
|
||
edgecolor='black',
|
||
linewidth=1.5,
|
||
zorder=13
|
||
)
|
||
ax.add_patch(south_arrow_poly)
|
||
|
||
# 添加方向标记(N, S, E, W)
|
||
label_offset = radius * 1.15
|
||
font_size = 16 * 0.5 # 缩小字体到原来的一半
|
||
|
||
ax.text(arrow_x, arrow_y + label_offset, 'N',
|
||
fontsize=font_size, fontweight='bold', ha='center', va='bottom',
|
||
color='black', zorder=14)
|
||
|
||
ax.text(arrow_x, arrow_y - label_offset, 'S',
|
||
fontsize=font_size * 0.8, fontweight='bold', ha='center', va='top',
|
||
color='black', zorder=14)
|
||
|
||
ax.text(arrow_x + label_offset, arrow_y, 'E',
|
||
fontsize=font_size * 0.8, fontweight='bold', ha='left', va='center',
|
||
color='black', zorder=14)
|
||
|
||
ax.text(arrow_x - label_offset, arrow_y, 'W',
|
||
fontsize=font_size * 0.8, fontweight='bold', ha='right', va='center',
|
||
color='black', zorder=14)
|
||
|
||
def add_scale_bar(self, ax):
|
||
"""添加比例尺"""
|
||
try:
|
||
if self.output_crs == 'EPSG:4326':
|
||
# 地理坐标系,需要指定度数与距离的换算关系
|
||
# 在地球表面,1度约等于111公里(在赤道附近)
|
||
# 使用deg作为单位,matplotlib-scalebar会自动处理
|
||
scalebar = ScaleBar(
|
||
111000, # 1度 = 111000米
|
||
units='m',
|
||
location='lower left',
|
||
box_alpha=0.8,
|
||
color='black',
|
||
font_properties={'size': 10},
|
||
label_loc='bottom'
|
||
)
|
||
ax.add_artist(scalebar)
|
||
print("地理坐标系比例尺添加成功")
|
||
else:
|
||
# 投影坐标系,使用米作为单位
|
||
scalebar = ScaleBar(1, units='m', location='lower left',
|
||
box_alpha=0.8, color='black',
|
||
font_properties={'size': 10})
|
||
ax.add_artist(scalebar)
|
||
print("投影坐标系比例尺添加成功")
|
||
except Exception as e:
|
||
print(f"比例尺添加失败: {e}")
|
||
# 如果matplotlib-scalebar失败,尝试手动添加简单的比例尺
|
||
try:
|
||
self._add_manual_scale_bar(ax)
|
||
print("手动比例尺添加成功")
|
||
except Exception as e2:
|
||
print(f"手动比例尺也失败: {e2}")
|
||
|
||
def _add_manual_scale_bar(self, ax):
|
||
"""手动添加简单的比例尺"""
|
||
# 获取当前坐标轴的范围
|
||
xlim = ax.get_xlim()
|
||
ylim = ax.get_ylim()
|
||
|
||
# 计算比例尺的位置和长度
|
||
x_range = xlim[1] - xlim[0]
|
||
y_range = ylim[1] - ylim[0]
|
||
|
||
# 比例尺位置(左下角)
|
||
scale_x = xlim[0] + x_range * 0.05
|
||
scale_y = ylim[0] + y_range * 0.1
|
||
|
||
if self.output_crs == 'EPSG:4326':
|
||
# 地理坐标系:计算合适的比例尺长度(度)
|
||
# 选择一个合理的距离,比如1公里、5公里或10公里
|
||
distance_km = 5 # 5公里
|
||
scale_length_deg = distance_km / 111.0 # 转换为度数
|
||
|
||
# 绘制比例尺线
|
||
ax.plot([scale_x, scale_x + scale_length_deg], [scale_y, scale_y],
|
||
'k-', linewidth=3)
|
||
ax.plot([scale_x, scale_x], [scale_y - y_range * 0.01, scale_y + y_range * 0.01],
|
||
'k-', linewidth=2)
|
||
ax.plot([scale_x + scale_length_deg, scale_x + scale_length_deg],
|
||
[scale_y - y_range * 0.01, scale_y + y_range * 0.01], 'k-', linewidth=2)
|
||
|
||
# 添加文字标注
|
||
ax.text(scale_x + scale_length_deg / 2, scale_y + y_range * 0.02,
|
||
f'{distance_km} km', ha='center', va='bottom', fontsize=10,
|
||
bbox=dict(boxstyle='round,pad=0.3', facecolor='white', alpha=0.8))
|
||
else:
|
||
# 投影坐标系:使用米为单位
|
||
# 选择合适的比例尺长度
|
||
if x_range > 10000: # 大于10km
|
||
scale_length = 5000 # 5km
|
||
scale_text = '5 km'
|
||
elif x_range > 2000: # 大于2km
|
||
scale_length = 1000 # 1km
|
||
scale_text = '1 km'
|
||
else: # 小于2km
|
||
scale_length = 500 # 500m
|
||
scale_text = '500 m'
|
||
|
||
# 绘制比例尺线
|
||
ax.plot([scale_x, scale_x + scale_length], [scale_y, scale_y],
|
||
'k-', linewidth=3)
|
||
ax.plot([scale_x, scale_x], [scale_y - y_range * 0.01, scale_y + y_range * 0.01],
|
||
'k-', linewidth=2)
|
||
ax.plot([scale_x + scale_length, scale_x + scale_length],
|
||
[scale_y - y_range * 0.01, scale_y + y_range * 0.01], 'k-', linewidth=2)
|
||
|
||
# 添加文字标注
|
||
ax.text(scale_x + scale_length / 2, scale_y + y_range * 0.02,
|
||
scale_text, ha='center', va='bottom', fontsize=10,
|
||
bbox=dict(boxstyle='round,pad=0.3', facecolor='white', alpha=0.8))
|
||
|
||
def _add_base_map(self, ax, base_map_tif, bounds, mask, grid_xx, grid_yy, boundary_gdf):
|
||
"""添加正射底图(在水域掩膜外显示)
|
||
|
||
Parameters:
|
||
-----------
|
||
ax : matplotlib.axes.Axes
|
||
绘图轴对象
|
||
base_map_tif : str
|
||
TIF底图文件路径
|
||
bounds : np.ndarray
|
||
显示范围 [minx, miny, maxx, maxy]
|
||
mask : np.ndarray
|
||
水域掩膜(True表示水域内)
|
||
grid_xx : np.ndarray
|
||
网格X坐标
|
||
grid_yy : np.ndarray
|
||
网格Y坐标
|
||
boundary_gdf : gpd.GeoDataFrame
|
||
边界几何数据
|
||
"""
|
||
import rasterio
|
||
import rasterio.windows
|
||
from rasterio.warp import calculate_default_transform, reproject, Resampling
|
||
|
||
print("正在读取底图文件...")
|
||
with rasterio.open(base_map_tif) as src:
|
||
# 获取底图的坐标系
|
||
tif_crs = src.crs
|
||
tif_bounds = src.bounds
|
||
|
||
print(f"底图坐标系: {tif_crs}")
|
||
print(f"底图范围: {tif_bounds}")
|
||
print(f"目标范围: {bounds}")
|
||
|
||
# 检查是否需要投影转换
|
||
target_crs = CRS.from_string(self.output_crs)
|
||
need_reproject = tif_crs != target_crs
|
||
|
||
# 读取底图数据
|
||
if need_reproject:
|
||
print(f"底图坐标系({tif_crs})与目标坐标系({target_crs})不同,正在转换...")
|
||
# 计算转换后的变换参数和尺寸
|
||
transform, width, height = calculate_default_transform(
|
||
tif_crs, target_crs,
|
||
src.width, src.height,
|
||
left=bounds[0], bottom=bounds[1],
|
||
right=bounds[2], top=bounds[3]
|
||
)
|
||
|
||
# 创建目标数组
|
||
if src.count == 1:
|
||
# 单波段
|
||
base_map_data = np.zeros((height, width), dtype=src.dtypes[0])
|
||
reproject(
|
||
source=rasterio.band(src, 1),
|
||
destination=base_map_data,
|
||
src_transform=src.transform,
|
||
src_crs=tif_crs,
|
||
dst_transform=transform,
|
||
dst_crs=target_crs,
|
||
resampling=Resampling.bilinear
|
||
)
|
||
else:
|
||
# 多波段(RGB),取前3个波段
|
||
num_bands = min(3, src.count)
|
||
base_map_data = np.zeros((num_bands, height, width), dtype=src.dtypes[0])
|
||
for i in range(num_bands):
|
||
reproject(
|
||
source=rasterio.band(src, i + 1),
|
||
destination=base_map_data[i],
|
||
src_transform=src.transform,
|
||
src_crs=tif_crs,
|
||
dst_transform=transform,
|
||
dst_crs=target_crs,
|
||
resampling=Resampling.bilinear
|
||
)
|
||
|
||
# 如果是RGB,转换为(height, width, 3)格式
|
||
if num_bands == 3:
|
||
base_map_data = np.transpose(base_map_data, (1, 2, 0))
|
||
|
||
# 创建extent用于显示
|
||
extent = [bounds[0], bounds[2], bounds[1], bounds[3]]
|
||
|
||
else:
|
||
# 不需要投影转换,直接读取对应范围的数据
|
||
print("底图坐标系与目标坐标系一致,直接读取...")
|
||
|
||
# 计算需要读取的窗口
|
||
row_min, col_min = src.index(bounds[0], bounds[3]) # 左上角
|
||
row_max, col_max = src.index(bounds[2], bounds[1]) # 右下角
|
||
|
||
# 确保索引在有效范围内
|
||
row_min = max(0, row_min)
|
||
row_max = min(src.height, row_max + 1)
|
||
col_min = max(0, col_min)
|
||
col_max = min(src.width, col_max + 1)
|
||
|
||
window = rasterio.windows.Window.from_slices(
|
||
(row_min, row_max), (col_min, col_max)
|
||
)
|
||
|
||
# 读取数据
|
||
if src.count == 1:
|
||
base_map_data = src.read(1, window=window)
|
||
else:
|
||
# 多波段,取前3个波段
|
||
num_bands = min(3, src.count)
|
||
base_map_data = src.read(list(range(1, num_bands + 1)), window=window)
|
||
if num_bands == 3:
|
||
# 转换为(height, width, 3)格式
|
||
base_map_data = np.transpose(base_map_data, (1, 2, 0))
|
||
|
||
# 计算extent
|
||
window_transform = rasterio.windows.transform(window, src.transform)
|
||
left = window_transform[2]
|
||
top = window_transform[5]
|
||
right = left + window_transform[0] * base_map_data.shape[1]
|
||
bottom = top + window_transform[4] * base_map_data.shape[0]
|
||
|
||
# 确保extent不超过bounds
|
||
extent = [
|
||
max(bounds[0], left),
|
||
min(bounds[2], right),
|
||
max(bounds[1], bottom),
|
||
min(bounds[3], top)
|
||
]
|
||
|
||
# 将底图数据缩放到网格大小以便显示
|
||
# 创建底图的显示掩膜:只在边界外显示
|
||
print("正在创建底图显示掩膜...")
|
||
|
||
# 创建底图网格(与显示范围对齐)
|
||
base_map_height, base_map_width = base_map_data.shape[:2]
|
||
|
||
# 性能优化:如果底图分辨率过高,进行降采样以提高处理速度
|
||
# 限制最大边长为2000像素(保持足够清晰度的同时提高速度)
|
||
max_display_size = 2000
|
||
scale_factor = 1.0
|
||
if max(base_map_height, base_map_width) > max_display_size:
|
||
scale_factor = max_display_size / max(base_map_height, base_map_width)
|
||
new_height = int(base_map_height * scale_factor)
|
||
new_width = int(base_map_width * scale_factor)
|
||
print(
|
||
f"底图分辨率较高 ({base_map_width}x{base_map_height}),降采样到 {new_width}x{new_height} 以提高速度")
|
||
# 使用scipy的zoom进行降采样
|
||
if base_map_data.ndim == 2:
|
||
base_map_data = ndimage.zoom(base_map_data, scale_factor, order=1)
|
||
else:
|
||
base_map_data = ndimage.zoom(base_map_data, (scale_factor, scale_factor, 1), order=1)
|
||
base_map_height, base_map_width = base_map_data.shape[:2]
|
||
# 更新extent以匹配新的分辨率
|
||
extent_width = extent[1] - extent[0]
|
||
extent_height = extent[3] - extent[2]
|
||
extent = [
|
||
extent[0],
|
||
extent[0] + extent_width,
|
||
extent[2],
|
||
extent[2] + extent_height
|
||
]
|
||
|
||
# 使用rasterio的geometry_mask快速生成掩膜(比创建大量Point对象快得多)
|
||
# 创建底图的变换矩阵
|
||
if need_reproject:
|
||
# 如果进行了投影转换,使用计算得到的transform
|
||
base_map_transform = transform
|
||
else:
|
||
# 如果没有投影转换,使用窗口变换
|
||
base_map_transform = window_transform
|
||
|
||
# 如果进行了降采样,需要调整transform
|
||
if scale_factor < 1.0:
|
||
# 调整transform以适应新的分辨率
|
||
# rasterio的transform是6元素tuple或Affine对象,需要调整像素大小
|
||
# 获取transform的6个参数 (a, b, c, d, e, f)
|
||
# 其中a和e是像素大小,需要除以scale_factor
|
||
try:
|
||
from affine import Affine
|
||
except ImportError:
|
||
# 如果affine包不可用,尝试从rasterio导入
|
||
try:
|
||
from rasterio.transform import Affine
|
||
except ImportError:
|
||
# 如果都不可用,使用tuple方式
|
||
Affine = None
|
||
|
||
if Affine is not None:
|
||
# 获取6个参数
|
||
if hasattr(base_map_transform, '__iter__') and len(base_map_transform) == 6:
|
||
a, b, c, d, e, f = base_map_transform
|
||
else:
|
||
a, b, c, d, e, f = base_map_transform[0], base_map_transform[1], base_map_transform[2], \
|
||
base_map_transform[3], base_map_transform[4], base_map_transform[5]
|
||
# 创建新的transform,调整像素大小(a和e是像素大小)
|
||
base_map_transform = Affine(a / scale_factor, b, c, d, e / scale_factor, f)
|
||
else:
|
||
# 降级方案:使用tuple
|
||
if hasattr(base_map_transform, '__iter__') and len(base_map_transform) == 6:
|
||
a, b, c, d, e, f = base_map_transform
|
||
else:
|
||
a, b, c, d, e, f = base_map_transform[0], base_map_transform[1], base_map_transform[2], \
|
||
base_map_transform[3], base_map_transform[4], base_map_transform[5]
|
||
base_map_transform = (a / scale_factor, b, c, d, e / scale_factor, f)
|
||
|
||
# 调试信息:检查边界数据和底图范围
|
||
print(f"底图显示范围 (extent): {extent}")
|
||
print(f"底图分辨率: {base_map_width}x{base_map_height}")
|
||
print(f"底图transform: {base_map_transform}")
|
||
if boundary_gdf is not None and len(boundary_gdf) > 0:
|
||
boundary_bounds = boundary_gdf.total_bounds
|
||
print(f"边界数据范围: {boundary_bounds}")
|
||
print(f"边界数据坐标系: {boundary_gdf.crs}")
|
||
print(f"边界要素数量: {len(boundary_gdf)}")
|
||
|
||
# 检查边界是否与底图范围重叠
|
||
overlap_x = not (boundary_bounds[2] < extent[0] or boundary_bounds[0] > extent[1])
|
||
overlap_y = not (boundary_bounds[3] < extent[2] or boundary_bounds[1] > extent[3])
|
||
if not (overlap_x and overlap_y):
|
||
print("警告: 边界数据范围与底图显示范围不重叠!")
|
||
print(" 将不应用掩膜,显示整个底图")
|
||
# 创建全True的掩膜(显示所有区域)
|
||
base_map_mask = np.ones((base_map_height, base_map_width), dtype=bool)
|
||
else:
|
||
# 使用geometry_mask生成掩膜(True表示在几何体内,即水域内)
|
||
# 注意:geometry_mask返回True表示需要掩膜的区域(在几何体内),
|
||
# 但我们想要的是边界外的区域(不在几何体内),所以需要反转
|
||
try:
|
||
within_boundary_mask = geometry_mask(
|
||
boundary_gdf.geometry,
|
||
out_shape=(base_map_height, base_map_width),
|
||
transform=base_map_transform,
|
||
invert=False # False表示掩膜几何体内的区域(水域内)
|
||
)
|
||
# 反转掩膜:True表示边界外(需要显示的区域)
|
||
base_map_mask = ~within_boundary_mask
|
||
except Exception as e:
|
||
print(f"生成掩膜时出错: {e}")
|
||
print(" 将不应用掩膜,显示整个底图")
|
||
import traceback
|
||
traceback.print_exc()
|
||
# 创建全True的掩膜(显示所有区域)
|
||
base_map_mask = np.ones((base_map_height, base_map_width), dtype=bool)
|
||
else:
|
||
print("警告: 边界数据为空,将不应用掩膜,显示整个底图")
|
||
# 创建全True的掩膜(显示所有区域)
|
||
base_map_mask = np.ones((base_map_height, base_map_width), dtype=bool)
|
||
|
||
# 调试信息:检查掩膜状态
|
||
mask_ratio = np.sum(base_map_mask) / base_map_mask.size
|
||
print(
|
||
f"底图掩膜状态: 可显示区域占比 {mask_ratio * 100:.2f}% ({np.sum(base_map_mask)}/{base_map_mask.size} 像素)")
|
||
|
||
# 如果掩膜后没有可显示区域,警告并显示整个底图
|
||
if mask_ratio == 0.0:
|
||
print("警告: 掩膜后没有可显示区域,将显示整个底图(不应用掩膜)")
|
||
base_map_mask = np.ones((base_map_height, base_map_width), dtype=bool)
|
||
|
||
# 归一化数据以便显示(如果是数值型)
|
||
# 注意:先归一化整个数据,再应用掩膜,这样可以保证归一化范围正确
|
||
if base_map_data.dtype != np.uint8:
|
||
if base_map_data.ndim == 2:
|
||
# 单波段:归一化到0-1
|
||
# 使用整个数据集的范围进行归一化(不仅仅是掩膜区域)
|
||
data_min = np.nanmin(base_map_data)
|
||
data_max = np.nanmax(base_map_data)
|
||
print(f"底图数据范围: [{data_min}, {data_max}], dtype: {base_map_data.dtype}")
|
||
|
||
if data_max > data_min:
|
||
# 先归一化整个数组
|
||
base_map_normalized = (base_map_data - data_min) / (data_max - data_min)
|
||
# 然后应用掩膜:只显示边界外的区域
|
||
base_map_display = np.ma.masked_where(~base_map_mask, base_map_normalized)
|
||
else:
|
||
# 如果数据范围无效,创建全0的掩膜数组
|
||
print("警告: 底图数据范围无效,所有值相同")
|
||
base_map_display = np.ma.masked_where(~base_map_mask, np.zeros_like(base_map_data))
|
||
else:
|
||
# RGB:每个波段单独归一化
|
||
base_map_normalized = base_map_data.copy().astype(np.float32)
|
||
for i in range(base_map_data.shape[2]):
|
||
band_data = base_map_data[:, :, i]
|
||
data_min = np.nanmin(band_data)
|
||
data_max = np.nanmax(band_data)
|
||
print(f"底图波段 {i} 数据范围: [{data_min}, {data_max}]")
|
||
|
||
if data_max > data_min:
|
||
# 归一化整个波段
|
||
base_map_normalized[:, :, i] = (band_data - data_min) / (data_max - data_min)
|
||
else:
|
||
print(f"警告: 底图波段 {i} 数据范围无效,所有值相同")
|
||
base_map_normalized[:, :, i] = np.zeros_like(band_data)
|
||
|
||
# 应用掩膜:只显示边界外的区域
|
||
mask_3d = np.broadcast_to(~base_map_mask[..., np.newaxis], base_map_data.shape)
|
||
base_map_display = np.ma.masked_where(mask_3d, base_map_normalized)
|
||
else:
|
||
# uint8类型:直接使用,但可能需要归一化到0-1用于imshow
|
||
if base_map_data.ndim == 2:
|
||
# 单波段:uint8通常已经是0-255范围,归一化到0-1
|
||
base_map_normalized = base_map_data.astype(np.float32) / 255.0
|
||
base_map_display = np.ma.masked_where(~base_map_mask, base_map_normalized)
|
||
else:
|
||
# RGB:uint8归一化到0-1
|
||
base_map_normalized = base_map_data.astype(np.float32) / 255.0
|
||
mask_3d = np.broadcast_to(~base_map_mask[..., np.newaxis], base_map_data.shape)
|
||
base_map_display = np.ma.masked_where(mask_3d, base_map_normalized)
|
||
|
||
# 检查归一化后的数据范围
|
||
if isinstance(base_map_display, np.ma.MaskedArray):
|
||
valid_data = base_map_display[~base_map_display.mask]
|
||
if len(valid_data) > 0:
|
||
print(
|
||
f"归一化后有效数据范围: [{np.nanmin(valid_data):.3f}, {np.nanmax(valid_data):.3f}], 有效像素数: {len(valid_data)}")
|
||
else:
|
||
print("警告: 归一化后没有有效数据显示区域")
|
||
|
||
# 绘制底图
|
||
print("正在绘制底图...")
|
||
# 注意:extent格式为 [left, right, bottom, top]
|
||
# 对于地理坐标系,y轴通常向上为正,所以使用origin='lower'
|
||
try:
|
||
if base_map_data.ndim == 2:
|
||
# 单波段:使用灰度图
|
||
# 确保数据在0-1范围内
|
||
if isinstance(base_map_display, np.ma.MaskedArray):
|
||
# 对于masked array,确保数据范围正确
|
||
if np.ma.max(base_map_display) > 1.0 or np.ma.min(base_map_display) < 0.0:
|
||
base_map_display = np.ma.clip(base_map_display, 0.0, 1.0)
|
||
else:
|
||
base_map_display = np.clip(base_map_display, 0.0, 1.0)
|
||
|
||
im = ax.imshow(base_map_display, extent=extent, origin='lower',
|
||
cmap='gray', alpha=0.8, zorder=0, interpolation='bilinear',
|
||
vmin=0.0, vmax=1.0)
|
||
else:
|
||
# RGB:直接显示
|
||
# 确保数据格式正确(需要在0-1范围内)
|
||
if isinstance(base_map_display, np.ma.MaskedArray):
|
||
# 对于masked array,确保数据在0-1范围内
|
||
if np.ma.max(base_map_display) > 1.0 or np.ma.min(base_map_display) < 0.0:
|
||
base_map_display = np.ma.clip(base_map_display, 0.0, 1.0)
|
||
else:
|
||
base_map_display = np.clip(base_map_display, 0.0, 1.0)
|
||
|
||
# 确保是float32类型,imshow期望0-1范围的float数组
|
||
if base_map_display.dtype != np.float32 and base_map_display.dtype != np.float64:
|
||
base_map_display = base_map_display.astype(np.float32)
|
||
|
||
im = ax.imshow(base_map_display, extent=extent, origin='lower',
|
||
alpha=0.8, zorder=0, interpolation='bilinear')
|
||
print(f"底图绘制成功")
|
||
except Exception as e:
|
||
print(f"底图绘制出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
# 如果绘制失败,至少尝试绘制一个简单的占位图
|
||
print("尝试使用备用方法绘制底图...")
|
||
try:
|
||
if base_map_data.ndim == 2:
|
||
# 使用简单的numpy数组,不应用掩膜
|
||
simple_display = np.clip(base_map_data.astype(np.float32) / np.nanmax(base_map_data), 0, 1)
|
||
ax.imshow(simple_display, extent=extent, origin='lower',
|
||
cmap='gray', alpha=0.5, zorder=0)
|
||
else:
|
||
simple_display = np.clip(base_map_data.astype(np.float32) / 255.0, 0, 1)
|
||
ax.imshow(simple_display, extent=extent, origin='lower',
|
||
alpha=0.5, zorder=0)
|
||
print("备用方法绘制成功")
|
||
except Exception as e2:
|
||
print(f"备用方法也失败: {e2}")
|
||
|
||
print(f"底图已绘制,显示范围: {extent}")
|
||
|
||
def add_legend(self, ax, show_uncertainty=False):
|
||
"""
|
||
添加图例
|
||
|
||
Parameters:
|
||
-----------
|
||
show_uncertainty : bool
|
||
是否显示不确定性图例项
|
||
"""
|
||
legend_elements = [
|
||
# 移除边界标签
|
||
# plt.Line2D([0], [0], color='red', linewidth=2, label='边界'),
|
||
# 移除采样点和等值线图例项,以突出平滑的颜色分布效果
|
||
# plt.Line2D([0], [0], marker='+', color='w', markerfacecolor='black',
|
||
# markersize=8, label='采样点'),
|
||
]
|
||
|
||
# 如果显示了不确定性,添加图例项(已禁用)
|
||
# if show_uncertainty:
|
||
# from matplotlib.patches import Patch
|
||
# legend_elements.append(
|
||
# Patch(facecolor='red', alpha=0.5, label='高不确定性区域')
|
||
# )
|
||
|
||
# 如果图例为空,则不显示图例
|
||
if legend_elements:
|
||
ax.legend(handles=legend_elements, loc='upper left',
|
||
framealpha=0.9, fontsize=10)
|
||
|
||
def process_data(self, csv_file, shp_file, output_file='content_map.png',
|
||
resolution=100, show_sample_points=False, base_map_tif=None,
|
||
uncertainty_col=None, uncertainty_method='auto',
|
||
calculate_uncertainty=True, show_uncertainty=True,
|
||
uncertainty_alpha=0.5, uncertainty_threshold=0.5,
|
||
use_distance_diffusion=True, max_diffusion_distance=None,
|
||
diffusion_power=2, diffusion_n_neighbors=15, cmap=None,
|
||
expand_ratio=0.05):
|
||
"""
|
||
主处理函数
|
||
|
||
Parameters:
|
||
-----------
|
||
base_map_tif : str, optional
|
||
TIF正射底图文件路径。如果提供,将在水域掩膜外显示底图
|
||
uncertainty_col : str, optional
|
||
不确定性数据列名。如果为None,将自动检测
|
||
uncertainty_method : str, default='auto'
|
||
不确定性计算方法:'auto', 'kriging', 'idw', 'rbf', 'model_variance'
|
||
calculate_uncertainty : bool, default=True
|
||
是否计算不确定性
|
||
show_uncertainty : bool, default=True
|
||
是否在图上显示不确定性叠加层
|
||
uncertainty_alpha : float, default=0.5
|
||
不确定性叠加层透明度(0-1)
|
||
uncertainty_threshold : float, default=0.5
|
||
不确定性显示阈值(0-1),只显示高于此阈值的不确定性区域
|
||
use_distance_diffusion : bool, default=True
|
||
是否使用距离扩散方法填充边界空白区域
|
||
max_diffusion_distance : float, optional
|
||
最大扩散距离(单位与坐标相同)。如果为None,自动计算为网格分辨率的5倍
|
||
diffusion_power : float, default=2
|
||
距离扩散的IDW幂参数,值越大,距离衰减越快
|
||
diffusion_n_neighbors : int, default=15
|
||
距离扩散使用的最近邻点数
|
||
cmap : str, optional
|
||
颜色映射。如果为None,将从CSV文件名或内容中自动识别参数并选择对应的colormap
|
||
expand_ratio : float, default=0.05
|
||
边界外扩比例(5%),确保图像边界不完全挨着地图
|
||
"""
|
||
try:
|
||
# 自动识别参数名称并获取colormap
|
||
if cmap is None:
|
||
param_name = self._extract_param_name(csv_file)
|
||
cmap = self._get_colormap(param_name)
|
||
else:
|
||
print(f"使用指定的颜色映射: {cmap}")
|
||
|
||
# 读取数据
|
||
points_gdf = self.read_csv_data(csv_file, uncertainty_col=uncertainty_col)
|
||
boundary_gdf = self.read_boundary_shapefile(shp_file)
|
||
|
||
# 对边缘采样点进行外扩处理(外扩到整个图像边界,按照resolution间距)
|
||
points_gdf = self._expand_edge_points(points_gdf, boundary_gdf, resolution=resolution, expand_ratio=expand_ratio)
|
||
|
||
# 创建插值网格(包含不确定性)
|
||
result = self.create_interpolation_grid(
|
||
points_gdf, boundary_gdf, resolution,
|
||
expand_ratio=expand_ratio,
|
||
uncertainty_method=uncertainty_method,
|
||
calculate_uncertainty=calculate_uncertainty,
|
||
use_distance_diffusion=use_distance_diffusion,
|
||
max_diffusion_distance=max_diffusion_distance,
|
||
diffusion_power=diffusion_power,
|
||
diffusion_n_neighbors=diffusion_n_neighbors
|
||
)
|
||
|
||
# 根据返回值解包
|
||
if len(result) == 5:
|
||
grid_xx, grid_yy, grid_content, bounds, grid_uncertainty = result
|
||
else:
|
||
grid_xx, grid_yy, grid_content, bounds = result
|
||
grid_uncertainty = None
|
||
|
||
# 生成含量图(包含不确定性叠加)
|
||
self.create_content_map(
|
||
points_gdf, boundary_gdf, grid_xx, grid_yy,
|
||
grid_content, bounds, output_file, show_sample_points, base_map_tif,
|
||
grid_uncertainty=grid_uncertainty,
|
||
show_uncertainty=show_uncertainty,
|
||
uncertainty_alpha=uncertainty_alpha,
|
||
uncertainty_threshold=uncertainty_threshold,
|
||
cmap=cmap
|
||
)
|
||
|
||
print("处理完成!")
|
||
|
||
# 输出统计信息
|
||
print(f"\n统计信息:")
|
||
print(f"数据点数量: {len(points_gdf)}")
|
||
print(f"含量值范围: {points_gdf['content'].min():.2f} - {points_gdf['content'].max():.2f}")
|
||
print(f"含量值平均: {points_gdf['content'].mean():.2f}")
|
||
print(f"含量值标准差: {points_gdf['content'].std():.2f}")
|
||
|
||
except Exception as e:
|
||
print(f"处理过程中出现错误: {str(e)}")
|
||
raise
|
||
|
||
def process_batch(self, csv_folder, shp_file, output_folder=None,
|
||
resolution=100, show_sample_points=False, base_map_tif=None,
|
||
uncertainty_col=None, uncertainty_method='auto',
|
||
calculate_uncertainty=True, show_uncertainty=True,
|
||
uncertainty_alpha=0.5, uncertainty_threshold=0.5,
|
||
use_distance_diffusion=True, max_diffusion_distance=None,
|
||
diffusion_power=2, diffusion_n_neighbors=15):
|
||
"""
|
||
批量处理文件夹中的CSV文件
|
||
|
||
Parameters:
|
||
-----------
|
||
csv_folder : str
|
||
包含CSV文件的文件夹路径
|
||
shp_file : str
|
||
边界shapefile文件路径
|
||
output_folder : str, optional
|
||
输出文件夹路径。如果为None,将在CSV文件所在文件夹创建'map_output'子文件夹
|
||
resolution : int, default=100
|
||
网格分辨率(米)
|
||
show_sample_points : bool, default=False
|
||
是否显示采样点
|
||
base_map_tif : str, optional
|
||
TIF正射底图文件路径
|
||
uncertainty_col : str, optional
|
||
不确定性数据列名
|
||
uncertainty_method : str, default='auto'
|
||
不确定性计算方法
|
||
calculate_uncertainty : bool, default=True
|
||
是否计算不确定性
|
||
show_uncertainty : bool, default=True
|
||
是否显示不确定性叠加层
|
||
uncertainty_alpha : float, default=0.5
|
||
不确定性叠加层透明度
|
||
uncertainty_threshold : float, default=0.5
|
||
不确定性显示阈值
|
||
use_distance_diffusion : bool, default=True
|
||
是否使用距离扩散方法
|
||
max_diffusion_distance : float, optional
|
||
最大扩散距离
|
||
diffusion_power : float, default=2
|
||
距离扩散的IDW幂参数
|
||
diffusion_n_neighbors : int, default=15
|
||
距离扩散使用的最近邻点数
|
||
"""
|
||
print("=" * 60)
|
||
print("开始批量处理CSV文件")
|
||
print("=" * 60)
|
||
|
||
# 检查输入文件夹是否存在
|
||
if not os.path.isdir(csv_folder):
|
||
raise ValueError(f"输入文件夹不存在: {csv_folder}")
|
||
|
||
# 获取所有CSV文件
|
||
csv_files = glob.glob(os.path.join(csv_folder, "*.csv"))
|
||
if len(csv_files) == 0:
|
||
raise ValueError(f"在文件夹 {csv_folder} 中未找到CSV文件")
|
||
|
||
print(f"找到 {len(csv_files)} 个CSV文件")
|
||
|
||
# 创建输出文件夹
|
||
if output_folder is None:
|
||
output_folder = os.path.join(csv_folder, "map_output")
|
||
|
||
if not os.path.exists(output_folder):
|
||
os.makedirs(output_folder)
|
||
print(f"创建输出文件夹: {output_folder}")
|
||
else:
|
||
print(f"使用输出文件夹: {output_folder}")
|
||
|
||
# 统计信息
|
||
success_count = 0
|
||
fail_count = 0
|
||
failed_files = []
|
||
|
||
# 批量处理每个CSV文件
|
||
for i, csv_file in enumerate(csv_files, 1):
|
||
print("\n" + "=" * 60)
|
||
print(f"处理文件 {i}/{len(csv_files)}: {os.path.basename(csv_file)}")
|
||
print("=" * 60)
|
||
|
||
try:
|
||
# 生成输出文件名(使用CSV文件名,但扩展名为.png)
|
||
csv_basename = os.path.splitext(os.path.basename(csv_file))[0]
|
||
output_file = os.path.join(output_folder, f"{csv_basename}.png")
|
||
|
||
# 处理单个文件(自动识别参数并选择colormap)
|
||
self.process_data(
|
||
csv_file=csv_file,
|
||
shp_file=shp_file,
|
||
output_file=output_file,
|
||
resolution=resolution,
|
||
show_sample_points=show_sample_points,
|
||
base_map_tif=base_map_tif,
|
||
uncertainty_col=uncertainty_col,
|
||
uncertainty_method=uncertainty_method,
|
||
calculate_uncertainty=calculate_uncertainty,
|
||
show_uncertainty=show_uncertainty,
|
||
uncertainty_alpha=uncertainty_alpha,
|
||
uncertainty_threshold=uncertainty_threshold,
|
||
use_distance_diffusion=use_distance_diffusion,
|
||
max_diffusion_distance=max_diffusion_distance,
|
||
diffusion_power=diffusion_power,
|
||
diffusion_n_neighbors=diffusion_n_neighbors,
|
||
cmap=None # 自动识别
|
||
)
|
||
|
||
success_count += 1
|
||
print(f"✓ 成功处理: {csv_basename}.png")
|
||
|
||
except Exception as e:
|
||
fail_count += 1
|
||
failed_files.append((os.path.basename(csv_file), str(e)))
|
||
print(f"✗ 处理失败: {os.path.basename(csv_file)}")
|
||
print(f" 错误信息: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
# 输出批量处理结果统计
|
||
print("\n" + "=" * 60)
|
||
print("批量处理完成")
|
||
print("=" * 60)
|
||
print(f"总文件数: {len(csv_files)}")
|
||
print(f"成功: {success_count}")
|
||
print(f"失败: {fail_count}")
|
||
print(f"输出文件夹: {output_folder}")
|
||
|
||
if failed_files:
|
||
print("\n失败的文件列表:")
|
||
for file_name, error in failed_files:
|
||
print(f" - {file_name}: {error}")
|
||
|
||
return {
|
||
'total': len(csv_files),
|
||
'success': success_count,
|
||
'failed': fail_count,
|
||
'output_folder': output_folder,
|
||
'failed_files': failed_files
|
||
}
|
||
|
||
|
||
def main():
|
||
"""主函数 - 使用示例"""
|
||
# 创建处理器实例
|
||
mapper = ContentMapper()
|
||
|
||
# 示例1:处理单个文件
|
||
# csv_file = r"E:\code\WQ\yaobao925\预测样点_无耀斑\BGA.csv" # 采样点的预测值
|
||
# shp_file = r"E:\code\WQ\yaobao925\roi\shp\roi.shp" # 水体边界shapefile路径
|
||
# output_file = r"E:\code\WQ\yaobao925\map\test\BGA.png" # 输出图片路径
|
||
#
|
||
# mapper.process_data(
|
||
# csv_file=csv_file,
|
||
# shp_file=shp_file,
|
||
# output_file=output_file,
|
||
# resolution=30, # 网格分辨率(米),更小的值产生更平滑的效果
|
||
# show_sample_points=False, # 设置为False以显示平滑的颜色分布,True则显示采样点位置
|
||
# base_map_tif=None, # 正射底图路径(可选)
|
||
# # 不确定性相关参数
|
||
# uncertainty_col=None, # 如果不确定性列名已知,可以指定;否则会自动检测
|
||
# uncertainty_method='auto', # 'auto', 'kriging', 'idw', 'rbf', 'model_variance'
|
||
# calculate_uncertainty=True, # 是否计算不确定性
|
||
# show_uncertainty=True, # 是否在图上显示不确定性叠加层
|
||
# uncertainty_alpha=0.2, # 不确定性叠加层透明度(0-1)
|
||
# uncertainty_threshold=0.2, # 不确定性显示阈值(0-1),只显示高于此阈值的不确定性区域
|
||
# cmap=None # 自动从文件名或内容中识别参数并选择对应的colormap
|
||
# )
|
||
|
||
# 示例2:批量处理文件夹中的所有CSV文件
|
||
csv_folder = r"D:\BaiduNetdiskDownload\yaobao\prediction" # CSV文件所在文件夹
|
||
shp_file = r"E:\code\WQ\yaobao925\roi\shp\roi.shp" # 水体边界shapefile路径
|
||
output_folder = r"D:\BaiduNetdiskDownload\yaobao\model" # 输出文件夹(可选,如果为None则在CSV文件夹下创建map_output)
|
||
|
||
# 批量处理(会自动识别每个CSV文件的参数名称并选择对应的colormap)
|
||
result = mapper.process_batch(
|
||
csv_folder=csv_folder,
|
||
shp_file=shp_file,
|
||
output_folder=output_folder, # 如果为None,将在CSV文件夹下创建map_output子文件夹
|
||
resolution=30, # 网格分辨率(米)
|
||
show_sample_points=False, # 是否显示采样点
|
||
base_map_tif=None, # 正射底图路径(可选)
|
||
# 不确定性相关参数
|
||
uncertainty_col=None,
|
||
uncertainty_method='auto',
|
||
calculate_uncertainty=True,
|
||
show_uncertainty=True,
|
||
uncertainty_alpha=0.2,
|
||
uncertainty_threshold=0.2
|
||
)
|
||
|
||
print(f"\n批量处理结果: {result}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 使用示例
|
||
print("含量分布图生成器")
|
||
print("=" * 50)
|
||
|
||
# 如果要直接运行,请取消下面的注释并修改文件路径
|
||
main()
|
||
|
||
# 或者交互式使用
|
||
# print("使用方法:")
|
||
# print("1. 准备CSV文件(前两列为WGS84经纬度,第三列为含量数据)")
|
||
# print("2. 准备边界Shapefile文件")
|
||
# print("3. 调用以下代码:")
|
||
# print("""
|
||
# mapper = ContentMapper()
|
||
# mapper.process_data(
|
||
# csv_file='your_data.csv',
|
||
# shp_file='your_boundary.shp',
|
||
# output_file='output_map.png',
|
||
# resolution=50
|
||
# )
|
||
# """)
|