''' 1、此版本所需文件:dn影像、dark影像和asd测定的积分球辐亮度曲线 2、处理过程:1)扣除暗电流;2)通过辐亮度曲线生成定标文件 ''' import numpy as np import pandas as pd import sys, os, traceback, re import shutil from radiance_calibration_ui import Ui_MainWindow from library.multithread import Worker from library.image_reader_writer import ImageReaderWriter import sys, traceback from osgeo import gdal from scipy.interpolate import interp1d import spectral from PyQt5.QtWidgets import QMainWindow, QFileDialog, QApplication from PyQt5.QtCore import Qt class EnterWindow(QMainWindow, Ui_MainWindow): def __init__(self, parent=None): super(EnterWindow, self).__init__(parent) self.setupUi(self) self.radiance_calibration_object = RadianceCalibration() # self.setWindowState(Qt.WindowMaximized) # 初始化时就最大化窗口 self.rad_bt.clicked.connect(self.select_rad) self.dn_bt.clicked.connect(self.select_dn) self.dark_bt.clicked.connect(self.select_dark) self.out_file_bt.clicked.connect(self.select_out_file) self.operate_bt.clicked.connect(self.operate) self.dark_checkBox.stateChanged.connect(self.disabledDark) self.tmp_rad_file_path = None # 用于保存当前路径,下次打开就默认在此路径下 self.tmp_dn_file_path = None # 用于保存当前路径,下次打开就默认在此路径下 self.tmp_dark_file_path = None # 用于保存当前路径,下次打开就默认在此路径下 self.tmp_out_file_path = None # 用于保存当前路径,下次打开就默认在此路径下 def select_rad(self): if self.tmp_rad_file_path == None: rad_file_path = QFileDialog.getOpenFileName(self, '选择asd辐亮度文件', os.path.dirname(__file__))[0] self.tmp_rad_file_path = os.path.dirname(rad_file_path) elif self.tmp_rad_file_path is not None: rad_file_path = QFileDialog.getOpenFileName(self, '选择asd辐亮度文件', self.tmp_rad_file_path)[0] self.tmp_rad_file_path = os.path.dirname(rad_file_path) self.radiance_calibration_object.asd_radiance_file_path = rad_file_path self.rad_le.setText(rad_file_path) def select_dn(self): if self.tmp_dn_file_path == None: dn_file_path = QFileDialog.getOpenFileName(self, '选择dn影像', os.path.dirname(__file__))[0] self.tmp_dn_file_path = os.path.dirname(dn_file_path) elif self.tmp_dn_file_path is not None: dn_file_path = QFileDialog.getOpenFileName(self, '选择dn影像', self.tmp_dn_file_path)[0] self.tmp_dn_file_path = os.path.dirname(dn_file_path) self.radiance_calibration_object.dn_file_path = dn_file_path self.dn_le.setText(dn_file_path) def select_dark(self): if self.tmp_dark_file_path == None: dark_file_path = QFileDialog.getOpenFileName(self, '选择dark影像', os.path.dirname(__file__))[0] self.tmp_dark_file_path = os.path.dirname(dark_file_path) elif self.tmp_dark_file_path is not None: dark_file_path = QFileDialog.getOpenFileName(self, '选择dark影像', self.tmp_dark_file_path)[0] self.tmp_dark_file_path = os.path.dirname(dark_file_path) self.radiance_calibration_object.dark_file_path = dark_file_path self.dark_le.setText(dark_file_path) def select_out_file(self): if self.tmp_out_file_path == None: out_file_path = QFileDialog.getExistingDirectory(self, '选择输出路径', os.path.dirname(__file__)) self.tmp_out_file_path = out_file_path elif self.tmp_out_file_path is not None: out_file_path = QFileDialog.getExistingDirectory(self, '选择输出路径', self.tmp_out_file_path) self.tmp_out_file_path = out_file_path self.radiance_calibration_object.out_file_path = out_file_path self.out_file_le.setText(out_file_path) def operate(self): if(self.dark_checkBox.isChecked()): self.radiance_calibration_object.operate() else: self.radiance_calibration_object.operate_without_dark() def disabledDark(self, isEnable): self.dark_le.setEnabled(isEnable) self.dark_bt.setEnabled(isEnable) class RadianceCalibration(): def __init__(self): # asd_radiance_file = os.path.dirname(__file__) + '\\corning_radance.txt' # img_file_path = os.path.dirname(__file__) + '\\jfq_dn' # img_dark_file_path = os.path.dirname(__file__) + '\\jfq_dn_dark' # out_file = os.path.dirname(__file__) + '\\jfq_dn_gain' self.asd_radiance_file_path = None self.dn_file_path = None self.dark_file_path = None self.img_data = None self.img_dark_data = None self.out_file_path = None def get_envi_header_dict(self, hdr_file): with open(hdr_file, encoding='utf-8') as file_obj: contents = file_obj.read() # Get all "key = {val}" type matches regex = re.compile(r'^(.+?)\s*=\s*({\s*.*?\n*.*?})$', re.M | re.I) matches = regex.findall(contents) # Remove them from the header subhdr = regex.sub('', contents) # Get all "key = val" type matches regex = re.compile(r'^(.+?)\s*=\s*(.*?)$', re.M | re.I) matches.extend(regex.findall(subhdr)) hdr_fields = dict(matches) keys_delete = [] for key in hdr_fields.keys(): if ( key == "interleave" or key == "byte order" or key == "data type" or key == "samples" or key == "bands" or key == "lines" or key == "framerate"): keys_delete.append(key) for x in range(len(keys_delete)): hdr_fields.pop(keys_delete[x]) return hdr_fields def get_hdr_filename(self, file_path): return os.path.splitext(file_path)[0] + ".hdr" def get_hdr_filename_with_bip(self, file_path): return os.path.splitext(file_path)[0] + ".bip.hdr" def write_fields_to_hdrfile(self, fields, hdr_file): header_tmp = spectral.envi.read_envi_header(hdr_file) with open(hdr_file, "a", encoding='utf-8') as f: for key in fields.keys(): if key in header_tmp or key == "description": continue if type(fields[key]) == list: f.write(key + " = {" + ", ".join(fields[key]) + "}\n") else: f.write(key + " = " + fields[key] + "\n") def process_hdr(self, hdr_file_path, envi_header_dict): self.write_fields_to_hdrfile(envi_header_dict, self.get_hdr_filename(hdr_file_path)) shutil.copyfile(self.get_hdr_filename(hdr_file_path), self.get_hdr_filename_with_bip(hdr_file_path)) os.remove(self.get_hdr_filename(hdr_file_path)) # 读图像文件 def read_img(self, filename, xoff=0, yoff=0, im_width=None, im_height=None): try: dataset = gdal.Open(filename) # 打开文件 if im_width == None: im_width = dataset.RasterXSize # 栅格矩阵的列数 if im_height == None: im_height = dataset.RasterYSize # 栅格矩阵的行数 num_bands = dataset.RasterCount # 栅格矩阵的波段数 im_geotrans = dataset.GetGeoTransform() # 仿射矩阵 im_proj = dataset.GetProjection() # 地图投影信息 im_data = dataset.ReadAsArray(xoff, yoff, im_width, im_height) # 将数据写成数组,对应栅格矩阵 del dataset return im_proj, im_geotrans, im_data except AttributeError: print('gdal打开影像:没有文件!') except Exception: traceback.print_exc() pass # 写文件,以写成tif为例 def write_img(self, dst_filename, data): format = "ENVI" driver = gdal.GetDriverByName(format) RasterXSize = data.shape[2] # 遥感影像的sample(列数) RasterYSize = data.shape[1] # 遥感影像的line(行数) band = data.shape[0] # driver.Create()函数中RasterXSize代表影像的sample(列数),RasterYSize代表影像的line(行数) dst_ds = driver.Create(dst_filename, RasterXSize, RasterYSize, band, gdal.GDT_Float64, options=["INTERLEAVE=BIP"]) for i in range(band): dst_ds.GetRasterBand(i + 1).WriteArray(data[i, :, :]) # gdal的band从1开始,所以dst_ds.GetRasterBand(i+1) dst_ds = None def operate(self): try: # 读取asd辐亮度数据 data = pd.read_csv(self.asd_radiance_file_path, sep='\t', dtype=np.float64, header=None) self.asd_radiance = np.array(data) # 读取影像 img_proj, img_geotrans, self.img_data = self.read_img(self.dn_file_path) # dn_hdr = self.get_envi_header_dict(self.get_hdr_filename(self.dn_file_path)) dn_hdr = spectral.envi.read_envi_header(self.get_hdr_filename(self.dn_file_path)) img_dark_proj, img_dark_geotrans, self.img_dark_data = self.read_img(self.dark_file_path) # dark_hdr = self.get_envi_header_dict(self.get_hdr_filename(self.dark_file_path)) dark_hdr = spectral.envi.read_envi_header(self.get_hdr_filename(self.dark_file_path)) # 重采样 f = interp1d(self.asd_radiance[:, 0], self.asd_radiance[:, 1]) # wave_destination = dn_hdr["wavelength"].removeprefix("{").removesuffix("}").split(",") asd_radiance_interpolated = [f(float(i)) for i in dn_hdr["wavelength"]] asd_radiance_interpolated2 = np.array(asd_radiance_interpolated) # notuse = np.zeros((asd_radiance_interpolated2.shape[0], 2)) # notuse[:, 0] = np.array([float(i) for i in dn_hdr["wavelength"]]) # notuse[:, 1] =asd_radiance_interpolated2 # np.savetxt(os.path.join(self.out_file_path, "asd_radiance_interpolated.txt"), notuse) # 将影像所有行平均,得到一行(帧)影像 img_data_ave = np.mean(self.img_data, axis=1) img_dark_data_ave = np.mean(self.img_dark_data, axis=1) # 去除暗电流 img_data_ave_rmdark = img_data_ave - img_dark_data_ave gain = np.empty((img_data_ave_rmdark.shape[0], 1, img_data_ave_rmdark.shape[1])) offset = np.empty((img_data_ave_rmdark.shape[0], 1, img_data_ave_rmdark.shape[1])) for i in range(gain.shape[2]): gain[:, 0, i] = asd_radiance_interpolated2 / img_data_ave_rmdark[:, i] offset[:, 0, i] = img_dark_data_ave[:, i] gain_name = os.path.join(self.out_file_path, "gain.bip") offset_name = os.path.join(self.out_file_path, "offset.bip") self.write_img(gain_name, gain) self.process_hdr(gain_name, dn_hdr) self.write_img(offset_name, offset) self.process_hdr(offset_name, dark_hdr) except: traceback.print_exc() def operate_without_dark(self): try: # 读取asd辐亮度数据 data = pd.read_csv(self.asd_radiance_file_path, sep='\t', dtype=np.float64, header=None) self.asd_radiance = np.array(data) # 读取影像 img_proj, img_geotrans, self.img_data = self.read_img(self.dn_file_path) dn_hdr = spectral.envi.read_envi_header(self.get_hdr_filename(self.dn_file_path)) # 重采样 f = interp1d(self.asd_radiance[:, 0], self.asd_radiance[:, 1]) # wave_destination = dn_hdr["wavelength"].removeprefix("{").removesuffix("}").split(",") asd_radiance_interpolated = [f(float(i)) for i in dn_hdr["wavelength"]] asd_radiance_interpolated2 = np.array(asd_radiance_interpolated) # 将影像所有行平均,得到一行(帧)影像 img_data_ave = np.mean(self.img_data, axis=1) gain = np.empty((img_data_ave.shape[0], 1, img_data_ave.shape[1])) for i in range(gain.shape[2]): gain[:, 0, i] = asd_radiance_interpolated2 / img_data_ave[:, i] gain_name = os.path.join(self.out_file_path, "gain.bip") ImageReaderWriter.write_img(gain_name, gain) # self.write_img(gain_name, gain) except: traceback.print_exc() if __name__ == '__main__': app = QApplication(sys.argv) # 实例化主窗口 enter_window_instance = EnterWindow() enter_window_instance.show() sys.exit(app.exec_())