Files
hyperspectral-hongshengrese…/exportDSData2csv.py
2025-07-22 10:27:31 +08:00

238 lines
8.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import struct
import pandas as pd
import csv
import argparse
import sys
def parse_metadata(hex_data):
    """Parse the 112-hex-character metadata header of one record.

    Header layout (hex-char offsets, multi-byte fields little-endian):
        type         1 byte   0-1
        direction    1 byte   2-3
        tuigan_stat  1 byte   4-5
        year         1 byte   6-7
        month        1 byte   8-9
        day          1 byte   10-11
        hour         1 byte   12-13
        minute       1 byte   14-15
        second       1 byte   16-17
        NCa          1 byte   18-19
        Ncb          1 byte   20-21
        NCC          1 byte   22-23
        shutter_time 4 bytes  24-31  (uint32)
        index        8 bytes  32-47  (uint64)
        temperature  32 bytes 48-111 (8 x float32)

    Returns a dict with the scalar fields plus a 'temperature' list of
    8 floats. Raises ValueError when fewer than 112 hex chars are given.
    """
    if len(hex_data) < 112:
        raise ValueError(f"元数据长度不足112个字符实际长度: {len(hex_data)}")

    # 56 header bytes decoded in one shot: 12 uint8, uint32, uint64, 8 float32.
    # '<' disables alignment padding, so the layout matches the wire format.
    fields = struct.unpack('<12BIQ8f', bytes.fromhex(hex_data[:112]))

    scalar_keys = (
        'type', 'direction', 'tuigan_stat', 'year', 'month', 'day',
        'hour', 'minute', 'second', 'NCa', 'Ncb', 'NCC',
        'shutter_time', 'index',
    )
    metadata = dict(zip(scalar_keys, fields[:14]))
    metadata['temperature'] = list(fields[14:])
    return metadata
def parse_spectral_data(hex_data):
    """Parse the spectral section: 2048 little-endian float32 values.

    Each value occupies 8 hex characters (4 bytes), 16384 chars total.
    A chunk that fails to decode contributes an "Error: ..." string in
    place of a float, so one bad chunk does not abort the whole record.

    Raises ValueError when fewer than 16384 hex chars are given.
    """
    if len(hex_data) < 16384:
        raise ValueError(f"光谱数据长度不足16384个字符实际长度: {len(hex_data)}")

    values = []
    for offset in range(0, 16384, 8):
        piece = hex_data[offset:offset + 8]
        try:
            values.append(struct.unpack('<f', bytes.fromhex(piece))[0])
        except Exception as exc:
            # Best-effort: keep the slot, record the decode failure.
            values.append(f"Error: {str(exc)}")
    return values
def parse_full_line(hex_line, row_index):
    """Parse one complete record line.

    Format: metadata (112 hex chars) + spectral data (16384 hex chars)
    = 16496 hex chars total.

    Returns a flat dict of metadata fields, temp_0..temp_7 and
    spec_0..spec_2047 on success, or an {'row_index', 'error'} dict when
    decoding fails. A wrong total length raises ValueError (this check
    sits outside the try block, so it propagates to the caller).
    """
    hex_line = hex_line.strip()
    if len(hex_line) != 16496:
        raise ValueError(f"{row_index + 1}: 数据长度应为16496字符实际为{len(hex_line)}")
    try:
        meta = parse_metadata(hex_line[:112])
        spectral = parse_spectral_data(hex_line[112:16496])

        # Only these header fields are exported; tuigan_stat/NCa/Ncb/NCC
        # are parsed but intentionally omitted from the output row.
        record = {key: meta[key] for key in (
            'type', 'direction', 'year', 'month', 'day',
            'hour', 'minute', 'second', 'shutter_time', 'index')}
        for channel, temp in enumerate(meta['temperature']):
            record[f'temp_{channel}'] = temp
        for channel, value in enumerate(spectral):
            record[f'spec_{channel}'] = value
        return record
    except Exception as exc:
        return {
            'row_index': row_index + 1,
            'error': f"解析失败: {str(exc)}",
        }
