Files
WQ_GUI/降采样光谱.py

122 lines
4.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
import spectral
import spectral.io.envi as envi
def downsample_bands_extract(data, factor=3, offset=0):
"""抽取降采样:每 factor 个波段取第 (offset+1) 个波段"""
rows, cols, bands = data.shape
new_bands = bands // factor
indices = [offset + i * factor for i in range(new_bands)]
# 边界保护
indices = [idx for idx in indices if idx < bands]
return data[:, :, indices].astype(np.float32)
def process_bsq_chunked(input_path, output_path, scale=10000, factor=3, offset=0, chunk_lines=500):
"""
分块处理,抽取降采样,正确写入 BSQ 格式
BSQ 格式:每个波段的所有行数据连续存储
"""
img = spectral.open_image(input_path)
hdr = img.metadata.copy()
n_rows, n_cols, n_bands = img.nrows, img.ncols, img.nbands
new_bands = n_bands // factor # 抽取后的波段数
new_hdr = hdr.copy()
new_hdr['samples'] = n_cols
new_hdr['lines'] = n_rows
new_hdr['bands'] = new_bands
new_hdr['data type'] = 12
new_hdr['interleave'] = 'bsq'
# 波长抽取(优先使用原始 header 中的波长)
if 'wavelength' in hdr:
waves = np.array(hdr['wavelength'])
if len(waves) >= new_bands * factor:
extracted_waves = waves[offset::factor][:new_bands]
new_hdr['wavelength'] = extracted_waves.tolist()
else:
print("警告: 原始波长数量不足,跳过")
# FWHM 处理
if 'fwhm' in hdr:
fwhm = np.array(hdr['fwhm'])
if len(fwhm) >= new_bands * factor:
new_hdr['fwhm'] = fwhm[offset::factor][:new_bands].tolist()
out_file = output_path + '.bsq'
# 关键修改BSQ 格式要求每个波段的所有行连续存储
# 先处理所有数据到内存缓冲区(按波段组织),再写入文件
# 对于大文件,我们按波段分批处理
print(f"开始处理 {n_rows} 行 x {n_cols} 列 x {new_bands} 波段...")
# 为每个波段创建一个临时文件(避免内存溢出)
import tempfile
import os
temp_files = []
temp_band_data = []
try:
# 初始化每个波段的临时文件
for b in range(new_bands):
fd, temp_path = tempfile.mkstemp(suffix=f'_band_{b}.tmp')
os.close(fd)
temp_files.append(temp_path)
temp_band_data.append(open(temp_path, 'wb'))
# 第一遍:分块读取,按波段写入临时文件
for start_row in range(0, n_rows, chunk_lines):
end_row = min(start_row + chunk_lines, n_rows)
print(f"处理行 {start_row}-{end_row-1}...")
chunk = img.read_subregion((start_row, end_row), (0, n_cols))
chunk_down = downsample_bands_extract(chunk, factor, offset)
chunk_scaled = chunk_down * scale
chunk_uint16 = np.clip(chunk_scaled, 0, 65535).astype(np.uint16)
# 将每个波段的数据写入对应的临时文件
for b in range(new_bands):
band_data = chunk_uint16[:, :, b].tobytes()
temp_band_data[b].write(band_data)
del chunk, chunk_down, chunk_scaled, chunk_uint16
# 关闭所有临时文件
for f in temp_band_data:
f.close()
# 第二遍:按 BSQ 格式合并波段0全部数据 → 波段1全部数据 → ...
print("合并为 BSQ 格式...")
with open(out_file, 'wb') as fout:
for b in range(new_bands):
with open(temp_files[b], 'rb') as fband:
# 读取该波段的全部行数据并写入最终文件
while True:
data = fband.read(1024 * 1024) # 1MB 块读取
if not data:
break
fout.write(data)
print(f" 波段 {b+1}/{new_bands} 写入完成")
print(f"BSQ 文件写入完成: {out_file}")
finally:
# 清理临时文件
for temp_path in temp_files:
try:
os.remove(temp_path)
except:
pass
# 写入头文件
envi.write_envi_header(output_path + '.hdr', new_hdr)
print(f"完成!输出: {out_file}")
print(f" 尺寸: {n_rows} 行 x {n_cols} 列 x {new_bands} 波段")
if __name__ == '__main__':
input_base = r"D:\BaiduNetdiskDownload\yaobao\caijain.hdr"
output_base = r"D:\BaiduNetdiskDownload\yaobao\test"
process_bsq_chunked(input_base, output_base, scale=10000, factor=3, offset=0, chunk_lines=2000)