用LSTM预测交通工具的行踪
最编程
2024-08-09 11:16:41
...
网络上利用LSTM预测轨迹的文章不多,仅有的几篇比较粗略。本文对一些大佬开源的代码进行修改,增添了轨迹连续预测代码。不足之处欢迎批评。
本文参考Muzi_Water大佬的文章“LSTM模型 轨迹经纬度预测”https://blog.csdn.net/Muzi_Water/article/details/103921115
大佬的文章预测代码部分仅仅测试了一组坐标,预测了一个坐标。我在其代码基础上进行修改,预测了整条连续的轨迹。而且大佬的文章里用的TensorFlow比较老了,我将部分代码改成适用于TensorFlow2.2.0的。
数据集可以直接用Geolife,我是用自己的数据,如图所示。利用前两列经纬度信息
用前六个位置信息预测下一个位置
训练代码
import numpy as np
from keras.layers.core import Dense, Activation, Dropout
from keras.layers import LSTM
from keras.models import Sequential, load_model
from keras.callbacks import Callback
import keras.backend.tensorflow_backend as KTF
import tensorflow as tf
import pandas as pd
import os
import keras.callbacks
import matplotlib.pyplot as plt
# GPU setup: pin to device 0 and let TensorFlow grow GPU memory on demand
# instead of reserving it all up front (TF1-compat API under TF 2.x).
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
config=tf.compat.v1.ConfigProto()
config.gpu_options.allow_growth = True
session=tf.compat.v1.Session(config=config)
KTF.tf.compat.v1.keras.backend.set_session(session)
def create_dataset(data, n_predictions, n_next):
    """Build sliding-window samples plus one held-out test sample.

    Each sample X is a window of ``n_predictions`` consecutive rows of
    ``data``; its target Y is the following ``n_next`` rows flattened to
    one vector (row-major), so Y has length ``n_next * data.shape[1]``.

    The last window is reserved as the single test sample; all earlier
    windows form the training set.

    Parameters:
        data: 2-D float array of shape (n_rows, n_features).
        n_predictions: input window length.
        n_next: number of future rows to predict.

    Returns:
        (train_X, train_Y, test_X, test_Y) as float64 numpy arrays with
        shapes (m, n_predictions, dim), (m, n_next*dim),
        (1, n_predictions, dim), (1, n_next*dim).
    """
    # Number of training windows. The original code subtracted an extra 1,
    # which meant the final data row was never used in train OR test.
    num_train = data.shape[0] - n_predictions - n_next

    train_X, train_Y = [], []
    for i in range(num_train):
        train_X.append(data[i:i + n_predictions, :])
        # flatten() replaces the original nested element-copy loops.
        train_Y.append(data[i + n_predictions:i + n_predictions + n_next, :].flatten())
    train_X = np.array(train_X, dtype='float64')
    train_Y = np.array(train_Y, dtype='float64')

    # Final window (disjoint targets from training) is the test sample.
    i = num_train
    test_X = np.array([data[i:i + n_predictions, :]], dtype='float64')
    test_Y = np.array(
        [data[i + n_predictions:i + n_predictions + n_next, :].flatten()],
        dtype='float64')
    return train_X, train_Y, test_X, test_Y
def NormalizeMult(data, set_range):
    """Min-max normalize each column of *data* in place.

    Parameters:
        data: 2-D float array (n_rows, n_cols); MUTATED in place.
        set_range: if True, use each column's observed min/max; otherwise
            use fixed geographic bounds (column 0 = latitude [-90, 90],
            every other column = longitude [-180, 180]).

    Returns:
        (data, normalize) where normalize[i] = (low, high) for column i,
        needed later to invert the normalization.
    """
    # zeros() instead of the original arange()+reshape() garbage-init.
    normalize = np.zeros((data.shape[1], 2), dtype='float64')
    for i in range(data.shape[1]):
        if set_range:
            # Observed min/max; the original np.percentile(x, [0, 100])
            # computed exactly this, and shadowed the builtin `list`.
            listlow = data[:, i].min()
            listhigh = data[:, i].max()
        else:
            listlow, listhigh = (-90.0, 90.0) if i == 0 else (-180.0, 180.0)
        normalize[i, 0] = listlow
        normalize[i, 1] = listhigh
        delta = listhigh - listlow
        # A constant column (delta == 0) is left untouched.
        if delta != 0:
            # Vectorized: replaces the original per-element Python loop.
            data[:, i] = (data[:, i] - listlow) / delta
    return data, normalize
