238 lines
8.0 KiB
Python
238 lines
8.0 KiB
Python
import struct
|
||
import pandas as pd
|
||
import csv
|
||
import argparse
|
||
import sys
|
||
|
||
|
||
def parse_metadata(hex_data):
    """
    Decode the metadata header stored in the first 112 hex characters.

    Field layout (positions are hex-character offsets, two characters per byte):
        type         (1 byte)   : 0-1
        direction    (1 byte)   : 2-3
        tuigan_stat  (1 byte)   : 4-5
        year         (1 byte)   : 6-7
        month        (1 byte)   : 8-9
        day          (1 byte)   : 10-11
        hour         (1 byte)   : 12-13
        minute       (1 byte)   : 14-15
        second       (1 byte)   : 16-17
        NCa          (1 byte)   : 18-19
        Ncb          (1 byte)   : 20-21
        NCC          (1 byte)   : 22-23
        shutter_time (4 bytes)  : 24-31
        index        (8 bytes)  : 32-47
        temperature  (32 bytes) : 48-111 (eight 4-byte floats)

    Multi-byte fields are unpacked as little-endian values.

    Returns:
        dict keyed by the field names above, in that order; ``temperature``
        is a list of eight floats.

    Raises:
        ValueError: if fewer than 112 characters are supplied, or if a field
            contains text that is not valid hexadecimal.
    """
    if len(hex_data) < 112:
        raise ValueError(f"元数据长度不足112个字符,实际长度: {len(hex_data)}")

    def unpack_le(fmt, begin, width):
        # Turn `width` hex characters starting at `begin` into raw bytes and
        # decode them with the given struct format.
        return struct.unpack(fmt, bytes.fromhex(hex_data[begin:begin + width]))[0]

    # The first twelve fields are consecutive single bytes, so field number
    # `pos` occupies hex characters [2 * pos, 2 * pos + 2).
    byte_fields = (
        'type', 'direction', 'tuigan_stat',
        'year', 'month', 'day',
        'hour', 'minute', 'second',
        'NCa', 'Ncb', 'NCC',
    )
    metadata = {
        name: int(hex_data[2 * pos:2 * pos + 2], 16)
        for pos, name in enumerate(byte_fields)
    }

    metadata['shutter_time'] = unpack_le('<I', 24, 8)   # 4 bytes
    metadata['index'] = unpack_le('<Q', 32, 16)         # 8 bytes

    # Eight 4-byte floats fill hex characters 48..111.
    metadata['temperature'] = [
        unpack_le('<f', begin, 8) for begin in range(48, 112, 8)
    ]
    return metadata
|
||
|
||
|
||
def parse_spectral_data(hex_data):
    """Decode the spectral payload: 2048 little-endian 4-byte floats.

    Only the first 16384 hex characters (2048 * 8) are read.

    Returns:
        A list of 2048 entries. An entry is normally a float; when a chunk
        cannot be decoded, that entry is instead the string
        ``"Error: <exception message>"`` and decoding continues with the
        next chunk.

    Raises:
        ValueError: if fewer than 16384 characters are supplied.
    """
    if len(hex_data) < 16384:
        raise ValueError(f"光谱数据长度不足16384个字符,实际长度: {len(hex_data)}")

    def decode(chunk):
        # One channel = 8 hex characters = one 4-byte float. A failure is
        # recorded in place so a single bad channel does not discard the row.
        try:
            return struct.unpack('<f', bytes.fromhex(chunk))[0]
        except Exception as err:
            return f"Error: {str(err)}"

    return [decode(hex_data[offset:offset + 8]) for offset in range(0, 16384, 8)]
|
||
|
||
|
||
def parse_full_line(hex_line, row_index):
    """
    Decode one complete record.

    Record format: metadata (112 hex characters) followed by spectral data
    (16384 hex characters), 16496 characters in total after surrounding
    whitespace is stripped.

    Args:
        hex_line: the hex-encoded record.
        row_index: zero-based row position; reported one-based in errors.

    Returns:
        On success, a flat dict holding the selected metadata fields, then
        ``temp_0``..``temp_7``, then ``spec_0``..``spec_2047``.
        If decoding fails, a dict with only ``row_index`` and ``error``.

    Raises:
        ValueError: if the stripped record is not exactly 16496 characters.
            This check runs before the guarded section, so it propagates to
            the caller instead of producing an error dict.
    """
    record = hex_line.strip()

    if len(record) != 16496:
        raise ValueError(f"行 {row_index + 1}: 数据长度应为16496字符,实际为{len(record)}")

    try:
        header = parse_metadata(record[:112])
        channels = parse_spectral_data(record[112:112 + 16384])

        # Only these metadata fields are carried into the output row.
        kept_fields = (
            'type', 'direction',
            'year', 'month', 'day',
            'hour', 'minute', 'second',
            'shutter_time', 'index',
        )
        result = {key: header[key] for key in kept_fields}

        # One column per temperature reading, then one per spectral channel.
        result.update(
            (f'temp_{n}', value) for n, value in enumerate(header['temperature'])
        )
        result.update(
            (f'spec_{n}', value) for n, value in enumerate(channels)
        )
        return result

    except Exception as err:
        # Report the failure as a row instead of aborting the caller's loop.
        return {
            'row_index': row_index + 1,
            'error': f"解析失败: {str(err)}",
        }
