-
Notifications
You must be signed in to change notification settings - Fork 1
/
simulation.py
302 lines (271 loc) · 11.6 KB
/
simulation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
"""
The main framework for the dispatch scheduler simulation.
"""
import scheduler
import telescope
import datetime
import math
import numpy as np
import random
import sys
import os
import glob
import subprocess
from configobj import ConfigObj
import simbad_reader
class simulation:
def __init__(self,config_file,base_directory='.'):
self.base_directory = base_directory
self.config_file = config_file
self.dt_fmt = '%Y%m%dT%H:%M:%S'
self.load_config()
self.create_class_objects(tel_num=1)
self.init_infofile(self.scheduler)
self.update_time(self.starttime)
def load_config(self):
try:
config = ConfigObj(self.base_directory+'/config/'+self.config_file)
self.starttime = datetime.datetime.strptime(\
config['Setup']['STARTTIME'],self.dt_fmt)
self.endtime = datetime.datetime.strptime(\
config['Setup']['ENDTIME'],self.dt_fmt)
self.latitude = config['Setup']['LATITUDE']
self.longitude = config['Setup']['LONGITUDE']
self.elevation = float(config['Setup']['ELEVATION'])
self.sitename = config['Setup']['SITENAME']
except:
print('ERROR accessing configuration file: ' + self.config_file)
sys.exit()
def create_class_objects(self,tel_num=1):
# create a scheduler for the sim
self.scheduler = scheduler.scheduler('scheduler.ini')
# get the weather stats
self.get_weather_probs('dailyprob.txt')
# create the telescopes
self.telescopes = []
for ind in range(tel_num):
self.telescopes.append(telescope.telescope('telescope.ini',ind+1))
def update_time(self,time):
"""
A dinky function that can be used to update the time for all class
objects to make sure everything is in sync
"""
self.time = time
self.scheduler.time = time
self.scheduler.obs.date=time
def init_infofile(self,sheduler):
"""
a file that contains all the info for the simulation
"""
self.sim_ind = self.get_sim_index()
today_str = datetime.datetime.utcnow().strftime('%Y%m%d')
self.sim_name = today_str+'.%05d'%self.sim_ind
self.sim_path = './results/'+self.sim_name+'/'
# to give sim_path to scheduler
self.scheduler.sim_path = self.sim_path
try: os.stat(self.sim_path)
except: os.mkdir(self.sim_path)
with open(self.sim_path+self.sim_name+'.txt','w') as wfile:
wfile.write('DATE RUN: '+today_str+'\n')
wfile.write('STARTTIME: '+self.starttime.strftime(self.dt_fmt)+\
'\n')
wfile.write('ENDTIME: '+self.endtime.strftime(self.dt_fmt)+\
'\n')
def get_sim_index(self):
# see if there is a place to put the sim results, make one if not
try:
os.stat('./results')
except:
os.mkdir('./results')
# get the index for the simulation
files = glob.glob('./results/*')
ind_number = len(files)+1
return ind_number
def check_tele_list(self,tele_list):
if type(tele_list) is int:
if (tele_list < 1) or (tele_list > len(self.telescopes)):
tele_list = [x+1 for x in range(len(self.telescopes))]
else:
tele_list = [tele_list]
tele_list = [x-1 for x in tele_list]
def write_target_file(self,target):
header = 'obs_start \t obs_end \t duration \t altitude \t azimuth '+\
'\t quality'
with open(self.sim_path+target['name']+'.txt','w') as target_file:
target_file.write(header+'\n')
def calc_exptime(self,target):
return target['exptime']
def get_obs_history(self,target,prev_obs=1,simpath=None):
if simpath == None:
simpath = self.sim_path
# a function that 'tail's a target file to get the last prev_obs and
# places the details in a list?
target_file = simpath+target['name']+'.txt'
raw_obs=\
subprocess.check_output(['tail','-n',str(prev_obs),target_file])
obs_lines = raw_obs.split('\n')[:-1]
obs_list = []
for line in obs_lines:
line = line.split('\t')
line[0] = datetime.datetime.strptime(line[0],self.dt_fmt)
line[1] = datetime.datetime.strptime(line[1],self.dt_fmt)
line[2] = float(line[2])
line[3] = float(line[3])
line[4] = float(line[4])
obs_list.append(line)
return obs_list
def record_observation(self,target,telescopes=None):
obs_start = self.time
exptime = self.calc_exptime(target)
obs_end = self.time + datetime.timedelta(minutes=exptime)
duration = (obs_end-obs_start).total_seconds()
try: os.stat(self.sim_path+target['name']+'.txt')
except: self.write_target_file(target)
self.scheduler.obs.date=self.time
# the observation 'quality', or whether it was a good observation or
# not (1 is good, 0 is unusable)
obs_quality = 1
target['fixedbody'].compute(self.scheduler.obs)
alt = target['fixedbody'].alt
azm = target['fixedbody'].az
# if target['fixedbody'].alt < 0:
# ipdb.set_trace()
with open(self.sim_path+target['name']+'.txt','a') as target_file:
obs_string = obs_start.strftime(self.dt_fmt)+'\t'+\
obs_end.strftime(self.dt_fmt)+'\t'+\
'%08.2f'%duration+'\t'+\
'%06.2f'%math.degrees(alt)+'\t'+\
'%07.2f'%math.degrees(azm)+' \t '+\
'%i'%obs_quality+\
'\n'
print(target['name']+': '+obs_string)
target_file.write(obs_string)
obs_list = [obs_start,obs_end,duration,alt,azm,obs_quality]
target['last_obs'].append(obs_list)
pass
def record_target(self,target):
self.scheduler.obs.horizon=str(self.scheduler.target_horizon)
target['fixedbody'].compute(self.scheduler.obs)
if target['fixedbody'].neverup:
return
if target['fixedbody'].circumpolar:
return
with open(self.sim_path+target['name']+'set.txt','a') as targetfile:
tstime = self.scheduler.obs.next_setting(\
target['fixedbody'],start=self.time).datetime()
targetfile.write(tstime.strftime(self.dt_fmt+'\n'))
with open(self.sim_path+target['name']+'rise.txt','a') as targetfile:
trtime = self.scheduler.obs.next_rising(\
target['fixedbody'],start=self.time).datetime()
targetfile.write(trtime.strftime(self.dt_fmt+'\n'))
def record_sun(self):
with open(self.sim_path+'sunset.txt','a') as sunfile:
# the time the sunsets
sstime = self.scheduler.nextsunset(sim.time)
sunfile.write(sstime.strftime(self.dt_fmt+'\n'))
with open(self.sim_path+'sunrise.txt','a') as sunfile:
# the time the sunsets
srtime = self.scheduler.nextsunrise(sim.time)
sunfile.write(srtime.strftime(self.dt_fmt+'\n'))
def get_weather_probs(self,weatherfile=None):
self.weather_probs = np.genfromtxt(weatherfile)
def check_weather(self,time):
# weather stats are for 365 day year, reusing value of 0101 if it is a
# lead year.
daynumber = (time.timetuple().tm_yday-1)%364
random.seed()
prob = random.uniform(0,1)
if prob > self.weather_probs[daynumber]:
return False
else:
return True
if __name__ == '__main__':
import ipdb
# ipdb.set_trace()
# start off by making a simulation class
sim = simulation('simulation.ini')
# seed the random number generator so we get the same list of targets for
# alias evalutions
random.seed(1)
random.shuffle(sim.scheduler.target_list)
sim.scheduler.target_list=sim.scheduler.target_list[:60]
ipdb.set_trace()
# targetlist=simbad_reader.read_simbad('./secret/eta_list.txt')
# for target in targetlist:
# sim.write_target_file(target)
# sim.update_time(datetime.datetime.utcnow())
sim.scheduler.prep_night(init_run=True)
# just a holder for last obs, two days prior to start to make irelevant
for target in sim.scheduler.target_list:
sim.record_observation(target)
# target['last_obs'] = sim.starttime-datetime.timedelta(days=2)
sim.scheduler.calculate_weights()
weights = []
magvs = []
i=1
obs_count=0
total_exp = 0
setimes = []
ipdb.set_trace()
# while we are still in the simulation time frame
while sim.time < sim.endtime:
sim.update_time(sim.time)
# if the current time is before the next sunset and the previous
# sunrise is greater than the previous sunset, it is daytime
if sim.time < sim.scheduler.nextsunset(sim.time) and\
sim.scheduler.prevsunset(sim.time)<\
sim.scheduler.prevsunrise(sim.time):
# record the next sunrise and set times
sim.record_sun()
for target in sim.scheduler.target_list:
sim.record_target(target)
# change the current time to the time of sunset and add one second
sim.time = sim.scheduler.nextsunset(sim.time)+\
datetime.timedelta(seconds=1)
sim.update_time(sim.time)
if not sim.check_weather(sim.time):
print('NIGHT LOST DUE TO WEATHER')
sim.time = sim.scheduler.nextsunrise(sim.time)+\
datetime.timedelta(seconds=1)
sim.scheduler.prep_night()
# end iteration
continue
# if the current time is before the next sunrise and the previous
# sunset is greater than the previous sunrise, it is nighttime
if sim.time < sim.scheduler.nextsunrise(sim.time) and \
sim.scheduler.prevsunrise(sim.time)<\
sim.scheduler.prevsunset(sim.time):
# (re)calculate the weights, which also orders them by weight
sim.scheduler.calculate_weights()
# for target in sim.scheduler.target_list:
# print target['weight']
# ipdb.set_trace()
for target in sim.scheduler.target_list:
# if the top target is still less than zero, wait five minutes
if target['weight']<0.:
sim.time+=datetime.timedelta(minutes=5)
print('Nothing observed')
break
# if sim.scheduler.is_observable(target):
total_exp += sim.calc_exptime(target)
if target['observed']>2:
ipdb.set_trace()
target['observed']+=1
# target['last_obs']=sim.time
sim.record_observation(target)
obs_count+=1
sim.time+=datetime.timedelta(minutes=target['exptime'])
break
sim.time+=datetime.timedelta(minutes=5)
print obs_count
ipdb.set_trace()
pass
# plan
# if current time between last rising and next setting of sun
# change current time to next setting
# if current time between last setting and next rising
# try: observe a star
# else: change time to next rising if we can't?
# if out of simulation time frame
# end observation
print 'Completed simulation '+sim.sim_name