def calculate_wavelengths(a1, a2, a3, a4):
    """Return the wavelength for each of the 2048 channels.

    Evaluates the cubic calibration polynomial
        wl = a1*x^3 + a2*x^2 + a3*x + a4
    in Horner form for x = 0..2047, rounding each value to 2 decimals.
    """
    return [round(((a1 * x + a2) * x + a3) * x + a4, 2) for x in range(2048)]
def process_csv_to_csv(input_csv_path, output_csv_path):
    """Decode an exported device CSV and write the parsed rows as CSV.

    Reads `input_csv_path` (first title line skipped), takes the
    wavelength calibration coefficients a1_dec..a4_dec from the third
    data row, decodes every 'DL_Data_Total' hex blob from the third row
    onward via parse_full_line, renames the spec_N columns to their
    computed wavelengths, and writes the result to `output_csv_path`.

    Returns the result DataFrame; on any failure prints the error to
    stderr and exits with status 1 (script-style error handling).
    """
    try:
        # Skip the first title line; the second line supplies the header.
        df = pd.read_csv(input_csv_path, sep=',', encoding='utf-8', skiprows=1)
        # FIX: strip whitespace from column names BEFORE any lookup by
        # name. The original dropped NaNs on 'DL_Data_Total' first, which
        # raised a raw KeyError (instead of the intended diagnostic) when
        # the header carried padding spaces.
        df.columns = df.columns.str.strip()
        if "DL_Data_Total" not in df.columns:
            raise ValueError("找不到列DL_Data_Total")
        df = df.dropna(subset=['DL_Data_Total'])

        # Check the required calibration-coefficient columns.
        required_coeffs = ['a1_dec', 'a2_dec', 'a3_dec', 'a4_dec']
        missing = [col for col in required_coeffs if col not in df.columns]
        if missing:
            raise ValueError(f"缺少必需的系数列: {', '.join(missing)}")

        # Guard the positional access below with a clear message instead
        # of an opaque IndexError.
        if len(df) < 3:
            raise ValueError(f"数据行不足3行实际为{len(df)}")

        # Coefficients come from the third data row (positional index 2).
        coeff_row = df.iloc[2]
        wavelengths = calculate_wavelengths(
            coeff_row['a1_dec'], coeff_row['a2_dec'],
            coeff_row['a3_dec'], coeff_row['a4_dec'])

        # Decode every hex blob from the third row onward.
        hex_column = df["DL_Data_Total"].dropna()
        results = [parse_full_line(str(hex_column.iloc[idx]), idx)
                   for idx in range(2, len(hex_column))]
        result_df = pd.DataFrame(results)

        # Attach the matching timestamps (same positional window).
        if "timestamp" in df.columns:
            timestamp = df["timestamp"].iloc[2:len(results) + 2].reset_index(drop=True)
            result_df.insert(0, "timestamp", timestamp)
        else:
            print("警告: 未找到timestamp列跳过时间戳添加")

        # Rename spec_N columns to their wavelength values.
        spec_columns = [col for col in result_df.columns if col.startswith('spec_')]
        if len(spec_columns) != 2048:
            print(f"警告: 找到 {len(spec_columns)} 个光谱列但需要2048列")
        else:
            # Sort by channel index so column order matches wavelengths.
            spec_columns_sorted = sorted(spec_columns, key=lambda c: int(c.split('_')[1]))
            column_mapping = {old: f"{wl}" for old, wl in zip(spec_columns_sorted, wavelengths)}
            result_df.rename(columns=column_mapping, inplace=True)

        result_df.to_csv(output_csv_path, index=False)
        print(f"转换完成,结果已保存到:{output_csv_path}")
        return result_df
    except Exception as e:
        print(f"处理过程中发生错误: {str(e)}", file=sys.stderr)
        sys.exit(1)
def main():
    """Command-line entry point: parse arguments and run the conversion."""
    arg_parser = argparse.ArgumentParser(description='解析高光谱CSV数据')
    arg_parser.add_argument('--input', required=True, help='输入CSV文件路径')
    arg_parser.add_argument('--output', required=True, help='输出CSV文件路径')
    options = arg_parser.parse_args()
    print(f"开始处理: {options.input}")
    process_csv_to_csv(options.input, options.output)


if __name__ == "__main__":
    main()