
1. 使用 DataFrame 或 numpy 数组通过 for 循环逐行遍历气象站数据时,遍历到第 3400 行左右(共约 16000 行)后速度变得特别慢,最终通过 numpy 向量化解决了速度问题,详细尝试过程见 GasAnalyzer.merge_data 及其中各 updateData_* 函数; 2. 拼接 DataFrame 时需要加上关键字参数 ignore_index=True,否则会保留拼接前各表的行索引,对后续遍历造成不利影响,例如按索引访问第一行时会返回多行。
643 lines
32 KiB
Python
643 lines
32 KiB
Python
import numpy as np
|
||
import pandas as pd
|
||
import os
|
||
from datetime import datetime
|
||
import time
|
||
import math
|
||
import argparse
|
||
import copy
|
||
|
||
|
||
class GasAnalyzer:
    """Gas-analyzer data merged with the nearest-in-time weather-station readings.

    Typical use: ``read_data()`` -> ``merge_data(station)`` -> ``write_data()``.

    Attributes (set by the methods below):
        folderPath: directory holding the gas-analyzer ``.txt`` files; ``all.csv``
            is written here as well.
        validFiles: sorted names of the ``.txt`` files that were read.
        rowCountsBackup: row count of each file, used to sanity-check the concat.
        df_total: all files concatenated into one DataFrame.
        data_numpy: ``np.array(df_total)`` taken right after reading; its
            column 2 is ``time_inseconds``.
    """

    # Timestamp layout of both instruments, e.g. "08/13/2022 10:24:26.022".
    TIME_FORMAT = '%m/%d/%Y %H:%M:%S.%f'

    # Destination columns appended (in this order) by insertMeteorologicalStationMetaData().
    _STATION_VALUE_COLUMNS = ('humidity', 'temperature', 'pressure',
                              'windDirection', 'windVelocity_x', 'windVelocity_y')

    def __init__(self, folderPath):
        """:param folderPath: directory containing the gas-analyzer ``.txt`` files."""
        self.folderPath = folderPath

    def read_data(self):
        """Read every ``.txt`` file in ``folderPath`` and concatenate them.

        The first line of each file is skipped and the next line is used as the
        CSV header. A ``time_inseconds`` column (time of day in seconds, parsed
        from the second column) is inserted at position 2 so that the
        nearest-time search can be done with plain float arithmetic.

        :return: 0 on success; 1 when no ``.txt`` file is found or the
            concatenated row count does not match the per-file row counts.
        """
        # Sorted so the concatenation order does not depend on the file system.
        self.validFiles = sorted(
            name for name in os.listdir(self.folderPath)
            if os.path.splitext(name)[1] == '.txt'
        )
        self.rowCountsBackup = []
        if not self.validFiles:
            print("未找到气体分析仪数据文件(.txt)!")
            return 1

        frames = []
        for filename in self.validFiles:
            frame = pd.read_csv(os.path.join(self.folderPath, filename), skiprows=1, sep=',')
            self.rowCountsBackup.append(frame.shape[0])
            # Convert every timestamp once, column-wise, instead of cell by cell with .loc.
            seconds = frame.iloc[:, 1].map(lambda text: self.get_time_in_seconds(text.strip()))
            frame.insert(loc=2, column='time_inseconds', value=seconds.astype(float))
            frames.append(frame)

        # One concat is linear; ignore_index gives a fresh 0..n-1 row index so
        # labels from the individual files are not duplicated.
        self.df_total = pd.concat(frames, ignore_index=True)
        if sum(self.rowCountsBackup) != self.df_total.shape[0]:
            print("拼接气体分析仪数据失败!拼接前后行数不一致!")
            return 1

        self.data_numpy = np.array(self.df_total)
        return 0

    def write_data(self, debug=False):
        """Write the gas-analyzer data to ``<folderPath>/all.csv``.

        :param debug: when False, the helper columns ``meteorologicalStation_newtime``
            and ``time_inseconds`` are left out of the CSV; when True they are kept
            so the matched timestamps of both instruments can be compared by eye.
        :return: None
        """
        output = self.df_total
        if not debug:
            # drop() returns a new frame: df_total keeps its columns, so this
            # method can safely be called more than once.
            output = output.drop(columns=['meteorologicalStation_newtime', 'time_inseconds'],
                                 errors='ignore')
        output.to_csv(os.path.join(self.folderPath, "all.csv"), index=False, sep=',')

    def merge_data(self, MeteorologicalStation):
        """Merge weather-station readings into the gas-analyzer rows by time.

        Step 1 inserts zero-filled destination columns, step 2 fills them from
        the nearest-in-time station row.

        :param MeteorologicalStation: a loaded weather-station object; only its
            ``data_numpy`` attribute is used.
        :return: None
        """
        self.insertMeteorologicalStationMetaData()
        # The row-by-row variants further below (updateData_dataframe_enumerate,
        # updateData_loc, updateData_list, updateData_list_time_delta,
        # updateData_numpy) were slower experiments; the vectorized one is used.
        self.updateData_numpy_vectorize(MeteorologicalStation.data_numpy)

    def insertMeteorologicalStationMetaData(self):
        """Add zero-filled columns to ``df_total`` that will hold the station data.

        ``meteorologicalStation_newtime`` is inserted at position 2 (shifting the
        later columns right); the value columns are appended at the end.
        """
        self.df_total.insert(loc=2, column='meteorologicalStation_newtime', value=0)
        for column in self._STATION_VALUE_COLUMNS:
            self.df_total.insert(loc=len(self.df_total.columns), column=column, value=0)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _scan_nearest_forward(start, count, delta_at):
        """Return ``(index, delta)`` of the first local minimum of ``delta_at``.

        Walks ``start .. count - 1`` and stops as soon as the delta grows
        compared with the best one so far (ties move forward). Because it stops
        at the first local minimum, it assumes the station rows are in
        ascending time order (not checked). Returns ``(None, None)`` for an
        empty range.
        """
        best_index, best_delta = None, None
        for index in range(start, count):
            delta = delta_at(index)
            if best_delta is not None and delta > best_delta:
                break
            best_index, best_delta = index, delta
        return best_index, best_delta

    @staticmethod
    def _wind_components(direction, speed):
        """Split wind speed into x/y parts for a direction in degrees (scalar or array).

        Reproduces the original quadrant formulas: with ``q = floor(direction / 90)``
        and ``phi = direction - 90 * q``, even quadrants use ``(sin phi, cos phi)``
        and odd quadrants ``(cos phi, sin phi)``.
        NOTE(review): this yields |sin|/|cos| magnitudes, i.e. both components are
        non-negative for a non-negative speed and the quadrant sign is not
        applied - confirm that is intended.
        """
        quadrant = np.floor(direction / 90.0)
        phi = np.radians(direction - 90.0 * quadrant)
        even = quadrant % 2 == 0
        wind_x = np.where(even, np.sin(phi), np.cos(phi)) * speed
        wind_y = np.where(even, np.cos(phi), np.sin(phi)) * speed
        return wind_x, wind_y

    def _write_station_row(self, row, station_values, write_time=True):
        """Write one station reading into row ``row`` of ``df_total`` via ``.loc``.

        :param row: row label in ``df_total``.
        :param station_values: positional sequence laid out like station columns
            0-5: time string, humidity, temperature, wind direction, wind speed,
            pressure.
        :param write_time: also copy the station timestamp into
            ``meteorologicalStation_newtime``.
        """
        df = self.df_total
        if write_time:
            df.loc[row, 'meteorologicalStation_newtime'] = station_values[0]
        df.loc[row, 'humidity'] = station_values[1]
        df.loc[row, 'temperature'] = station_values[2]
        df.loc[row, 'pressure'] = station_values[5]
        # Presumably turns a vehicle-relative reading into an absolute bearing
        # (heading + relative direction) - confirm; wrapped into [0, 360).
        direction = np.mod(df.loc[row, 'Vehicle Heading (degrees)'] + station_values[3], 360.0)
        df.loc[row, 'windDirection'] = direction
        wind_x, wind_y = self._wind_components(direction, station_values[4])
        df.loc[row, 'windVelocity_x'] = wind_x
        df.loc[row, 'windVelocity_y'] = wind_y

    # ------------------------------------------------------------------
    # Superseded row-by-row experiments (kept for the speed comparison)
    # ------------------------------------------------------------------

    def updateData_dataframe_enumerate(self, df):
        """Nearest-time merge walking the DataFrames with ``itertuples`` (superseded).

        Writes humidity/temperature/pressure/wind columns (not the matched
        timestamp) and skips a row whose time offset exceeds 3x the largest
        offset seen on earlier rows.

        :param df: station DataFrame read with ``header=None`` (column 0 is the
            timestamp string, columns 1-5 the readings).
        """
        time_start = time.time()
        # Parse the station timestamps once instead of once per gas row.
        station_dates = [datetime.strptime(row[1], self.TIME_FORMAT) for row in df.itertuples()]

        last_index = 0
        max_delta = None
        for gas_row, gas_tuple in enumerate(self.df_total.itertuples()):
            # itertuples() puts the index at [0], so [2] is the second column (time).
            gas_date = datetime.strptime(gas_tuple[2].strip(), self.TIME_FORMAT)
            index, delta = self._scan_nearest_forward(
                last_index, len(station_dates),
                lambda j: abs((gas_date - station_dates[j]).total_seconds()))
            if index is None:
                continue
            last_index = index

            # Compare with earlier rows only; including the current delta in the
            # maximum would make the check impossible to trigger.
            is_outlier = max_delta is not None and delta > max_delta * 3
            max_delta = delta if max_delta is None else max(max_delta, delta)
            if is_outlier:
                continue

            self._write_station_row(gas_row, df.loc[index, list(range(6))].tolist(),
                                    write_time=False)

        print(time.time() - time_start)

    def updateData_loc(self, df):
        """Timing experiment: the forward search using ``df.loc`` for every access.

        Only locates the matches; nothing is written.

        :param df: station DataFrame read with ``header=None``.
        """
        time_start = time.time()
        last_index = 0
        for gas_row in range(self.df_total.shape[0]):
            gas_date = datetime.strptime(self.df_total.loc[gas_row, " Time"].strip(), self.TIME_FORMAT)
            index, _ = self._scan_nearest_forward(
                last_index, df.shape[0],
                lambda j: abs((gas_date - datetime.strptime(df.loc[j, 0], self.TIME_FORMAT)).total_seconds()))
            if index is not None:
                last_index = index
        print(time.time() - time_start)

    def updateData_list(self, df):
        """Timing experiment: the forward search over plain lists of timestamp
        strings using ``delta_in_seconds``. Nothing is written.

        :param df: station DataFrame read with ``header=None``.
        """
        time_start = time.time()
        gas_times = self.df_total[" Time"].to_list()
        station_times = df[0].to_list()

        last_index = 0
        for gas_row, raw_time in enumerate(gas_times):
            gas_time = raw_time.strip()
            loop_start = time.time()
            index, _ = self._scan_nearest_forward(
                last_index, len(station_times),
                lambda j: self.delta_in_seconds(gas_time, station_times[j]))
            if index is not None:
                last_index = index
            if gas_row % 100 == 0:
                print("第%d次内层循环用时:%d" % (gas_row, time.time() - loop_start))

        print("外层循环用时:%d" % (time.time() - time_start))

    def updateData_list_time_delta(self, df):
        """List-based forward search with ``delta_in_seconds`` that also writes
        every station column, including the matched timestamp (superseded).

        :param df: station DataFrame read with ``header=None``.
        """
        time_start = time.time()
        gas_times = self.df_total[" Time"].to_list()
        station_times = df[0].to_list()

        last_index = 0
        for gas_row, raw_time in enumerate(gas_times):
            gas_time = raw_time.strip()
            loop_start = time.time()
            index, _ = self._scan_nearest_forward(
                last_index, len(station_times),
                lambda j: self.delta_in_seconds(gas_time, station_times[j]))
            if gas_row % 100 == 0:
                print("第%d次内层循环用时:%d" % (gas_row, time.time() - loop_start))
            if index is None:
                continue
            last_index = index
            self._write_station_row(gas_row, df.loc[index, list(range(6))].tolist())

        print("外层循环用时:%d" % (time.time() - time_start))

    def updateData_numpy(self, array_numpy):
        """Timing experiment: the forward search over numpy arrays with
        ``datetime`` parsing. Nothing is written.

        :param array_numpy: station data laid out like ``data_numpy`` of the
            weather-station reader (column 0 is the timestamp string).
        """
        time_start = time.time()
        last_index = 0
        for gas_row in range(self.data_numpy.shape[0]):
            gas_date = datetime.strptime(self.data_numpy[gas_row, 1].strip(), self.TIME_FORMAT)
            loop_start = time.time()
            index, _ = self._scan_nearest_forward(
                last_index, array_numpy.shape[0],
                lambda j: abs((gas_date - datetime.strptime(array_numpy[j, 0], self.TIME_FORMAT)).total_seconds()))
            if index is not None:
                last_index = index
            if gas_row % 100 == 0:
                print("第%d次内层循环用时:%d" % (gas_row, time.time() - loop_start))
        print(time.time() - time_start)

    # ------------------------------------------------------------------
    # Vectorized merge (the one merge_data uses)
    # ------------------------------------------------------------------

    def updateData_numpy_vectorize(self, array_numpy):
        """Fill the station columns of ``df_total`` from the nearest-in-time station row.

        For each gas-analyzer row, the station row with the smallest
        ``|station time_inseconds - gas time_inseconds|`` is selected (the first
        one on ties, as ``np.argmin`` does). Each destination column is then
        written with a single whole-column assignment.

        :param array_numpy: 2-D station array with columns 0 time string,
            1 humidity, 2 temperature, 3 wind direction, 4 wind speed,
            5 pressure, 6 time of day in seconds.
        :raises ValueError: if ``array_numpy`` has no rows.
        :return: None
        """
        time_start = time.time()

        station_seconds = np.asarray(array_numpy[:, 6], dtype=float)
        if station_seconds.size == 0:
            raise ValueError("气象站数据为空,无法融合!")
        gas_seconds = np.asarray(self.data_numpy[:, 2], dtype=float)

        # One argmin per gas row keeps memory at O(station rows) instead of
        # materializing a full gas x station distance matrix.
        nearest = np.fromiter(
            (np.argmin(np.abs(station_seconds - gas_time)) for gas_time in gas_seconds),
            dtype=np.intp, count=gas_seconds.size)
        matched = array_numpy[nearest]

        df = self.df_total
        df['meteorologicalStation_newtime'] = matched[:, 0]
        for column, source in (('humidity', 1), ('temperature', 2), ('pressure', 5)):
            # infer_objects() turns the object slice back into a numeric dtype.
            df[column] = pd.Series(matched[:, source], index=df.index).infer_objects()

        heading = df['Vehicle Heading (degrees)'].to_numpy(dtype=float)
        # np.mod wraps any value into [0, 360); a single "-360" left negative or
        # >= 720 values outside every quadrant branch.
        direction = np.mod(heading + np.asarray(matched[:, 3], dtype=float), 360.0)
        df['windDirection'] = direction

        wind_x, wind_y = self._wind_components(direction, np.asarray(matched[:, 4], dtype=float))
        df['windVelocity_x'] = wind_x
        df['windVelocity_y'] = wind_y

        print(time.time() - time_start)

    # ------------------------------------------------------------------
    # Time parsing
    # ------------------------------------------------------------------

    def delta_in_seconds(self, time1, time2):
        """Return the absolute difference between two timestamps, in seconds.

        :param time1: string such as ``08/13/2022 10:24:26.022``
        :param time2: string in the same format
        :return: ``|time1 - time2|`` as a float
        """
        date1, _ = self.get_time(time1)
        date2, _ = self.get_time(time2)
        # Exact calendar-day difference (month lengths and leap years included).
        day1 = datetime(int(date1[2]), int(date1[0]), int(date1[1])).toordinal()
        day2 = datetime(int(date2[2]), int(date2[0]), int(date2[1])).toordinal()
        # Combine the signed parts before taking the absolute value; |delta| per
        # field would turn 10:59:59 vs 11:00:00 into 1 h + 59 min + 59 s.
        signed = (day1 - day2) * 86400 + (self.get_time_in_seconds(time1) - self.get_time_in_seconds(time2))
        return abs(signed)

    def get_time(self, time1):
        """Split a timestamp string into its date and clock fields.

        :param time1: string such as ``08/13/2022 10:24:26.022``
        :return: ``(['08', '13', '2022'], ['10', '24', '26.022'])`` - month/day/year
            and hour/minute/second, still as strings.
        """
        parts = time1.split(sep=" ")
        return parts[0].split(sep="/"), parts[1].split(sep=":")

    def get_time_in_seconds(self, time1):
        """Return the time of day of a timestamp string in seconds; the date is ignored.

        NOTE(review): because the date is dropped, matching on this value pairs
        the wrong rows if a recording spans midnight - confirm it never does.

        :param time1: string such as ``08/13/2022 10:24:26.022``
        :return: seconds since midnight as a float
        """
        _, clock = self.get_time(time1)
        return int(clock[0]) * 60 * 60 + int(clock[1]) * 60 + float(clock[2])
