forked from tianrang-intelligence/TSCC2019
-
Notifications
You must be signed in to change notification settings - Fork 0
/
evaluate.py
217 lines (175 loc) · 7.87 KB
/
evaluate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
import engine
import json
import pandas as pd
import numpy as np
from sim_setting import sim_setting_control
import argparse
from utility import parse_arguments
def main():
args = parse_arguments()
sim_setting = sim_setting_control
sim_setting["num_step"] = args.num_step
evaluate_one_traffic(sim_setting, args.scenario)
def evaluate_one_traffic(dic_sim_setting, scenario):
roadnetFile = "data/{}/roadnet.json".format(scenario)
flowFile = "data/{}/flow.json".format(scenario)
planFile = "data/{}/signal_plan.txt".format(scenario)
outFile = "data/{}/evaluation.txt".format(scenario)
if check(planFile, dic_sim_setting["num_step"]):
df_vehicle_actual_enter_leave = test_run(dic_sim_setting, roadnetFile, flowFile, planFile)
df_vehicle_planed_enter = get_planed_entering(flowFile, dic_sim_setting)
# add planed entering to actual leaving
tt = cal_travel_time(df_vehicle_actual_enter_leave, df_vehicle_planed_enter, outFile, dic_sim_setting)
print("====================== travel time ======================")
print("scenario_{0}: {1:.2f} s".format(scenario, tt))
print("====================== travel time ======================")
else:
print("planFile is invalid, Rejected!")
def test_run(dic_sim_setting, roadnetFile, flowFile, planFile):
eng = engine.Engine(dic_sim_setting["interval"], dic_sim_setting["threadNum"],
dic_sim_setting["saveReplay"], dic_sim_setting["rlTrafficLight"],
dic_sim_setting["changeLane"])
eng.load_roadnet(roadnetFile)
eng.load_flow(flowFile)
plan = open(planFile, "r")
plan.readline()
# get lanes todo--need to be changed when lane setting or intersection setting is changed
inter_name = "intersection_1_1"
list_all_lanes = list(eng.get_lane_vehicles().keys())
dic_roadnet = json.load(open(roadnetFile, "r"))
dic_intersections = {}
for inter in dic_roadnet["intersections"]:
dic_intersections[inter["id"]] = inter
list_exiting_lanes = []
list_non_exiting_lanes = []
for lane in list_all_lanes:
int_x, int_y, direc, l_id = list(map(int, lane.split('_')[1:]))
if direc == 0:
int_x_end = int_x + 1
int_y_end = int_y
elif direc == 1:
int_x_end = int_x
int_y_end = int_y + 1
elif direc == 2:
int_x_end = int_x - 1
int_y_end = int_y
elif direc == 3:
int_x_end = int_x
int_y_end = int_y - 1
if dic_intersections["intersection_{0}_{1}".format(int_x_end, int_y_end)]["virtual"]:
list_exiting_lanes.append(lane)
else:
list_non_exiting_lanes.append(lane)
# maintain list of vehicles in exiting lanes and non-exiting lanes
list_non_exiting_lane_vehicles_prev = []
dic_vehicle_enter_leave_time = {}
for step in range(dic_sim_setting["num_step"]):
current_time = eng.get_current_time() # return a double, time past in seconds
lane_vehicles = eng.get_lane_vehicles() # return a dict, {lane_id: [vehicle1_id, vehicle2_id, ...], ...}
list_non_exiting_lane_vehicles_cur = []
for lane in list_non_exiting_lanes:
list_non_exiting_lane_vehicles_cur += lane_vehicles[lane]
for vec in (set(list_non_exiting_lane_vehicles_cur) - set(list_non_exiting_lane_vehicles_prev)):
# new entering vehicles
dic_vehicle_enter_leave_time[vec] = {"enter_time": current_time, "leave_time": float("nan")}
for vec in (set(list_non_exiting_lane_vehicles_prev) - set(list_non_exiting_lane_vehicles_cur)):
# new left vehicles
dic_vehicle_enter_leave_time[vec]["leave_time"] = current_time
if current_time % 100 == 0:
print("Time: {} / {}".format(current_time, dic_sim_setting["num_step"]))
phase = int(plan.readline())
eng.set_tl_phase(inter_name, phase) # set traffic light of intersection_1_1 to phase (phases of intersection is defined in roadnetFile)
eng.next_step()
list_non_exiting_lane_vehicles_prev = list_non_exiting_lane_vehicles_cur[:]
df_vehicle_enter_leave = pd.DataFrame(dic_vehicle_enter_leave_time).transpose()
assert np.all((df_vehicle_enter_leave["enter_time"] >= 0).values)
return df_vehicle_enter_leave
def get_planed_entering(flowFile, dic_sim_setting):
# todo--check with huichu about how each vehicle is inserted, according to the interval. 1s error may occur.
list_flow = json.load(open(flowFile, "r"))
dic_traj = {}
for flow_id, flow in enumerate(list_flow):
list_ts_this_flow = []
for step in range(flow["startTime"], min(flow["endTime"] + 1, dic_sim_setting["num_step"])):
if step == flow["startTime"]:
list_ts_this_flow.append(step)
elif step - list_ts_this_flow[-1] >= flow["interval"]:
list_ts_this_flow.append(step)
for vec_id, ts in enumerate(list_ts_this_flow):
dic_traj["flow_{0}_{1}".format(flow_id, vec_id)] = {"planed_enter_time": ts}
return pd.DataFrame(dic_traj).transpose()
def cal_travel_time(df_vehicle_actual_enter_leave, df_vehicle_planed_enter, outFile, dic_sim_setting):
df_res = pd.concat([df_vehicle_planed_enter, df_vehicle_actual_enter_leave], axis=1, sort=False)
assert len(df_res) == len(df_vehicle_planed_enter)
df_res["leave_time"].fillna(dic_sim_setting["num_step"])
df_res["travel_time"] = df_res["leave_time"] - df_res["planed_enter_time"]
travel_time = df_res["travel_time"].mean()
with open(outFile, "w") as f:
f.write("Travel time is %.2f seconds." % travel_time)
return travel_time
def check(planFile, num_step):
flag = True
error_info = ''
try:
plan = pd.read_csv(planFile, sep='\t', header=0, dtype=int)
except:
flag = False
error_info = 'The format of signal plan is not valid and cannot be read by pd.read_csv!'
print(error_info)
return flag
intersection_id = plan.columns[0]
if intersection_id != 'intersection_1_1':
flag = False
error_info = 'The header intersection_id is wrong (for example: intersection_1_1)!'
print(error_info)
return flag
phases = plan.values
current_phase = phases[0][0]
if len(phases) < num_step:
flag = False
error_info = 'The time of signal plan is less than the default time!'
print(error_info)
return flag
if current_phase == 0:
yellow_time = 1
else:
yellow_time = 0
# get first green phase and check
last_green_phase = '*'
for next_phase in phases[1:]:
next_phase = next_phase[0]
# check phase itself
if next_phase == '':
continue
if next_phase not in [0, 1, 2, 3, 4, 5, 6, 7, 8]:
flag = False
error_info = 'Phase must be in [0, 1, 2, 3, 4, 5, 6, 7, 8]!'
break
# check changing phase
if next_phase != current_phase and next_phase != 0 and current_phase != 0:
flag = False
error_info = '5 seconds of yellow time must be inserted between two different phase!'
break
# check unchangeable phase
if next_phase != 0 and next_phase == last_green_phase:
flag = False
error_info = 'No yellow light is allowed between the same phase!'
break
# check yellow time
if next_phase != 0 and yellow_time != 0 and yellow_time != 5:
flag = False
error_info = 'Yellow time must be 5 seconds!'
break
# normal
if next_phase == 0:
yellow_time += 1
if current_phase != 0:
last_green_phase = current_phase
else:
yellow_time = 0
current_phase = next_phase
if not flag:
print(error_info)
return flag
if __name__ == "__main__":
main()