def trainModel(train_X, train_Y):
    """Build and fit a 2-layer LSTM regressor on windowed trajectory data.

    Parameters:
        train_X: float array (samples, window_len, n_features).
        train_Y: float array (samples, output_dim) of flattened targets.

    Returns:
        The trained Keras model (MSE loss, Adam optimizer).
    """
    model = Sequential()
    model.add(LSTM(
        120,
        input_shape=(train_X.shape[1], train_X.shape[2]),
        return_sequences=True))
    model.add(Dropout(0.3))
    model.add(LSTM(
        120,
        return_sequences=False))
    model.add(Dropout(0.3))
    # Linear output for MSE regression. The original appended
    # Activation("relu") here, which clips predictions to >= 0 and kills
    # gradients for negative pre-activations — wrong for a regression head.
    model.add(Dense(train_Y.shape[1]))
    # NOTE(review): 'acc' is not meaningful for regression, but it is kept
    # so the caller's `loss, acc = model.evaluate(...)` unpack still works.
    model.compile(loss='mse', optimizer='adam', metrics=['acc'])
    model.fit(train_X, train_Y, epochs=100, batch_size=64, verbose=1)
    model.summary()
    return model
if __name__ == "__main__":
train_num = 6
per_num = 1
# set_range = False
set_range = True
# 读入时间序列的文件数据
data = pd.read_csv('20081024020959.txt', sep=',').iloc[:, 0:2].values
print(data)
print("样本数:{0},维度:{1}".format(data.shape[0], data.shape[1]))
# print(data)
# 画样本数据库
plt.scatter(data[:, 1], data[:, 0], c='r', marker='o', label='result of recognition')
plt.legend(loc='upper left')
plt.grid()
plt.show()
# 归一化
data, normalize = NormalizeMult(data, set_range)
# print(normalize)
# 生成训练数据
train_X, train_Y, test_X, test_Y = create_dataset(data, train_num, per_num)
print("x\n", train_X.shape)
print("y\n", train_Y.shape)
# 训练模型
model = trainModel(train_X, train_Y)
loss, acc = model.evaluate(train_X, train_Y, verbose=2)
print('Loss : {}, Accuracy: {}'.format(loss, acc * 100))
# 保存模型
np.save("./traj_model_trueNorm.npy", normalize)
model.save("./traj_model_120.h5")
原始轨迹如下图
预测轨迹代码
import numpy as np
from keras.layers.core import Dense, Activation, Dropout
from keras.layers import LSTM
from keras.models import Sequential, load_model
from keras.callbacks import Callback
import keras.backend.tensorflow_backend as KTF
import tensorflow as tf
import pandas as pd
import os
import keras.callbacks
import matplotlib.pyplot as plt
import copy
# GPU setup: let TensorFlow grow GPU memory on demand instead of
# reserving it all up front (TF1-compat API under TF 2.x).
config=tf.compat.v1.ConfigProto()
config.gpu_options.allow_growth = True
session=tf.compat.v1.Session(config=config)
KTF.tf.compat.v1.keras.backend.set_session(session)
def rmse(predictions, targets):
    """Root-mean-square error between two equally-shaped arrays."""
    squared_err = np.square(predictions - targets)
    return np.sqrt(squared_err.mean())
def mse(predictions, targets):
    """Mean squared error between two equally-shaped arrays."""
    err = predictions - targets
    return np.mean(err * err)
def reshape_y_hat(y_hat, dim):
    """Reshape a flat prediction vector into a (n_points, dim) array.

    Parameters:
        y_hat: 1-D sequence whose length is a multiple of ``dim``
            (row-major: point 0's coords first, then point 1's, ...).
        dim: number of coordinates per point (2 for lat/lng here).

    Returns:
        float64 numpy array of shape (len(y_hat) // dim, dim).
    """
    # One numpy reshape replaces the original manual index-walking loop.
    return np.asarray(y_hat, dtype='float64').reshape(-1, dim)
# Data slicing: build time-series samples for continuous prediction.
def data_set(dataset, test_num):
    """Create sliding-window samples from a sequence.

    dataX[i] = dataset[i : i + test_num] (input window),
    dataY[i] = dataset[i + test_num]    (the next element).

    Parameters:
        dataset: indexable sequence / array of rows.
        test_num: window length.

    Returns:
        (dataX, dataY) as numpy arrays.
    """
    dataX, dataY = [], []
    # range(len - test_num): the final row is now usable as a target
    # (the original extra '- 1' silently dropped it).
    for i in range(len(dataset) - test_num):
        dataX.append(dataset[i:i + test_num])
        dataY.append(dataset[i + test_num])
    return np.array(dataX), np.array(dataY)
# Multi-dimensional de-normalization (inverse of NormalizeMultUseData).
def FNormalizeMult(data, normalize):
    """Invert the per-column min-max normalization.

    Parameters:
        data: 2-D array-like of normalized values; copied, not mutated.
        normalize: array (n_cols, 2) of (low, high) per column, as saved
            by NormalizeMult at training time.

    Returns:
        float64 numpy array with original-scale values.
    """
    data = np.array(data, dtype='float64')
    for i in range(data.shape[1]):
        listlow = normalize[i, 0]
        listhigh = normalize[i, 1]
        delta = listhigh - listlow
        # Degenerate column (high == low) is left untouched, mirroring the
        # forward normalization. Leftover debug print removed; the inner
        # per-element loop is vectorized per column.
        if delta != 0:
            data[:, i] = data[:, i] * delta + listlow
    return data