|
||
|
||
|
||
def calculate_wavelengths(a1, a2, a3, a4):
    """Return the wavelength assigned to each of the 2048 channels.

    For channel number x (0..2047) the value is the cubic
    ``a1*x**3 + a2*x**2 + a3*x + a4``, evaluated in nested (Horner) form and
    rounded to two decimal places.
    """
    def cubic(x):
        return ((a1 * x + a2) * x + a3) * x + a4

    return [round(cubic(channel), 2) for channel in range(2048)]
|
||
|
||
|
||
def process_csv_to_csv(input_csv_path, output_csv_path):
    """
    Parse the hex records in a CSV file and write the decoded rows to a CSV.

    Args:
        input_csv_path: CSV to read. Its first physical line is skipped and
            the following line supplies the column names. It must contain
            ``DL_Data_Total`` plus ``a1_dec``..``a4_dec``; ``timestamp`` is
            optional.
        output_csv_path: destination for the decoded table.

    Returns:
        The decoded pandas DataFrame that was written to ``output_csv_path``.

    Any exception raised while processing is printed to stderr and the
    process exits with status 1; nothing is re-raised to the caller.
    """
    try:
        df = pd.read_csv(input_csv_path, sep=',', encoding='utf-8', skiprows=1)

        # Strip header names before any column is looked up by name, so a
        # padded header such as " DL_Data_Total" is still recognised and a
        # truly missing column hits the explicit checks below rather than
        # surfacing as a bare KeyError.
        df.columns = df.columns.str.strip()

        if "DL_Data_Total" not in df.columns:
            raise ValueError("找不到列:DL_Data_Total")

        required_coeffs = ['a1_dec', 'a2_dec', 'a3_dec', 'a4_dec']
        missing = [col for col in required_coeffs if col not in df.columns]
        if missing:
            raise ValueError(f"缺少必需的系数列: {', '.join(missing)}")

        # Drop rows without a hex payload. Filtering the whole frame (not
        # just the one column) keeps timestamps positionally aligned.
        df = df.dropna(subset=['DL_Data_Total'])

        # Positional row 2 is needed both for the coefficients and as the
        # first parsed record; fail with a clear message when it is absent.
        if len(df) < 3:
            raise ValueError(f"有效数据行不足3行,实际为{len(df)}行")

        # Coefficients come from positional row 2 of the filtered frame.
        # NOTE(review): the original comment calls this the first data row,
        # so rows 0 and 1 are presumably non-data; confirm against the input
        # format.
        coeff_row = df.iloc[2]
        wavelengths = calculate_wavelengths(
            coeff_row['a1_dec'],
            coeff_row['a2_dec'],
            coeff_row['a3_dec'],
            coeff_row['a4_dec'],
        )

        # Parse every record from positional row 2 onward.
        hex_column = df["DL_Data_Total"]
        results = [
            parse_full_line(str(hex_value), idx)
            for idx, hex_value in enumerate(hex_column.iloc[2:], start=2)
        ]

        result_df = pd.DataFrame(results)

        if "timestamp" in df.columns:
            # Same positional window as the parsed records, re-indexed from 0
            # so it lines up with result_df.
            timestamp = df["timestamp"].iloc[2:len(results) + 2].reset_index(drop=True)
            result_df.insert(0, "timestamp", timestamp)
        else:
            print("警告: 未找到timestamp列,跳过时间戳添加")

        # Rename spec_N columns to their wavelength values. This is skipped
        # unless exactly 2048 spectral columns are present.
        spec_columns = [col for col in result_df.columns if col.startswith('spec_')]
        if len(spec_columns) != 2048:
            print(f"警告: 找到 {len(spec_columns)} 个光谱列,但需要2048列")
        else:
            # Order by channel number so channel N pairs with wavelengths[N].
            spec_columns_sorted = sorted(spec_columns, key=lambda x: int(x.split('_')[1]))
            column_mapping = {
                old: f"{wl}" for old, wl in zip(spec_columns_sorted, wavelengths)
            }
            result_df = result_df.rename(columns=column_mapping)

        result_df.to_csv(output_csv_path, index=False)
        print(f"转换完成,结果已保存到:{output_csv_path}")
        return result_df

    except Exception as e:
        print(f"处理过程中发生错误: {str(e)}", file=sys.stderr)
        sys.exit(1)
|
||
|
||
|
||
def main():
    """Command-line entry point: read --input/--output and run the conversion."""
    cli = argparse.ArgumentParser(description='解析高光谱CSV数据')
    # Both paths are mandatory options.
    for flag, help_text in (
        ('--input', '输入CSV文件路径'),
        ('--output', '输出CSV文件路径'),
    ):
        cli.add_argument(flag, required=True, help=help_text)

    options = cli.parse_args()

    print(f"开始处理: {options.input}")
    process_csv_to_csv(options.input, options.output)


if __name__ == "__main__":
    main()