韦伯斯特(Webster)算法的实施
最编程
2024-03-27 19:58:39
...
import pandas as pd
import re
data = pd.read_csv("D:\data\yonghelu.csv")
y_m_d = list((data['time']))[0].split()[0] #提取年月日
#从ip_lane列中提取车道信息如('172.18.53.140:1'-->'1'),并创建新的列lane保存
lane = []
a = data["ip_lane"].tolist()
for x in a:
x = x.split(":")
lane.append(x[1])
data["lane"] = lane
#从zhuanxiang列中提取方向信息如('望江西路与永和路_6017_由东向南'-->'由东向南'),并创建新的列zx保存
reg = re.compile(r'由.*')
zx = []
for x in data["zhuanxiang"].tolist():
x = reg.search(x).group()
zx.append(x)
data["zx"] = zx
#将time列改为Timedelta类型
data1 = data
data1['time'] = pd.to_datetime(data1['time'])
L = [] #存放相位信息
#读取txt文件中的相位信息存放到L列表中
with open('D:\\data\\phase.txt',"r",encoding="utf-8") as f:
for line in f.readlines():
line = re.sub("[0-9\:]", "", line).strip().split(",")
L.append(line)
#将time列表中的数据进行处理(只保留 时-分-秒)
data1['time'] =data1['time'] - pd.to_datetime(y_m_d)
#提取出t0~t1时间段的数据
t0 = pd.Timedelta('07:30:00')
t1 = pd.Timedelta('09:30:00')
#t0~t1时间段数据
data_earlypeak = data1[(data1['time']< t1) & (data1['time']>t0)]
data_earlypeak.head()
#t0~t1时间段间隔
T = (t1 - t0).seconds/3600
#按时间顺序重新排序表格
data2 = data1.sort_values(by = 'time')
data2.head()
#提取出表格中总共有哪些车道,存放入lane_列表
lane_ = data2['lane'].unique()
#计算每个车道的饱和流量
deta = [] #存放每条车道相邻两条数据的时间差
Time_deta = [] #存放由小到大排序的deta列表
for i in range(len(lane_)):
data2_ = data2[data2['lane'] == lane_[i]]
time_ = list(data2_['time'])
for i in range((len(time_) - 1)):
x = time_[i+1] - time_[i]
deta.append(x)
deta.sort()
Time_deta.append(deta)
deta = []
sum_deta = [] #存放 每条车道算出来的最小10个相邻时间差,进行求和
for i in range(len(Time_deta)):
d = Time_deta[i][0]
for j in range(1,10): #取10个最小相邻时间差值求和
d = d + Time_deta[i][j]
sum_deta.append(d)
#计算每条车道饱和流量
for i in range(len(sum_deta)):
sum_deta[i] = 3600/((sum_deta[i].seconds)/10) #10个最小相邻时间差值求和后要 /10 求平均
#用字典hs每条车道对应的饱和流量,即"车道":"饱和流量" 如{'1': 3600.0,'10': 3527.0,'11': 3600.0}
hs = {key : value for key, value in list(zip(lane_,sum_deta))}
y = [] #用来存储每个相位的每个方向的每条车道的流量比
Y = [] #用来存储各相位的所有车道流量比,如Y[0]存储的是第一个相位的每个方向各个车道的流量比,取max(Y[0])
#即为第一相位的流量比
for k in range(len(L)):
for i in range(len(L[k])):
#查找第K个相位的第i方向的所有记录
df = data_earlypeak[data_earlypeak["zx"] == L[k][i]]
#如果某个方向没有记录就几记为0
if len(df) == 0:
y.append(0)
continue
#查看该方向的所有车道,如:'由东向西'对应由车道['2', '3', '4', '5']
vehiclelane = list(df["lane"].unique())
#计算该方向的各个车道的流量比
for j in range(len(vehiclelane)):
# 统计每个方向的每条车道流量数
x = sum(data_earlypeak[data_earlypeak["lane"] == vehiclelane[j]]['flow'])
# t0~t1时间段单位时间的流量(小时为单位)
x = x/T
#计算流量比(实际流量/饱和流量)
x = x/hs[vehiclelane[j]]
#存储该方向的各车道流量比
y.append(x)
#存储该相位的各个方向的每条车道的流量比
Y.append(y)
#更新y为空,计算下一个相位的各个方向的每条车道的流量比
y = []
#记录每个相位的流量比(该相位的每个方向的每条车道的流量比的最大值)
yi = [max(Y[0]),max(Y[1]),max(Y[2]),max(Y[3])]
print(yi)
# 总流量比
Y_ = sum(yi)
print(Y_)
#总损失时间 L = n*Li+AR
sumL = 3*4+0
#由公式计算周期
C = round((1.5*sumL+5)/(1-Y_))
print("周期为: {}秒".format(C))
#有效绿灯时间
Ge = C - sumL
print("------------------------------------------------------")
#第i个相位的有效绿灯时间
ge = [0]*len(L)
for i in range(0,len(L)):
ge[i] = round(Ge*(yi[i]/Y_))
print("第 %d 个相位的绿灯有效时间为 %d:" % (i+1,ge[i]))
print("------------------------------------------------------")
#绿灯显示时间
G = [0]*len(L)
for i in range(0,len(L)):
G[i] = round(ge[i] - 0 +3) #0为全红时间即AR的值;3为每个相位启动损失时间
print("第 %d 个相位的绿灯显示时间为 %d:" % (i+1,G[i]))
上一篇: 音频调制算法摘要
下一篇: 安卓 SF 相位偏移 造成相位偏移的原因
推荐阅读