434 lines
19 KiB
Python
434 lines
19 KiB
Python
'''
|
||
1、此版本所需文件:dn影像、dark影像和asd测定的积分球辐亮度曲线
|
||
2、处理过程:1)扣除暗电流;2)通过辐亮度曲线生成定标文件
|
||
'''
|
||
import numpy as np
|
||
import pandas as pd
|
||
import sys, os, traceback, re
|
||
import shutil
|
||
from radiance_calibration_ui import Ui_MainWindow
|
||
from library.multithread import Worker
|
||
from library.image_reader_writer import ImageReaderWriter
|
||
import sys, traceback
|
||
from osgeo import gdal
|
||
from scipy.interpolate import interp1d
|
||
import spectral
|
||
import time
|
||
import copy
|
||
|
||
from PyQt5.QtWidgets import QMainWindow, QFileDialog, QApplication
|
||
from PyQt5.QtCore import Qt
|
||
|
||
|
||
class EnterWindow(QMainWindow, Ui_MainWindow):
|
||
def __init__(self, parent=None):
|
||
super(EnterWindow, self).__init__(parent)
|
||
self.setupUi(self)
|
||
|
||
self.radiance_calibration_object = RadianceCalibration()
|
||
self.changeBinForCorning_object = ChangeBinForCorning()
|
||
|
||
# self.setWindowState(Qt.WindowMaximized) # 初始化时就最大化窗口
|
||
|
||
self.rad_bt.clicked.connect(self.select_rad)
|
||
self.dn_bt.clicked.connect(self.select_dn)
|
||
self.dark_bt.clicked.connect(self.select_dark)
|
||
self.out_file_bt.clicked.connect(self.select_out_file)
|
||
self.calfile_bin1_bt.clicked.connect(self.select_calfile_changebin)
|
||
self.out_calfile_bt.clicked.connect(self.select_out_file_changebin)
|
||
|
||
|
||
self.operate_bt.clicked.connect(self.operate)
|
||
self.dark_checkBox.stateChanged.connect(self.disabledDark)
|
||
|
||
self.tmp_rad_file_path = None # 用于保存当前路径,下次打开就默认在此路径下
|
||
self.tmp_dn_file_path = None # 用于保存当前路径,下次打开就默认在此路径下
|
||
self.tmp_dark_file_path = None # 用于保存当前路径,下次打开就默认在此路径下
|
||
self.tmp_out_file_path = None # 用于保存当前路径,下次打开就默认在此路径下
|
||
|
||
self.spatialbin_comboBox.addItem("1")
|
||
# self.spatialbin_comboBox.addItem("2")
|
||
# self.spectral_comboBox.addItem("1")
|
||
self.spectral_comboBox.addItem("2")
|
||
|
||
self.change_bin_operate_bt.clicked.connect(self.operate_changebin)
|
||
|
||
|
||
def select_rad(self):
|
||
if self.tmp_rad_file_path == None:
|
||
rad_file_path = QFileDialog.getOpenFileName(self, '选择asd辐亮度文件', os.path.dirname(__file__))[0]
|
||
|
||
self.tmp_rad_file_path = os.path.dirname(rad_file_path)
|
||
elif self.tmp_rad_file_path is not None:
|
||
rad_file_path = QFileDialog.getOpenFileName(self, '选择asd辐亮度文件', self.tmp_rad_file_path)[0]
|
||
|
||
self.tmp_rad_file_path = os.path.dirname(rad_file_path)
|
||
|
||
self.radiance_calibration_object.asd_radiance_file_path = rad_file_path
|
||
self.rad_le.setText(rad_file_path)
|
||
|
||
def select_dn(self):
|
||
if self.tmp_dn_file_path == None:
|
||
dn_file_path = QFileDialog.getOpenFileName(self, '选择dn影像', os.path.dirname(__file__))[0]
|
||
|
||
self.tmp_dn_file_path = os.path.dirname(dn_file_path)
|
||
elif self.tmp_dn_file_path is not None:
|
||
dn_file_path = QFileDialog.getOpenFileName(self, '选择dn影像', self.tmp_dn_file_path)[0]
|
||
|
||
self.tmp_dn_file_path = os.path.dirname(dn_file_path)
|
||
|
||
self.radiance_calibration_object.dn_file_path = dn_file_path
|
||
self.dn_le.setText(dn_file_path)
|
||
|
||
def select_dark(self):
|
||
if self.tmp_dark_file_path == None:
|
||
dark_file_path = QFileDialog.getOpenFileName(self, '选择dark影像', os.path.dirname(__file__))[0]
|
||
|
||
self.tmp_dark_file_path = os.path.dirname(dark_file_path)
|
||
elif self.tmp_dark_file_path is not None:
|
||
dark_file_path = QFileDialog.getOpenFileName(self, '选择dark影像', self.tmp_dark_file_path)[0]
|
||
|
||
self.tmp_dark_file_path = os.path.dirname(dark_file_path)
|
||
|
||
self.radiance_calibration_object.dark_file_path = dark_file_path
|
||
self.dark_le.setText(dark_file_path)
|
||
|
||
def select_out_file(self):
|
||
if self.tmp_out_file_path == None:
|
||
out_file_path = QFileDialog.getExistingDirectory(self, '选择输出路径', os.path.dirname(__file__))
|
||
|
||
self.tmp_out_file_path = out_file_path
|
||
elif self.tmp_out_file_path is not None:
|
||
out_file_path = QFileDialog.getExistingDirectory(self, '选择输出路径', self.tmp_out_file_path)
|
||
|
||
self.tmp_out_file_path = out_file_path
|
||
|
||
self.radiance_calibration_object.out_file_path = out_file_path
|
||
self.out_file_le.setText(out_file_path)
|
||
|
||
def select_calfile_changebin(self):
|
||
tmp = QFileDialog.getExistingDirectory(self, 'bin1定标文件夹', os.path.dirname(__file__))
|
||
|
||
self.changeBinForCorning_object.calfile_path = tmp
|
||
self.calfile_bin1_le.setText(tmp)
|
||
|
||
def select_out_file_changebin(self):
|
||
tmp = QFileDialog.getExistingDirectory(self, '选择输出路径', os.path.dirname(__file__))
|
||
self.changeBinForCorning_object.out_file_path = tmp
|
||
self.out_calfile_changebin_le.setText(tmp)
|
||
|
||
def operate(self):
|
||
if(self.dark_checkBox.isChecked()):
|
||
self.radiance_calibration_object.operate()
|
||
else:
|
||
self.radiance_calibration_object.operate_without_dark()
|
||
|
||
def operate_changebin(self):
|
||
self.changeBinForCorning_object.spatialbin = int(self.spatialbin_comboBox.currentText())
|
||
self.changeBinForCorning_object.spectralbin = int(self.spectral_comboBox.currentText())
|
||
|
||
self.changeBinForCorning_object.changebin()
|
||
|
||
def disabledDark(self, isEnable):
|
||
self.dark_le.setEnabled(isEnable)
|
||
self.dark_bt.setEnabled(isEnable)
|
||
|
||
|
||
class RadianceCalibration():
|
||
def __init__(self):
|
||
# asd_radiance_file = os.path.dirname(__file__) + '\\corning_radance.txt'
|
||
# img_file_path = os.path.dirname(__file__) + '\\jfq_dn'
|
||
# img_dark_file_path = os.path.dirname(__file__) + '\\jfq_dn_dark'
|
||
# out_file = os.path.dirname(__file__) + '\\jfq_dn_gain'
|
||
|
||
self.asd_radiance_file_path = None
|
||
self.dn_file_path = None
|
||
self.dark_file_path = None
|
||
|
||
self.img_data = None
|
||
self.img_dark_data = None
|
||
|
||
self.out_file_path = None
|
||
|
||
def get_envi_header_dict(self, hdr_file):
|
||
with open(hdr_file, encoding='utf-8') as file_obj:
|
||
contents = file_obj.read()
|
||
|
||
# Get all "key = {val}" type matches
|
||
regex = re.compile(r'^(.+?)\s*=\s*({\s*.*?\n*.*?})$', re.M | re.I)
|
||
matches = regex.findall(contents)
|
||
|
||
# Remove them from the header
|
||
subhdr = regex.sub('', contents)
|
||
|
||
# Get all "key = val" type matches
|
||
regex = re.compile(r'^(.+?)\s*=\s*(.*?)$', re.M | re.I)
|
||
matches.extend(regex.findall(subhdr))
|
||
|
||
hdr_fields = dict(matches)
|
||
keys_delete = []
|
||
for key in hdr_fields.keys():
|
||
if (
|
||
key == "interleave" or key == "byte order" or key == "data type" or key == "samples" or key == "bands" or key == "lines" or key == "framerate"):
|
||
keys_delete.append(key)
|
||
|
||
for x in range(len(keys_delete)):
|
||
hdr_fields.pop(keys_delete[x])
|
||
|
||
return hdr_fields
|
||
|
||
def get_hdr_filename(self, file_path):
|
||
return os.path.splitext(file_path)[0] + ".hdr"
|
||
|
||
def get_hdr_filename_with_bip(self, file_path):
|
||
return os.path.splitext(file_path)[0] + ".bip.hdr"
|
||
|
||
def write_fields_to_hdrfile(self, fields, hdr_file):
|
||
header_tmp = spectral.envi.read_envi_header(hdr_file)
|
||
|
||
fields['generated time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
with open(hdr_file, "a", encoding='utf-8') as f:
|
||
for key in fields.keys():
|
||
if key in header_tmp or key == "description":
|
||
continue
|
||
|
||
if type(fields[key]) == list:
|
||
f.write(key + " = {" + ", ".join(fields[key]) + "}\n")
|
||
else:
|
||
f.write(key + " = " + fields[key] + "\n")
|
||
|
||
def process_hdr(self, hdr_file_path, envi_header_dict):
|
||
self.write_fields_to_hdrfile(envi_header_dict, self.get_hdr_filename(hdr_file_path))
|
||
shutil.copyfile(self.get_hdr_filename(hdr_file_path), self.get_hdr_filename_with_bip(hdr_file_path))
|
||
os.remove(self.get_hdr_filename(hdr_file_path))
|
||
|
||
# 读图像文件
|
||
def read_img(self, filename, xoff=0, yoff=0, im_width=None, im_height=None):
|
||
try:
|
||
dataset = gdal.Open(filename) # 打开文件
|
||
if im_width == None:
|
||
im_width = dataset.RasterXSize # 栅格矩阵的列数
|
||
if im_height == None:
|
||
im_height = dataset.RasterYSize # 栅格矩阵的行数
|
||
num_bands = dataset.RasterCount # 栅格矩阵的波段数
|
||
im_geotrans = dataset.GetGeoTransform() # 仿射矩阵
|
||
im_proj = dataset.GetProjection() # 地图投影信息
|
||
im_data = dataset.ReadAsArray(xoff, yoff, im_width, im_height) # 将数据写成数组,对应栅格矩阵
|
||
del dataset
|
||
return im_proj, im_geotrans, im_data
|
||
except AttributeError:
|
||
print('gdal打开影像:没有文件!')
|
||
except Exception:
|
||
traceback.print_exc()
|
||
pass
|
||
|
||
# 写文件,以写成tif为例
|
||
def write_img(self, dst_filename, data):
|
||
format = "ENVI"
|
||
driver = gdal.GetDriverByName(format)
|
||
RasterXSize = data.shape[2] # 遥感影像的sample(列数)
|
||
RasterYSize = data.shape[1] # 遥感影像的line(行数)
|
||
band = data.shape[0]
|
||
# driver.Create()函数中RasterXSize代表影像的sample(列数),RasterYSize代表影像的line(行数)
|
||
dst_ds = driver.Create(dst_filename, RasterXSize, RasterYSize, band, gdal.GDT_Float64,
|
||
options=["INTERLEAVE=BIP"])
|
||
for i in range(band):
|
||
dst_ds.GetRasterBand(i + 1).WriteArray(data[i, :, :]) # gdal的band从1开始,所以dst_ds.GetRasterBand(i+1)
|
||
dst_ds = None
|
||
|
||
def operate(self):
|
||
try:
|
||
# 读取asd辐亮度数据
|
||
data = pd.read_csv(self.asd_radiance_file_path, sep='\t', dtype=np.float64, header=None)
|
||
self.asd_radiance = np.array(data)
|
||
|
||
# 读取影像
|
||
img_proj, img_geotrans, self.img_data = self.read_img(self.dn_file_path)
|
||
# dn_hdr = self.get_envi_header_dict(self.get_hdr_filename(self.dn_file_path))
|
||
dn_hdr = spectral.envi.read_envi_header(self.get_hdr_filename(self.dn_file_path))
|
||
img_dark_proj, img_dark_geotrans, self.img_dark_data = self.read_img(self.dark_file_path)
|
||
# dark_hdr = self.get_envi_header_dict(self.get_hdr_filename(self.dark_file_path))
|
||
dark_hdr = spectral.envi.read_envi_header(self.get_hdr_filename(self.dark_file_path))
|
||
|
||
# 重采样
|
||
f = interp1d(self.asd_radiance[:, 0], self.asd_radiance[:, 1])
|
||
# wave_destination = dn_hdr["wavelength"].removeprefix("{").removesuffix("}").split(",")
|
||
asd_radiance_interpolated = [f(float(i)) for i in dn_hdr["wavelength"]]
|
||
asd_radiance_interpolated2 = np.array(asd_radiance_interpolated)
|
||
|
||
# notuse = np.zeros((asd_radiance_interpolated2.shape[0], 2))
|
||
# notuse[:, 0] = np.array([float(i) for i in dn_hdr["wavelength"]])
|
||
# notuse[:, 1] =asd_radiance_interpolated2
|
||
# np.savetxt(os.path.join(self.out_file_path, "asd_radiance_interpolated.txt"), notuse)
|
||
|
||
|
||
# 将影像所有行平均,得到一行(帧)影像
|
||
img_data_ave = np.mean(self.img_data, axis=1)
|
||
img_dark_data_ave = np.mean(self.img_dark_data, axis=1)
|
||
|
||
# 去除暗电流
|
||
img_data_ave_rmdark = img_data_ave - img_dark_data_ave
|
||
|
||
gain = np.empty((img_data_ave_rmdark.shape[0], 1, img_data_ave_rmdark.shape[1]))
|
||
offset = np.empty((img_data_ave_rmdark.shape[0], 1, img_data_ave_rmdark.shape[1]))
|
||
for i in range(gain.shape[2]):
|
||
gain[:, 0, i] = asd_radiance_interpolated2 / img_data_ave_rmdark[:, i]
|
||
offset[:, 0, i] = img_dark_data_ave[:, i]
|
||
|
||
gain_name = os.path.join(self.out_file_path, "gain.bip")
|
||
offset_name = os.path.join(self.out_file_path, "offset.bip")
|
||
self.write_img(gain_name, gain)
|
||
self.process_hdr(gain_name, dn_hdr)
|
||
self.write_img(offset_name, offset)
|
||
self.process_hdr(offset_name, dark_hdr)
|
||
except:
|
||
traceback.print_exc()
|
||
|
||
def operate_without_dark(self):
|
||
try:
|
||
# 读取asd辐亮度数据
|
||
data = pd.read_csv(self.asd_radiance_file_path, sep='\t', dtype=np.float64, header=None)
|
||
self.asd_radiance = np.array(data)
|
||
|
||
# 读取影像
|
||
img_proj, img_geotrans, self.img_data = self.read_img(self.dn_file_path)
|
||
dn_hdr = spectral.envi.read_envi_header(self.get_hdr_filename(self.dn_file_path))
|
||
|
||
# 重采样
|
||
f = interp1d(self.asd_radiance[:, 0], self.asd_radiance[:, 1])
|
||
# wave_destination = dn_hdr["wavelength"].removeprefix("{").removesuffix("}").split(",")
|
||
asd_radiance_interpolated = [f(float(i)) for i in dn_hdr["wavelength"]]
|
||
asd_radiance_interpolated2 = np.array(asd_radiance_interpolated)
|
||
|
||
# 将影像所有行平均,得到一行(帧)影像
|
||
img_data_ave = np.mean(self.img_data, axis=1)
|
||
|
||
gain = np.empty((img_data_ave.shape[0], 1, img_data_ave.shape[1]))
|
||
for i in range(gain.shape[2]):
|
||
gain[:, 0, i] = asd_radiance_interpolated2 / img_data_ave[:, i]
|
||
|
||
gain_name = os.path.join(self.out_file_path, "gain.bip")
|
||
ImageReaderWriter.write_img(gain_name, gain)
|
||
# self.write_img(gain_name, gain)
|
||
except:
|
||
traceback.print_exc()
|
||
|
||
|
||
class ChangeBinForCorning():
|
||
def __init__(self):
|
||
self.calfile_path = None
|
||
self.out_file_path = None
|
||
self.spatialbin = None
|
||
self.spectralbin = None
|
||
|
||
def get_hdr_filename(self, file_path):
|
||
return os.path.splitext(file_path)[0] + ".hdr"
|
||
|
||
def get_hdr_filename_with_bip(self, file_path):
|
||
return os.path.splitext(file_path)[0] + ".bip.hdr"
|
||
|
||
def write_img(self, dst_filename, data):
|
||
format = "ENVI"
|
||
driver = gdal.GetDriverByName(format)
|
||
RasterXSize = data.shape[2] # 遥感影像的sample(列数)
|
||
RasterYSize = data.shape[1] # 遥感影像的line(行数)
|
||
band = data.shape[0]
|
||
# driver.Create()函数中RasterXSize代表影像的sample(列数),RasterYSize代表影像的line(行数)
|
||
dst_ds = driver.Create(dst_filename, RasterXSize, RasterYSize, band, gdal.GDT_Float64,
|
||
options=["INTERLEAVE=BIP"])
|
||
for i in range(band):
|
||
dst_ds.GetRasterBand(i + 1).WriteArray(data[i, :, :]) # gdal的band从1开始,所以dst_ds.GetRasterBand(i+1)
|
||
dst_ds = None
|
||
|
||
def write_fields_to_hdrfile(self, fields, hdr_file):
|
||
header_tmp = spectral.envi.read_envi_header(hdr_file)
|
||
|
||
fields['generated time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
with open(hdr_file, "a", encoding='utf-8') as f:
|
||
for key in fields.keys():
|
||
if key in header_tmp or key == "description":
|
||
continue
|
||
|
||
if type(fields[key]) == list:
|
||
f.write(key + " = {" + ", ".join(fields[key]) + "}\n")
|
||
else:
|
||
f.write(key + " = " + fields[key] + "\n")
|
||
|
||
def process_hdr(self, hdr_file_path, envi_header_dict):
|
||
self.write_fields_to_hdrfile(envi_header_dict, self.get_hdr_filename(hdr_file_path))
|
||
shutil.copyfile(self.get_hdr_filename(hdr_file_path), self.get_hdr_filename_with_bip(hdr_file_path))
|
||
os.remove(self.get_hdr_filename(hdr_file_path))
|
||
|
||
def changebin(self):
|
||
gain_name_in = os.path.join(self.calfile_path, "gain.bip")
|
||
offset_name_in = os.path.join(self.calfile_path, "offset.bip")
|
||
img_proj, img_geotrans, img_data = ImageReaderWriter.read_img(gain_name_in)
|
||
img_proj1, img_geotrans1, img_dark = ImageReaderWriter.read_img(offset_name_in)
|
||
|
||
in_hdr_dict = spectral.envi.read_envi_header(self.get_hdr_filename_with_bip(gain_name_in))
|
||
|
||
if in_hdr_dict["spectral binning"] != "1" and in_hdr_dict["sample binning"] != "1":
|
||
print("输入定标文件不是bin1。")
|
||
return
|
||
|
||
if self.spatialbin == 1 and self.spectralbin == 1:
|
||
print("Bin1 is not need to convert.")
|
||
return
|
||
|
||
if self.spatialbin == 1 and self.spectralbin == 2:
|
||
samples = int(in_hdr_dict["samples"])
|
||
|
||
if int(in_hdr_dict["bands"]) % 2 == 0:
|
||
bands = int(int(in_hdr_dict["bands"]) / 2)
|
||
else:
|
||
print("Bands is not divisible by 2.")
|
||
return
|
||
|
||
out_hdr_dict = copy.deepcopy(in_hdr_dict)
|
||
out_hdr_dict["bands"] = str(bands)
|
||
out_hdr_dict["spectral binning"] = str(2)
|
||
wave = []
|
||
|
||
gain = np.empty((bands, 1, samples))
|
||
dark = np.empty((bands, 1, samples))
|
||
for i in range(gain.shape[0]):
|
||
gain[i, :, :] = (img_data[2 * i, :, :] + img_data[2 * i + 1, :, :]) / 2
|
||
dark[i, :, :] = (img_dark[2 * i, :, :] + img_dark[2 * i + 1, :, :]) / 2
|
||
|
||
tmp = (float(out_hdr_dict["wavelength"][2 * i]) + float(out_hdr_dict["wavelength"][2 * i + 1])) / 2
|
||
wave.append(str(round(tmp, 3)))
|
||
out_hdr_dict["wavelength"] = wave
|
||
|
||
gain_name = os.path.join(self.out_file_path, "gain.bip")
|
||
offset_name = os.path.join(self.out_file_path, "offset.bip")
|
||
self.write_img(gain_name, gain)
|
||
self.process_hdr(gain_name, out_hdr_dict)
|
||
self.write_img(offset_name, dark)
|
||
self.process_hdr(offset_name, out_hdr_dict)
|
||
|
||
if self.spatialbin == 2 and self.spectralbin == 2:
|
||
samples = int(in_hdr_dict["samples"])
|
||
|
||
if int(in_hdr_dict["bands"]) % 2 == 0:
|
||
bands = int(int(in_hdr_dict["bands"]) / 2)
|
||
else:
|
||
print("Bands is not divisible by 2.")
|
||
return
|
||
# # 空间bin
|
||
# gain = np.empty((300, 1, 688))
|
||
# for i in range(gain.shape[2]):
|
||
# if i <= 683:
|
||
# gain[:, :, i] = img_data[:, :, i*2] # 丢弃
|
||
# else:
|
||
# gain[:, :, i] = img_data[:, :, 683] # 重复
|
||
|
||
|
||
if __name__ == '__main__':
|
||
app = QApplication(sys.argv)
|
||
|
||
# 实例化主窗口
|
||
enter_window_instance = EnterWindow()
|
||
enter_window_instance.show()
|
||
|
||
sys.exit(app.exec_())
|