# Normalize new data with the ranges computed from the training data.
def NormalizeMultUseData(data, normalize):
    """Apply a previously computed min-max normalization to *data*.

    Parameters:
        data: 2-D float numpy array; MUTATED in place and also returned.
        normalize: array (n_cols, 2) of (low, high) per column from
            NormalizeMult.

    Returns:
        The same array, normalized column by column.
    """
    for i in range(data.shape[1]):
        listlow = normalize[i, 0]
        listhigh = normalize[i, 1]
        delta = listhigh - listlow
        if delta != 0:
            # Vectorized per column; replaces the per-element Python loop.
            data[:, i] = (data[:, i] - listlow) / delta
    return data
from math import sin, asin, cos, radians, fabs, sqrt
EARTH_RADIUS = 6371  # mean Earth radius in kilometres
# Great-circle (haversine) distance between two lat/lng points, in km.
def hav(theta):
    """Haversine of an angle in radians: sin^2(theta / 2)."""
    return sin(theta / 2) ** 2
def get_distance_hav(lat0, lng0, lat1, lng1):
    """Great-circle distance in km between two (lat, lng) points.

    Uses the haversine formula; inputs are in degrees.
    """
    # Convert all four coordinates from degrees to radians at once.
    lat0, lat1, lng0, lng1 = (
        radians(lat0), radians(lat1), radians(lng0), radians(lng1))
    h = hav(fabs(lat0 - lat1)) + cos(lat0) * cos(lat1) * hav(fabs(lng0 - lng1))
    return 2 * EARTH_RADIUS * asin(sqrt(h))
if __name__ == '__main__':
    test_num = 6   # input window length; must match the training script
    per_num = 1    # prediction horizon

    # Original trajectory; keep one pristine copy for plotting instead of
    # reading the same file twice as the original code did.
    yuanshi = pd.read_csv('原始轨迹.txt', sep=',').iloc[:, 0:2].values
    ex_data = yuanshi.copy()

    data, dataY = data_set(ex_data, test_num)
    # BUG FIX: the original `data.dtype = 'float64'` reinterprets the raw
    # buffer bytes instead of converting values (silently corrupting
    # non-float64 input); astype performs a real conversion.
    data = data.astype('float64')
    y = dataY

    # Normalize each window with the ranges saved at training time.
    normalize = np.load("./traj_model_trueNorm.npy")
    data_guiyi = []
    for i in range(len(data)):
        data[i] = list(NormalizeMultUseData(data[i], normalize))
        data_guiyi.append(data[i])

    model = load_model("./traj_model_120.h5")

    # Predict one step for every window, then de-normalize back to lat/lng.
    y_hat = []
    for i in range(len(data)):
        test_X = data_guiyi[i].reshape(1, data_guiyi[i].shape[0], data_guiyi[i].shape[1])
        dd = model.predict(test_X)
        dd = dd.reshape(dd.shape[1])
        dd = reshape_y_hat(dd, 2)
        dd = FNormalizeMult(dd, normalize)
        y_hat.append(dd.tolist()[0])
    y_hat = np.array(y_hat)

    # Plot the original trajectory against the predicted one.
    plt.rcParams['font.sans-serif'] = ['simhei']  # render CJK labels correctly
    print(len(y_hat))
    p1 = plt.scatter(yuanshi[:, 1], yuanshi[:, 0], c='r', marker='o', label='识别结果')
    p2 = plt.scatter(y_hat[:, 1], y_hat[:, 0], c='b', marker='o', label='预测结果')
    plt.legend(loc='upper left')
    plt.grid()
    plt.show()
预测轨迹如下图
推荐阅读
-
【视频】Python用LSTM长短期记忆神经网络对不稳定降雨量时间序列进行预测分析|数据分享|附代码数据
-
用Apache Spark进行客户流失预测的机器学习方法
-
使用Matlab进行LSTM-Adaboost-ABKDE集成学习:长短期记忆神经网络与自适应带宽核密度估计在多变量回归中的区间预测应用
-
使用LSTM进行人行为轨迹预测的Matlab代码示例
-
用卡尔曼滤波在Matlab中预测运动轨迹的源代码
-
用Java实现车辆轨迹预测:为什么它很重要?宋浩然博士的分享
-
Python中的LSTM实现轨迹预测:理解其重要性与意义
-
用深度学习预测目标轨迹的方法
-
用LSTM预测交通工具的行踪
-
用PyTorch和Python实现LSTM:时间序列预测的实战教程