上传文件至 /

This commit is contained in:
2025-07-22 10:27:31 +08:00
commit bd82850ce4
5 changed files with 658 additions and 0 deletions

238
exportDSData2csv.py Normal file
View File

@ -0,0 +1,238 @@
import struct
import pandas as pd
import csv
import argparse
import sys
def parse_metadata(hex_data):
"""
解析元数据部分前112个字符
元数据结构:
type (1字节) : 位置 0-1
direction (1字节) : 位置 2-3
tuigan_stat (1字节) : 位置 4-5
year (1字节) : 位置 6-7
month (1字节) : 位置 8-9
day (1字节) : 位置 10-11
hour (1字节) : 位置 12-13
minute (1字节) : 位置 14-15
second (1字节) : 位置 16-17
NCa (1字节) : 位置 18-19
Ncb (1字节) : 位置 20-21
NCC (1字节) : 位置 22-23
shutter_time (4字节): 位置 24-31
index (8字节) : 位置 32-47
temperature (32字节): 位置 48-111 (8个float每个4字节)
"""
# 验证元数据长度
if len(hex_data) < 112:
raise ValueError(f"元数据长度不足112个字符实际长度: {len(hex_data)}")
# 解析单字节整数 (u_int8_t)
def parse_u8(hex_str):
return int(hex_str, 16)
# 解析多字节整数 (小端序)
def parse_u32(hex_str):
return struct.unpack('<I', bytes.fromhex(hex_str))[0]
def parse_u64(hex_str):
return struct.unpack('<Q', bytes.fromhex(hex_str))[0]
# 解析浮点数 (小端序)
def parse_float(hex_str):
return struct.unpack('<f', bytes.fromhex(hex_str))[0]
# 按位置解析各个字段
metadata = {
'type': parse_u8(hex_data[0:2]),
'direction': parse_u8(hex_data[2:4]),
'tuigan_stat': parse_u8(hex_data[4:6]),
'year': parse_u8(hex_data[6:8]),
'month': parse_u8(hex_data[8:10]),
'day': parse_u8(hex_data[10:12]),
'hour': parse_u8(hex_data[12:14]),
'minute': parse_u8(hex_data[14:16]),
'second': parse_u8(hex_data[16:18]),
'NCa': parse_u8(hex_data[18:20]),
'Ncb': parse_u8(hex_data[20:22]),
'NCC': parse_u8(hex_data[22:24]),
'shutter_time': parse_u32(hex_data[24:32]), # 4字节
'index': parse_u64(hex_data[32:48]), # 8字节
'temperature': []
}
# 解析8个温度值 (每个4字节)
for i in range(8):
start = 48 + i * 8
end = start + 8
temp_val = parse_float(hex_data[start:end])
metadata['temperature'].append(temp_val)
return metadata
def parse_spectral_data(hex_data):
"""解析光谱数据部分 (2048个浮点数)"""
if len(hex_data) < 16384:
raise ValueError(f"光谱数据长度不足16384个字符实际长度: {len(hex_data)}")
spectral = []
for i in range(2048):
start = i * 8
end = start + 8
chunk = hex_data[start:end]
try:
byte_data = bytes.fromhex(chunk)
float_val = struct.unpack('<f', byte_data)[0]
spectral.append(float_val)
except Exception as e:
spectral.append(f"Error: {str(e)}")
return spectral
def parse_full_line(hex_line, row_index):
"""
解析完整的一行数据
格式: 元数据(112字符) + 光谱数据(16384字符) = 16496字符
"""
hex_line = hex_line.strip()
# 验证总长度
if len(hex_line) != 16496:
raise ValueError(f"{row_index + 1}: 数据长度应为16496字符实际为{len(hex_line)}")
try:
# 解析元数据 (前112字符)
metadata = parse_metadata(hex_line[:112])
# 解析光谱数据 (后16384字符)
spectral = parse_spectral_data(hex_line[112:112 + 16384])
# 创建结果字典
result = {
'type': metadata['type'],
'direction': metadata['direction'],
'year': metadata['year'],
'month': metadata['month'],
'day': metadata['day'],
'hour': metadata['hour'],
'minute': metadata['minute'],
'second': metadata['second'],
'shutter_time': metadata['shutter_time'],
'index': metadata['index'],
}
# 添加温度列
for i, temp in enumerate(metadata['temperature']):
result[f'temp_{i}'] = temp
# 添加光谱列
for i, spec in enumerate(spectral):
result[f'spec_{i}'] = spec
return result
except Exception as e:
# 返回错误信息
return {
'row_index': row_index + 1,
'error': f"解析失败: {str(e)}"
}
def calculate_wavelengths(a1, a2, a3, a4):
"""计算2048个通道的波长值"""
wavelengths = []
for x in range(2048):
wl = ((a1 * x + a2) * x + a3) * x + a4
wavelengths.append(round(wl, 2)) # 四舍五入到小数点后两位
return wavelengths
def process_csv_to_csv(input_csv_path, output_csv_path):
"""
处理CSV文件并将解析结果保存为CSV
"""
try:
# 读取CSV文件跳过第一行标题行
df = pd.read_csv(input_csv_path, sep=',', encoding='utf-8', skiprows=1)
df = df.dropna(subset=['DL_Data_Total'])
df.columns = df.columns.str.strip()
if "DL_Data_Total" not in df.columns:
raise ValueError("找不到列DL_Data_Total")
# 检查必需的系数列
required_coeffs = ['a1_dec', 'a2_dec', 'a3_dec', 'a4_dec']
missing = [col for col in required_coeffs if col not in df.columns]
if missing:
raise ValueError(f"缺少必需的系数列: {', '.join(missing)}")
# 从第三行第一个数据行索引位置2获取系数
coeff_row = df.iloc[2] # 第三行对应索引2
a1 = coeff_row['a1_dec']
a2 = coeff_row['a2_dec']
a3 = coeff_row['a3_dec']
a4 = coeff_row['a4_dec']
# 计算所有通道的波长
wavelengths = calculate_wavelengths(a1, a2, a3, a4)
hex_column = df["DL_Data_Total"].dropna()
# 准备结果列表
results = []
# 处理每一行数据(从第三行开始)
for idx in range(2, len(hex_column)):
hex_str = str(hex_column.iloc[idx])
result = parse_full_line(hex_str, idx)
results.append(result)
# 转换为DataFrame
result_df = pd.DataFrame(results)
# 添加时间戳
if "timestamp" in df.columns:
timestamp = df["timestamp"].iloc[2:len(results) + 2].reset_index(drop=True)
result_df.insert(0, "timestamp", timestamp)
else:
print("警告: 未找到timestamp列跳过时间戳添加")
# 重命名光谱列用波长值替换spec_X
spec_columns = [col for col in result_df.columns if col.startswith('spec_')]
if len(spec_columns) != 2048:
print(f"警告: 找到 {len(spec_columns)} 个光谱列但需要2048列")
else:
# 按通道索引排序
spec_columns_sorted = sorted(spec_columns, key=lambda x: int(x.split('_')[1]))
# 创建列名映射 {旧列名: 新列名}
column_mapping = {old: f"{wl}" for old, wl in zip(spec_columns_sorted, wavelengths)}
result_df.rename(columns=column_mapping, inplace=True)
# 保存为CSV
result_df.to_csv(output_csv_path, index=False)
print(f"转换完成,结果已保存到:{output_csv_path}")
return result_df
except Exception as e:
print(f"处理过程中发生错误: {str(e)}", file=sys.stderr)
sys.exit(1)
def main():
parser = argparse.ArgumentParser(description='解析高光谱CSV数据')
parser.add_argument('--input', required=True, help='输入CSV文件路径')
parser.add_argument('--output', required=True, help='输出CSV文件路径')
args = parser.parse_args()
print(f"开始处理: {args.input}")
process_csv_to_csv(args.input, args.output)
if __name__ == "__main__":
main()