Files
corning410_radiance_calibra…/1.1radiance_calibration_300tc.py

434 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

'''
1、此版本所需文件dn影像、dark影像和asd测定的积分球辐亮度曲线
2、处理过程1扣除暗电流2通过辐亮度曲线生成定标文件
'''
import numpy as np
import pandas as pd
import sys, os, traceback, re
import shutil
from radiance_calibration_ui import Ui_MainWindow
from library.multithread import Worker
from library.image_reader_writer import ImageReaderWriter
import sys, traceback
from osgeo import gdal
from scipy.interpolate import interp1d
import spectral
import time
import copy
from PyQt5.QtWidgets import QMainWindow, QFileDialog, QApplication
from PyQt5.QtCore import Qt
class EnterWindow(QMainWindow, Ui_MainWindow):
def __init__(self, parent=None):
super(EnterWindow, self).__init__(parent)
self.setupUi(self)
self.radiance_calibration_object = RadianceCalibration()
self.changeBinForCorning_object = ChangeBinForCorning()
# self.setWindowState(Qt.WindowMaximized) # 初始化时就最大化窗口
self.rad_bt.clicked.connect(self.select_rad)
self.dn_bt.clicked.connect(self.select_dn)
self.dark_bt.clicked.connect(self.select_dark)
self.out_file_bt.clicked.connect(self.select_out_file)
self.calfile_bin1_bt.clicked.connect(self.select_calfile_changebin)
self.out_calfile_bt.clicked.connect(self.select_out_file_changebin)
self.operate_bt.clicked.connect(self.operate)
self.dark_checkBox.stateChanged.connect(self.disabledDark)
self.tmp_rad_file_path = None # 用于保存当前路径,下次打开就默认在此路径下
self.tmp_dn_file_path = None # 用于保存当前路径,下次打开就默认在此路径下
self.tmp_dark_file_path = None # 用于保存当前路径,下次打开就默认在此路径下
self.tmp_out_file_path = None # 用于保存当前路径,下次打开就默认在此路径下
self.spatialbin_comboBox.addItem("1")
# self.spatialbin_comboBox.addItem("2")
# self.spectral_comboBox.addItem("1")
self.spectral_comboBox.addItem("2")
self.change_bin_operate_bt.clicked.connect(self.operate_changebin)
def select_rad(self):
if self.tmp_rad_file_path == None:
rad_file_path = QFileDialog.getOpenFileName(self, '选择asd辐亮度文件', os.path.dirname(__file__))[0]
self.tmp_rad_file_path = os.path.dirname(rad_file_path)
elif self.tmp_rad_file_path is not None:
rad_file_path = QFileDialog.getOpenFileName(self, '选择asd辐亮度文件', self.tmp_rad_file_path)[0]
self.tmp_rad_file_path = os.path.dirname(rad_file_path)
self.radiance_calibration_object.asd_radiance_file_path = rad_file_path
self.rad_le.setText(rad_file_path)
def select_dn(self):
if self.tmp_dn_file_path == None:
dn_file_path = QFileDialog.getOpenFileName(self, '选择dn影像', os.path.dirname(__file__))[0]
self.tmp_dn_file_path = os.path.dirname(dn_file_path)
elif self.tmp_dn_file_path is not None:
dn_file_path = QFileDialog.getOpenFileName(self, '选择dn影像', self.tmp_dn_file_path)[0]
self.tmp_dn_file_path = os.path.dirname(dn_file_path)
self.radiance_calibration_object.dn_file_path = dn_file_path
self.dn_le.setText(dn_file_path)
def select_dark(self):
if self.tmp_dark_file_path == None:
dark_file_path = QFileDialog.getOpenFileName(self, '选择dark影像', os.path.dirname(__file__))[0]
self.tmp_dark_file_path = os.path.dirname(dark_file_path)
elif self.tmp_dark_file_path is not None:
dark_file_path = QFileDialog.getOpenFileName(self, '选择dark影像', self.tmp_dark_file_path)[0]
self.tmp_dark_file_path = os.path.dirname(dark_file_path)
self.radiance_calibration_object.dark_file_path = dark_file_path
self.dark_le.setText(dark_file_path)
def select_out_file(self):
if self.tmp_out_file_path == None:
out_file_path = QFileDialog.getExistingDirectory(self, '选择输出路径', os.path.dirname(__file__))
self.tmp_out_file_path = out_file_path
elif self.tmp_out_file_path is not None:
out_file_path = QFileDialog.getExistingDirectory(self, '选择输出路径', self.tmp_out_file_path)
self.tmp_out_file_path = out_file_path
self.radiance_calibration_object.out_file_path = out_file_path
self.out_file_le.setText(out_file_path)
def select_calfile_changebin(self):
tmp = QFileDialog.getExistingDirectory(self, 'bin1定标文件夹', os.path.dirname(__file__))
self.changeBinForCorning_object.calfile_path = tmp
self.calfile_bin1_le.setText(tmp)
def select_out_file_changebin(self):
tmp = QFileDialog.getExistingDirectory(self, '选择输出路径', os.path.dirname(__file__))
self.changeBinForCorning_object.out_file_path = tmp
self.out_calfile_changebin_le.setText(tmp)
def operate(self):
if(self.dark_checkBox.isChecked()):
self.radiance_calibration_object.operate()
else:
self.radiance_calibration_object.operate_without_dark()
def operate_changebin(self):
self.changeBinForCorning_object.spatialbin = int(self.spatialbin_comboBox.currentText())
self.changeBinForCorning_object.spectralbin = int(self.spectral_comboBox.currentText())
self.changeBinForCorning_object.changebin()
def disabledDark(self, isEnable):
self.dark_le.setEnabled(isEnable)
self.dark_bt.setEnabled(isEnable)
class RadianceCalibration():
def __init__(self):
# asd_radiance_file = os.path.dirname(__file__) + '\\corning_radance.txt'
# img_file_path = os.path.dirname(__file__) + '\\jfq_dn'
# img_dark_file_path = os.path.dirname(__file__) + '\\jfq_dn_dark'
# out_file = os.path.dirname(__file__) + '\\jfq_dn_gain'
self.asd_radiance_file_path = None
self.dn_file_path = None
self.dark_file_path = None
self.img_data = None
self.img_dark_data = None
self.out_file_path = None
def get_envi_header_dict(self, hdr_file):
with open(hdr_file, encoding='utf-8') as file_obj:
contents = file_obj.read()
# Get all "key = {val}" type matches
regex = re.compile(r'^(.+?)\s*=\s*({\s*.*?\n*.*?})$', re.M | re.I)
matches = regex.findall(contents)
# Remove them from the header
subhdr = regex.sub('', contents)
# Get all "key = val" type matches
regex = re.compile(r'^(.+?)\s*=\s*(.*?)$', re.M | re.I)
matches.extend(regex.findall(subhdr))
hdr_fields = dict(matches)
keys_delete = []
for key in hdr_fields.keys():
if (
key == "interleave" or key == "byte order" or key == "data type" or key == "samples" or key == "bands" or key == "lines" or key == "framerate"):
keys_delete.append(key)
for x in range(len(keys_delete)):
hdr_fields.pop(keys_delete[x])
return hdr_fields
def get_hdr_filename(self, file_path):
return os.path.splitext(file_path)[0] + ".hdr"
def get_hdr_filename_with_bip(self, file_path):
return os.path.splitext(file_path)[0] + ".bip.hdr"
def write_fields_to_hdrfile(self, fields, hdr_file):
header_tmp = spectral.envi.read_envi_header(hdr_file)
fields['generated time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
with open(hdr_file, "a", encoding='utf-8') as f:
for key in fields.keys():
if key in header_tmp or key == "description":
continue
if type(fields[key]) == list:
f.write(key + " = {" + ", ".join(fields[key]) + "}\n")
else:
f.write(key + " = " + fields[key] + "\n")
def process_hdr(self, hdr_file_path, envi_header_dict):
self.write_fields_to_hdrfile(envi_header_dict, self.get_hdr_filename(hdr_file_path))
shutil.copyfile(self.get_hdr_filename(hdr_file_path), self.get_hdr_filename_with_bip(hdr_file_path))
os.remove(self.get_hdr_filename(hdr_file_path))
# 读图像文件
def read_img(self, filename, xoff=0, yoff=0, im_width=None, im_height=None):
try:
dataset = gdal.Open(filename) # 打开文件
if im_width == None:
im_width = dataset.RasterXSize # 栅格矩阵的列数
if im_height == None:
im_height = dataset.RasterYSize # 栅格矩阵的行数
num_bands = dataset.RasterCount # 栅格矩阵的波段数
im_geotrans = dataset.GetGeoTransform() # 仿射矩阵
im_proj = dataset.GetProjection() # 地图投影信息
im_data = dataset.ReadAsArray(xoff, yoff, im_width, im_height) # 将数据写成数组,对应栅格矩阵
del dataset
return im_proj, im_geotrans, im_data
except AttributeError:
print('gdal打开影像没有文件')
except Exception:
traceback.print_exc()
pass
# 写文件以写成tif为例
def write_img(self, dst_filename, data):
format = "ENVI"
driver = gdal.GetDriverByName(format)
RasterXSize = data.shape[2] # 遥感影像的sample列数
RasterYSize = data.shape[1] # 遥感影像的line行数
band = data.shape[0]
# driver.Create()函数中RasterXSize代表影像的sample列数RasterYSize代表影像的line行数
dst_ds = driver.Create(dst_filename, RasterXSize, RasterYSize, band, gdal.GDT_Float64,
options=["INTERLEAVE=BIP"])
for i in range(band):
dst_ds.GetRasterBand(i + 1).WriteArray(data[i, :, :]) # gdal的band从1开始所以dst_ds.GetRasterBand(i+1)
dst_ds = None
def operate(self):
try:
# 读取asd辐亮度数据
data = pd.read_csv(self.asd_radiance_file_path, sep='\t', dtype=np.float64, header=None)
self.asd_radiance = np.array(data)
# 读取影像
img_proj, img_geotrans, self.img_data = self.read_img(self.dn_file_path)
# dn_hdr = self.get_envi_header_dict(self.get_hdr_filename(self.dn_file_path))
dn_hdr = spectral.envi.read_envi_header(self.get_hdr_filename(self.dn_file_path))
img_dark_proj, img_dark_geotrans, self.img_dark_data = self.read_img(self.dark_file_path)
# dark_hdr = self.get_envi_header_dict(self.get_hdr_filename(self.dark_file_path))
dark_hdr = spectral.envi.read_envi_header(self.get_hdr_filename(self.dark_file_path))
# 重采样
f = interp1d(self.asd_radiance[:, 0], self.asd_radiance[:, 1])
# wave_destination = dn_hdr["wavelength"].removeprefix("{").removesuffix("}").split(",")
asd_radiance_interpolated = [f(float(i)) for i in dn_hdr["wavelength"]]
asd_radiance_interpolated2 = np.array(asd_radiance_interpolated)
# notuse = np.zeros((asd_radiance_interpolated2.shape[0], 2))
# notuse[:, 0] = np.array([float(i) for i in dn_hdr["wavelength"]])
# notuse[:, 1] =asd_radiance_interpolated2
# np.savetxt(os.path.join(self.out_file_path, "asd_radiance_interpolated.txt"), notuse)
# 将影像所有行平均,得到一行(帧)影像
img_data_ave = np.mean(self.img_data, axis=1)
img_dark_data_ave = np.mean(self.img_dark_data, axis=1)
# 去除暗电流
img_data_ave_rmdark = img_data_ave - img_dark_data_ave
gain = np.empty((img_data_ave_rmdark.shape[0], 1, img_data_ave_rmdark.shape[1]))
offset = np.empty((img_data_ave_rmdark.shape[0], 1, img_data_ave_rmdark.shape[1]))
for i in range(gain.shape[2]):
gain[:, 0, i] = asd_radiance_interpolated2 / img_data_ave_rmdark[:, i]
offset[:, 0, i] = img_dark_data_ave[:, i]
gain_name = os.path.join(self.out_file_path, "gain.bip")
offset_name = os.path.join(self.out_file_path, "offset.bip")
self.write_img(gain_name, gain)
self.process_hdr(gain_name, dn_hdr)
self.write_img(offset_name, offset)
self.process_hdr(offset_name, dark_hdr)
except:
traceback.print_exc()
def operate_without_dark(self):
try:
# 读取asd辐亮度数据
data = pd.read_csv(self.asd_radiance_file_path, sep='\t', dtype=np.float64, header=None)
self.asd_radiance = np.array(data)
# 读取影像
img_proj, img_geotrans, self.img_data = self.read_img(self.dn_file_path)
dn_hdr = spectral.envi.read_envi_header(self.get_hdr_filename(self.dn_file_path))
# 重采样
f = interp1d(self.asd_radiance[:, 0], self.asd_radiance[:, 1])
# wave_destination = dn_hdr["wavelength"].removeprefix("{").removesuffix("}").split(",")
asd_radiance_interpolated = [f(float(i)) for i in dn_hdr["wavelength"]]
asd_radiance_interpolated2 = np.array(asd_radiance_interpolated)
# 将影像所有行平均,得到一行(帧)影像
img_data_ave = np.mean(self.img_data, axis=1)
gain = np.empty((img_data_ave.shape[0], 1, img_data_ave.shape[1]))
for i in range(gain.shape[2]):
gain[:, 0, i] = asd_radiance_interpolated2 / img_data_ave[:, i]
gain_name = os.path.join(self.out_file_path, "gain.bip")
ImageReaderWriter.write_img(gain_name, gain)
# self.write_img(gain_name, gain)
except:
traceback.print_exc()
class ChangeBinForCorning():
def __init__(self):
self.calfile_path = None
self.out_file_path = None
self.spatialbin = None
self.spectralbin = None
def get_hdr_filename(self, file_path):
return os.path.splitext(file_path)[0] + ".hdr"
def get_hdr_filename_with_bip(self, file_path):
return os.path.splitext(file_path)[0] + ".bip.hdr"
def write_img(self, dst_filename, data):
format = "ENVI"
driver = gdal.GetDriverByName(format)
RasterXSize = data.shape[2] # 遥感影像的sample列数
RasterYSize = data.shape[1] # 遥感影像的line行数
band = data.shape[0]
# driver.Create()函数中RasterXSize代表影像的sample列数RasterYSize代表影像的line行数
dst_ds = driver.Create(dst_filename, RasterXSize, RasterYSize, band, gdal.GDT_Float64,
options=["INTERLEAVE=BIP"])
for i in range(band):
dst_ds.GetRasterBand(i + 1).WriteArray(data[i, :, :]) # gdal的band从1开始所以dst_ds.GetRasterBand(i+1)
dst_ds = None
def write_fields_to_hdrfile(self, fields, hdr_file):
header_tmp = spectral.envi.read_envi_header(hdr_file)
fields['generated time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
with open(hdr_file, "a", encoding='utf-8') as f:
for key in fields.keys():
if key in header_tmp or key == "description":
continue
if type(fields[key]) == list:
f.write(key + " = {" + ", ".join(fields[key]) + "}\n")
else:
f.write(key + " = " + fields[key] + "\n")
def process_hdr(self, hdr_file_path, envi_header_dict):
self.write_fields_to_hdrfile(envi_header_dict, self.get_hdr_filename(hdr_file_path))
shutil.copyfile(self.get_hdr_filename(hdr_file_path), self.get_hdr_filename_with_bip(hdr_file_path))
os.remove(self.get_hdr_filename(hdr_file_path))
def changebin(self):
gain_name_in = os.path.join(self.calfile_path, "gain.bip")
offset_name_in = os.path.join(self.calfile_path, "offset.bip")
img_proj, img_geotrans, img_data = ImageReaderWriter.read_img(gain_name_in)
img_proj1, img_geotrans1, img_dark = ImageReaderWriter.read_img(offset_name_in)
in_hdr_dict = spectral.envi.read_envi_header(self.get_hdr_filename_with_bip(gain_name_in))
if in_hdr_dict["spectral binning"] != "1" and in_hdr_dict["sample binning"] != "1":
print("输入定标文件不是bin1。")
return
if self.spatialbin == 1 and self.spectralbin == 1:
print("Bin1 is not need to convert.")
return
if self.spatialbin == 1 and self.spectralbin == 2:
samples = int(in_hdr_dict["samples"])
if int(in_hdr_dict["bands"]) % 2 == 0:
bands = int(int(in_hdr_dict["bands"]) / 2)
else:
print("Bands is not divisible by 2.")
return
out_hdr_dict = copy.deepcopy(in_hdr_dict)
out_hdr_dict["bands"] = str(bands)
out_hdr_dict["spectral binning"] = str(2)
wave = []
gain = np.empty((bands, 1, samples))
dark = np.empty((bands, 1, samples))
for i in range(gain.shape[0]):
gain[i, :, :] = (img_data[2 * i, :, :] + img_data[2 * i + 1, :, :]) / 2
dark[i, :, :] = (img_dark[2 * i, :, :] + img_dark[2 * i + 1, :, :]) / 2
tmp = (float(out_hdr_dict["wavelength"][2 * i]) + float(out_hdr_dict["wavelength"][2 * i + 1])) / 2
wave.append(str(round(tmp, 3)))
out_hdr_dict["wavelength"] = wave
gain_name = os.path.join(self.out_file_path, "gain.bip")
offset_name = os.path.join(self.out_file_path, "offset.bip")
self.write_img(gain_name, gain)
self.process_hdr(gain_name, out_hdr_dict)
self.write_img(offset_name, dark)
self.process_hdr(offset_name, out_hdr_dict)
if self.spatialbin == 2 and self.spectralbin == 2:
samples = int(in_hdr_dict["samples"])
if int(in_hdr_dict["bands"]) % 2 == 0:
bands = int(int(in_hdr_dict["bands"]) / 2)
else:
print("Bands is not divisible by 2.")
return
# # 空间bin
# gain = np.empty((300, 1, 688))
# for i in range(gain.shape[2]):
# if i <= 683:
# gain[:, :, i] = img_data[:, :, i*2] # 丢弃
# else:
# gain[:, :, i] = img_data[:, :, 683] # 重复
if __name__ == '__main__':
app = QApplication(sys.argv)
# 实例化主窗口
enter_window_instance = EnterWindow()
enter_window_instance.show()
sys.exit(app.exec_())