import numpy as np from util import * from osgeo import gdal import json from shapely.geometry import Point, Polygon import xml.etree.ElementTree as ET def parse_xml(roifilepath_xml): # 读取 XML 文件 tree = ET.parse(roifilepath_xml) # 替换为您的 XML 文件路径 root = tree.getroot() # 打印根标签 # print("Root tag:", root.tag) # 遍历 XML 文件的所有子元素 # for child in root: # print("Tag:", child.tag, "| Attributes:", child.attrib) # 查找特定标签 polygons = [] for region in root.findall(".//Region"): # print("Region name:", region.get("name")) for polygon in region.findall(".//Polygon"): coordinates_str = polygon.find(".//Coordinates").text.strip() coordinates = list(map(float, coordinates_str.split())) points = [(coordinates[i], coordinates[i + 1]) for i in range(0, len(coordinates), 2)] polygon = Polygon(points) polygons.append(polygon) # print("Polygon coordinates:", coordinates) return CoorType.depend_on_image, polygons def create_json(filepath_json): """ 用户创建sample json文件(parse_json函数需要能够解析),创建逆时针闭合不规则多边形 :param filepath_json: :return: """ polygons = { "Available_Coordinates_type": ["depend_on_image", "pixel"], "Coordinates_type": "pixel", "polygons": [ { "id": 1, "coordinates": [ {"x": 120, "y": 300}, {"x": 250, "y": 220}, {"x": 340, "y": 400}, {"x": 200, "y": 500}, {"x": 120, "y": 300} # 闭合 ] }, { "id": 2, "coordinates": [ {"x": 400, "y": 100}, {"x": 550, "y": 180}, {"x": 480, "y": 350}, {"x": 370, "y": 280}, {"x": 400, "y": 100} # 闭合 ] } ] } # 保存为 JSON 文件 with open(filepath_json, "w") as json_file: json.dump(polygons, json_file, indent=4) def parse_json(roifilepath_json): with open(roifilepath_json, "r") as json_file: loaded_data = json.load(json_file) coor_type_tmp = loaded_data["Coordinates_type"] if coor_type_tmp == "pixel": coor_type = CoorType.pixel elif coor_type_tmp == "depend_on_image": coor_type = CoorType.depend_on_image else: print("Error: unknown coordinates type!!!") return # 构建多边形 polygons = [] for poly in loaded_data["polygons"]: # 提取多边形坐标 coordinates = [(point["x"], point["y"]) for point in poly["coordinates"]] # 使用 Shapely 构造多边形 polygons.append(Polygon(coordinates)) return coor_type, polygons def parse_roifile(roifilepath): extension = os.path.splitext(roifilepath)[-1].removeprefix(".") if extension == "json": coor_type, polygons = parse_json(roifilepath) elif extension == "xml": coor_type, polygons = parse_xml(roifilepath) else: print("Error: unknown roi file type!!!") return return coor_type, polygons def get_coors_in_polygons(polygons): """ :param polygons: list类型,坐标类型必须为像素坐标 :return: """ x = [] y = [] for i in range(len(polygons)): polygon = polygons[i] bounds = polygon.bounds for x_tmp in range(int(bounds[0]), int(bounds[2])): for y_tmp in range(int(bounds[1]), int(bounds[3])): if polygon.contains(Point(x_tmp, y_tmp)): x.append(x_tmp) y.append(y_tmp) return x, y @timeit def get_pixel_coors_in_polygons(roifilepath, dataset): coor_type, polygons = parse_roifile(roifilepath) polygons_pixel_coors = [] # 如果polygons坐标不是像素坐标,则转换为像素坐标 if coor_type == CoorType.depend_on_image: geotransform_input = dataset.GetGeoTransform() inv_geotransform_input = gdal.InvGeoTransform(geotransform_input) for polygon in polygons: shifted_coords = [gdal.ApplyGeoTransform(inv_geotransform_input, x, y) for x, y in polygon.exterior.coords] shifted_polygon = Polygon(shifted_coords) polygons_pixel_coors.append(shifted_polygon) elif coor_type == CoorType.pixel: polygons_pixel_coors = polygons return get_coors_in_polygons(polygons_pixel_coors) def get_valid_pixel_value(band, x, y): value = band[y, x] value = np.sort(value) return value @timeit def deglint_regression_slope(imgpath, water_mask, out_imgpath, start_nir_wave, end_nir_wave, filepath_json): """ :param imgpath: :param out_imgpath: :param start_nir_wave: :param end_nir_wave: :param args: :param kwargs: :return: """ nir = average_bands(start_nir_wave, end_nir_wave, imgpath) dataset = gdal.Open(imgpath) im_width = dataset.RasterXSize im_height = dataset.RasterYSize num_bands = dataset.RasterCount geotransform = dataset.GetGeoTransform() im_proj = dataset.GetProjection() valid_pixel_coor1_x, valid_pixel_coor1_y = get_pixel_coors_in_polygons(filepath_json, dataset) x = get_valid_pixel_value(nir, valid_pixel_coor1_x, valid_pixel_coor1_y) format = "ENVI" driver = gdal.GetDriverByName(format) dst_ds = driver.Create(out_imgpath, im_width, im_height, num_bands, gdal.GDT_Int16, options=["INTERLEAVE=BSQ"]) dst_ds.SetGeoTransform(geotransform) dst_ds.SetProjection(im_proj) dataset_water_mask = gdal.Open(water_mask) data_water_mask = dataset_water_mask.GetRasterBand(1).ReadAsArray() del dataset_water_mask for i in range(num_bands): band = dataset.GetRasterBand(i + 1).ReadAsArray() y = get_valid_pixel_value(band, valid_pixel_coor1_x, valid_pixel_coor1_y) coefficients = np.polyfit(x, y, 1) glint = (nir - min(x)) * coefficients[0] glint[np.where(glint < 0)] = 0 glint[np.where(data_water_mask == 0)] = 0 band_deglint = band - glint # 确保值大于等于0 band_deglint[np.where(band_deglint < 0)] = 0 dst_ds.GetRasterBand(i + 1).WriteArray(band_deglint) del dataset, dst_ds write_fields_to_hdrfile(get_hdr_file_path(imgpath), get_hdr_file_path(out_imgpath))