class MeteorologicalStation:
    """Weather-station data read from every ``.dat`` file in a folder.

    Attributes (set by read_data):
        folderPath: directory containing the ``.dat`` files.
        validFiles: sorted names of the files that were read.
        rowCountsBackup: row count of each file, used to sanity-check the concat.
        df_total: all files concatenated; columns 0-5 come from the header-less
            files and ``time_inseconds`` is inserted at position 6.
        data_numpy: ``np.array(df_total)``. GasAnalyzer.updateData_numpy_vectorize
            reads it by position: 0 time string, 1 humidity, 2 temperature,
            3 wind direction, 4 wind speed, 5 pressure, 6 seconds.
    """

    def __init__(self, folderPath):
        """:param folderPath: directory containing the weather-station ``.dat`` files."""
        self.folderPath = folderPath

    def read_data(self):
        """Read and concatenate every ``.dat`` file in ``folderPath``.

        The files have no header row. The time of day in seconds, parsed from
        column 0, is inserted as ``time_inseconds`` at position 6, i.e. right
        after the six data columns the files are expected to have.

        :return: 0 on success; 1 when no ``.dat`` file is found or the
            concatenated row count does not match the per-file row counts.
        """
        # Sorted so the concatenation order does not depend on the file system.
        self.validFiles = sorted(
            name for name in os.listdir(self.folderPath)
            if os.path.splitext(name)[1] == '.dat'
        )
        self.rowCountsBackup = []
        if not self.validFiles:
            print("未找到气象站数据文件(.dat)!")
            return 1

        frames = []
        for filename in self.validFiles:
            frame = pd.read_csv(os.path.join(self.folderPath, filename), header=None, sep=',')
            self.rowCountsBackup.append(frame.shape[0])
            # Column-wise conversion instead of one .loc write per cell.
            seconds = frame.iloc[:, 0].map(lambda text: self.get_time_in_seconds(text.strip()))
            # Position 6 assumes six data columns; consumers index this column by position.
            frame.insert(loc=6, column='time_inseconds', value=seconds.astype(float))
            frames.append(frame)

        # One concat is linear; ignore_index avoids duplicated row labels.
        self.df_total = pd.concat(frames, ignore_index=True)
        if sum(self.rowCountsBackup) != self.df_total.shape[0]:
            print("拼接气象站数据失败!拼接前后行数不一致!")
            return 1

        self.data_numpy = np.array(self.df_total)
        return 0

    def get_time(self, time1):
        """Split a timestamp string into its date and clock fields.

        :param time1: string such as ``08/13/2022 10:24:26.022``
        :return: ``(['08', '13', '2022'], ['10', '24', '26.022'])`` - month/day/year
            and hour/minute/second, still as strings.
        """
        parts = time1.split(sep=" ")
        return parts[0].split(sep="/"), parts[1].split(sep=":")

    def get_time_in_seconds(self, time1):
        """Return the time of day of a timestamp string in seconds; the date is ignored.

        :param time1: string such as ``08/13/2022 10:24:26.022``
        :return: seconds since midnight as a float
        """
        _, clock = self.get_time(time1)
        return int(clock[0]) * 60 * 60 + int(clock[1]) * 60 + float(clock[2])
def main(argv=None):
    """Merge gas-analyzer and weather-station data and write ``all.csv``.

    :param argv: argument list for argparse; None means ``sys.argv[1:]``.
    :return: process exit code - 0 on success, 1 if either input could not be read.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("GasAnalyzer", help="气体分析仪数据路径。")
    parser.add_argument("MeteorologicalStation", help="气象站路径。")
    args = parser.parse_args(argv)

    gas_analyzer = GasAnalyzer(args.GasAnalyzer)
    # read_data() prints its own error message and returns non-zero on failure;
    # continuing would only crash later inside merge_data().
    if gas_analyzer.read_data() != 0:
        return 1

    meteorological_station = MeteorologicalStation(args.MeteorologicalStation)
    if meteorological_station.read_data() != 0:
        return 1

    gas_analyzer.merge_data(meteorological_station)
    gas_analyzer.write_data(debug=True)

    print("completed!!")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())