O4 realtime (#50)
* updating public webpages

* document if on realtime

* don't label events if not 1000s tw

* look in albert home while lvk_dropbox is down

* new script to convert multiorder maps

* pointing back to dropbox

* nside checks, 512 for 1000s followup

* nside arg for moc conversion script

* public webpage labels for sig and subthr

* return if real event in heartbeat mode

* Merge branch 'master' of github.com:icecube/FastResponseAnalysis into o4_realtime

* load up to 5000s to check for events in realtime

* add internal public webpage fxn

* event time def earlier

* get gcn in case p<0.1

* try for pickle load

* index at 1 instead of 0 for events on skymap

* add a try to convert from moc

* add email/sms notif

* internal alerts pub webpage updates

* formatting for alerts webpage

* increment versioning

* don't send most prob direction if no coinc events

* change header for public wp

* make plots w file modification time

* precomp gw bg trials
jessiethw authored Jul 23, 2023
1 parent 08d22c6 commit 3bd9f61
Showing 15 changed files with 226 additions and 62 deletions.
5 changes: 3 additions & 2 deletions .gitignore
@@ -1,5 +1,5 @@
 *.pyc
-*.fits
+*.fit*
 *.xml
 *.json
 *.txt
@@ -21,4 +21,5 @@ fast_response/results_dataframe_bkup.pkl
 fast_response/scripts/make_call.py
 cascade_skymaps/
 *.png
-fast_response/test_plotting*
+fast_response/tests/
+fast_response/slack_posters/lvk_email_sms_notif.py
2 changes: 1 addition & 1 deletion fast_response/FastResponseAnalysis.py
@@ -602,7 +602,7 @@ def plot_skymap(self, with_contour=False, contour_files=None, label_events=False
     hp.projscatter(theta,phi,c=cols,marker='x',label='GFU Event',coord='C', zorder=5)
     if label_events:
         for j in range(len(theta)):
-            hp.projtext(theta[j], phi[j]-0.11, '{}'.format(j), color='red', fontsize=18, zorder=6)
+            hp.projtext(theta[j], phi[j]-0.11, '{}'.format(j+1), color='red', fontsize=18, zorder=6)
     handles.append(Line2D([0], [0], marker='x', ls='None', label='GFU Event'))

     if (self.stop - self.start) <= 0.5: #Only plot contours if less than 2 days
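
Note: a minimal, self-contained sketch of the 1-indexed labeling pattern this hunk introduces (map resolution, event positions, and output filename are made up):

import healpy as hp
import matplotlib as mpl
mpl.use('agg')
import matplotlib.pyplot as plt
import numpy as np

hp.mollview(np.zeros(hp.nside2npix(64)), title='1-indexed event labels')
theta, phi = np.radians([80., 95., 100.]), np.radians([30., 120., 250.])
hp.projscatter(theta, phi, marker='x', color='k', coord='C')
for j in range(len(theta)):
    # label events 1..N so the skymap matches the 1-indexed report table
    hp.projtext(theta[j], phi[j] - 0.11, '{}'.format(j + 1), color='red')
plt.savefig('labeled_events.png')
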
17 changes: 8 additions & 9 deletions fast_response/GWFollowup.py
@@ -9,7 +9,6 @@
 mpl.use('agg')
 import matplotlib.pyplot as plt
 import pickle
-from astropy.time import Time

 from .FastResponseAnalysis import PriorFollowup
 from .reports import GravitationalWaveReport
@@ -75,9 +74,9 @@ def run_background_trials(self, month=None, ntrials=1000):
             self.duration * 86400., closest_rate, self.duration * 86400.))

         #check for nside mismatch
-        if hp.pixelfunc.get_nside(pre_ts_array.shape[1]) != self.nside:
-            print('Error! Analysis uses nside of %i '.format(self.nside)+
-                  'while precomputed BG is nside %i'.format(hp.pixelfunc.get_nside(pre_ts_array.shape[1])))
+        #if hp.pixelfunc.get_nside(pre_ts_array.shape[1]) != self.nside:
+        #    print('Error! Analysis uses nside of %i '.format(self.nside)+
+        #        'while precomputed BG is nside %i'.format(hp.pixelfunc.get_nside(pre_ts_array.shape[1])))

         ts_norm = np.log(np.amax(self.skymap))
         ts_prior = pre_ts_array.copy()
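
Note: the check disabled above has two latent bugs: %i placeholders are passed to str.format (which ignores them), and get_nside is handed a pixel count rather than a map. A hedged sketch of a working version, assuming pre_ts_array has one column per healpix pixel:

import healpy as hp

npix = pre_ts_array.shape[1]      # assumed: number of healpix pixels
bg_nside = hp.npix2nside(npix)    # npix2nside maps a pixel count to its nside
if bg_nside != self.nside:
    print('Error! Analysis uses nside of {} while precomputed BG is nside {}'.format(
        self.nside, bg_nside))
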
@@ -136,10 +135,10 @@ def check_events_after(self):
         check_passed = False if there are no events after (should re-load!) '''

         t1 = Time(datetime.datetime.utcnow()).mjd
-        if ((t1-self.stop)*86400.)>1000.:
+        if ((t1-self.stop)*86400.)>5000.:
             #if it's been long enough, only load 1000s
-            print('Loading 1000s of data after the time window')
-            t1 = self.stop + 1000./86400.
+            print('Loading 2000s of data after the time window')
+            t1 = self.stop + 2000./86400.
         exp_long, livetime_long, grl_long = self.dset.livestream(
             self.start,
             t1,
@@ -327,12 +326,12 @@ def write_circular(self):
         except:
             noticeID = 'NOTICEID'

-        if pvalue > 0.01:
+        if pvalue > 0.1:
             template_path = os.path.join(base, 'circular_templates/gw_gcn_template_low.txt')
         else:
             template_path = os.path.join(base, 'circular_templates/gw_gcn_template_high.txt')

-        if pvalue>0.01:
+        if pvalue>0.1:
             with open(template_path, 'r') as gcn_template:

                 gcn = gcn_template.read()
15 changes: 10 additions & 5 deletions fast_response/MonitoringAndMocks/Data_Display.py
@@ -56,7 +56,7 @@ def dial_up(who="jessie"):
         all_dictionary['Name'].append(file)
     for file in all_dictionary["Name"]:
         date_format = "%Y-%m-%d"
-        c_timestamp = os.path.getctime(file)
+        c_timestamp = os.path.getmtime(file)
         c_datestamp = datetime.datetime.utcfromtimestamp(c_timestamp)
         all_dictionary['Time_Stamp'].append(c_datestamp)
     print('Finished loading all latencies.')
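
Note: the getctime-to-getmtime swaps throughout this file match the commit message "make plots w file modification time". The two differ on Linux:

import os

# getctime: inode-change time on Linux (bumped by metadata changes, e.g. chmod);
# getmtime: content-modification time, the better proxy for when a result file
# was actually written
mtime = os.path.getmtime('some_results.pickle')   # hypothetical path
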
@@ -83,7 +83,7 @@ def dial_up(who="jessie"):
 Quality_Pickles = []

 for file in Pickle_Files:
-    c_timestamp = os.path.getctime(file)
+    c_timestamp = os.path.getmtime(file)
     c_datestamp = datetime.datetime.fromtimestamp(c_timestamp)
     if (c_datestamp >= datetime.datetime.strptime('2022-12-05', date_format)):
         Quality_Pickles.append(file)
@@ -113,7 +113,7 @@ def dial_up(who="jessie"):
     First_Batch['Name'].append(file)
 for file in First_Batch["Name"]:
     date_format = "%Y-%m-%d"
-    c_timestamp = os.path.getctime(file)
+    c_timestamp = os.path.getmtime(file)
     c_datestamp = datetime.datetime.fromtimestamp(c_timestamp)
     First_Batch['Time_Stamp'].append(c_datestamp)
 print('Finished loading 1st map latencies.')
@@ -158,7 +158,7 @@ def dial_up(who="jessie"):
     Last_Batch['Name'].append(file)
 for file in Last_Batch["Name"]:
     date_format = "%Y-%m-%d"
-    c_timestamp = os.path.getctime(file)
+    c_timestamp = os.path.getmtime(file)
     c_datestamp = datetime.datetime.fromtimestamp(c_timestamp)
     Last_Batch['Time_Stamp'].append(c_datestamp)
 print('Finished loading last map latencies')
@@ -662,6 +662,7 @@ def outlier_name(outlier_list, outlier_names):

 save_path='/home/mromfoe/public_html/O4_followup_monitoring/ReportsPerDay_liveupdate.png'

+plt.tight_layout()
 fig.savefig(save_path)
 fig.savefig("ReportsPerDay.png")

@@ -746,7 +747,11 @@ def make_bg_pval_dist(fontsize=15, lower_y_bound=-3.5):
     print('Loading %i mocks (may take a while)'%(len(saved_mock_pkl)))
     for mock in saved_mock_pkl:
         with open(mock,'rb') as f:
-            result=pickle.load(f)
+            try:
+                result=pickle.load(f)
+            except:
+                print('skipped {}'.format(mock))
+                continue
         all_mocks[result['name']]=result['p']
     print('Done loading mocks.')

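
Note: a hedged variant of the guard above that catches only the exceptions a truncated or still-being-written pickle typically raises, rather than a bare except:

import pickle

try:
    with open(mock, 'rb') as f:
        result = pickle.load(f)
except (EOFError, pickle.UnpicklingError) as err:
    print('skipped {}: {}'.format(mock, err))
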
2 changes: 1 addition & 1 deletion fast_response/listeners/gcn_listener.py
@@ -35,6 +35,7 @@ def process_gcn(payload, root):
         for elem in root.iterfind('.//Param')}

     stream = params['Stream']
+    eventtime = root.find('.//ISOTime').text
     if stream == '26':
         print("Detected cascade type alert, running cascade followup. . . ")
         alert_type='cascade'
@@ -52,7 +53,6 @@ def process_gcn(payload, root):

     event_id = params['event_id']
     run_id = params['run_id']
-    eventtime = root.find('.//ISOTime').text
     event_mjd = Time(eventtime, format='isot').mjd
     try:
         bot.send_message(f'Listener found {alert_type} type alert, {event_name} \n'+
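
Note: hoisting eventtime above the stream branch makes it available to both the cascade and track paths. For reference, the parse itself (VOEvent ISO time to MJD via astropy):

from astropy.time import Time

eventtime = root.find('.//ISOTime').text          # e.g. '2023-07-23T01:23:45.67'
event_mjd = Time(eventtime, format='isot').mjd    # float MJD used downstream
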
47 changes: 31 additions & 16 deletions fast_response/listeners/gw_gcn_listener.py
@@ -109,28 +109,43 @@ def process_gcn(payload, root):

     skymap = params['skymap_fits']

-    # Skymap distributed is a different format than previous, but previous is available.
-    # Download correct format from GraceDB
+    # Multiorder Coverage (MOC) map links are distributed over the GCNs.
+    # Download flattened (normal healpy) map from GraceDB
     if 'multiorder' in skymap:
-        time.sleep(6.) #if we don't wait, the old format isn't uploaded
-        try:
-            #HERE: added .fits but not impl yet. need to keep baystar/bilby naming?
-            bayestar_map=skymap.replace('multiorder.','').split(',')
-            if len(bayestar_map)==1:
+        try:
+            #Try to get flat-res healpy map
+            flat_map=skymap.replace('multiorder.','').split(',')
+            if len(flat_map)==1:
                 suffix='.gz'
             else:
-                suffix = '.gz,'+ bayestar_map[1]
-            new_map = bayestar_map[0]+suffix
-            map_type= bayestar_map[0].split('/')[-1]
-            wget.download(new_map, out=os.environ.get('FAST_RESPONSE_OUTPUT')+f'skymaps/{name}_{map_type}{suffix}')
-            skymap=os.environ.get('FAST_RESPONSE_OUTPUT')+f'skymaps/{name}_{map_type}{suffix}'
+                suffix = '.gz,'+ flat_map[1]
+            new_map = flat_map[0]+suffix
+            map_type= flat_map[0].split('/')[-1]
+
+            wget.download(new_map, out=os.path.join(os.environ.get('FAST_RESPONSE_OUTPUT'),f'skymaps/{name}_{map_type}{suffix}'))
+            skymap=os.path.join(os.environ.get('FAST_RESPONSE_OUTPUT'),f'skymaps/{name}_{map_type}{suffix}')
         except:
-            print('Failed to download skymap in correct format! \nDownload skymap and then re-run script with')
-            print(f'args: --time {event_mjd} --name {name} --skymap PATH_TO_SKYMAP')
+            print('Failed to download flat-resolution skymap. Trying to convert MOC map')
             log_file.flush()
-            return
+            #TODO: add make_call to me here if this fails
+
+            try:
+                filename=skymap.split('/')[-1]
+                new_output = os.path.join(os.environ.get('FAST_RESPONSE_OUTPUT'),f'skymaps/{name}_{filename}')
+                wget.download(skymap, out=new_output)
+                subprocess.call([os.path.join(analysis_path, 'convert_moc_to_healpix.py'),
+                    '--skymap', new_output])
+                if os.path.exists(new_output.replace('multiorder','converted')):
+                    skymap = new_output.replace('multiorder','converted')
+                    print('Successfully converted map: {}'.format(skymap))
+                    log_file.flush()
+                else:
+                    raise Exception('Failed to convert map.')
+            except:
+                print('Failed to get skymap in correct format! \nDownload skymap and then re-run script with')
+                print(f'args: --time {event_mjd} --name {name} --skymap PATH_TO_SKYMAP')
+                log_file.flush()
+                return

     if root.attrib['role'] != 'observation':
         name=name+'_test'
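
Note: convert_moc_to_healpix.py is not shown in full in this diff. A hedged sketch of the standard MOC-to-flat conversion it presumably performs, using ligo.skymap (filenames hypothetical; order=9 gives nside=512, matching the "512 for 1000s followup" commit note):

from ligo.skymap.io import read_sky_map, write_sky_map
from ligo.skymap.bayestar import rasterize

moc = read_sky_map('S230723a_multiorder.fits', moc=True)   # multi-order table
flat = rasterize(moc, order=9)                             # flatten to nside 2**9 = 512
write_sky_map('S230723a_converted.fits.gz', flat, nest=True)
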
2 changes: 2 additions & 0 deletions fast_response/precomputed_background/precompute_ts.py
@@ -35,6 +35,8 @@
         outdir = os.environ.get('FAST_RESPONSE_OUTPUT')
     except:
         outdir = './'
+else:
+    outdir = args.outdir
 if not os.path.exists(outdir+'/trials/'):
     os.mkdir(outdir+'/trials/')
 outdir=outdir+'/trials/'
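
Note: one plausible reading is that the added else: pairs with an if that sits above the displayed context, not with the except. A hedged reconstruction of the full block (assuming --outdir defaults to None):

if args.outdir is None:
    try:
        outdir = os.environ.get('FAST_RESPONSE_OUTPUT')
    except:
        outdir = './'
else:
    outdir = args.outdir
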
23 changes: 13 additions & 10 deletions fast_response/precomputed_background/submit_precomputed_trials.py
@@ -18,8 +18,8 @@
     '--seed_start', type=int,default=0,
     help='Seed to start with when running trials')
 parser.add_argument(
-    '--n_per_batch', default=50, type=int,
-    help='Number of trials to run in each set (default: 50)')
+    '--n_per_batch', default=500, type=int,
+    help='Number of trials to run in each set (default: 500)')
 args = parser.parse_args()

 username = pwd.getpwuid(os.getuid())[0]
@@ -53,16 +53,19 @@
     'when_to_transfer_output = ON_EXIT']
     )

-prev_trials = glob.glob('/data/user/jthwaites/FastResponseAnalysis/output/trials/*.npz')
+#prev_trials = glob.glob('/data/user/jthwaites/FastResponseAnalysis/output/trials/*.npz')
 for bg_rate in [6.0, 6.2, 6.4, 6.6, 6.8, 7.0, 7.2]:
-    for seed in range(args.seed_start, int(args.ntrials/100)):#int(args.ntrials/args.n_per_batch)):
+    for seed in range(args.seed_start, int(args.ntrials/args.n_per_batch)):
         seed = seed*100
-        if f'/data/user/jthwaites/FastResponseAnalysis/output/trials/gw_{bg_rate}_mHz_seed_{seed}_delta_t_1.2e+06.npz' not in prev_trials:
-            #deltaT {args.tw} --ntrials 100 --seed {seed} --bkg {bg}
-            job.add_arg('--deltaT %s --ntrials %i --seed %i --bkg %s'
-                %(args.tw, args.n_per_batch, seed, bg_rate))
-            job.add_arg('--deltaT %s --ntrials %i --seed %i --bkg %s'
-                %(args.tw, args.n_per_batch, seed+50, bg_rate))
+        job.add_arg('--deltaT %s --ntrials %i --seed %i --bkg %s --outdir %s --type gw'
+            %(args.tw, args.n_per_batch, seed, bg_rate,
+            '/data/user/jthwaites/new_processing/normal_1000/'))
+
+#dagman = pycondor.Dagman(
+#    'fra_1000s_precompbg',
+#    submit=submit, verbose=2)

+#dagman.add_job(job)
+#dagman.build_submit()
 job.build_submit()
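
Note: for readers unfamiliar with pycondor, the pattern above is one Job object, one add_arg per parameter combination, and a single submission. A minimal hedged sketch (names and paths illustrative):

import pycondor

job = pycondor.Job(
    'precompute_gw_bg', 'precompute_ts.py',
    submit='/scratch/user/condor/submit',
    error='/scratch/user/condor/error',
    output='/scratch/user/condor/output',
    log='/scratch/user/condor/log')
for seed in range(0, 1000, 500):
    job.add_arg('--deltaT 1000. --ntrials 500 --seed {} --bkg 6.4 --type gw'.format(seed))
job.build_submit()   # writes the submit file and calls condor_submit
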

2 changes: 1 addition & 1 deletion fast_response/reports/ReportGenerator.py
@@ -232,7 +232,7 @@ def make_coinc_events_table(self, f):
             )]
         else:
             event_table+=[
-                ('{}'.format(i),
+                ('{}'.format(i+1),
                 '{:.0f}'.format((event['time']-self.source['trigger_mjd'])*86400.),
                 "{:3.2f}\degree".format(np.rad2deg(event['ra'])),
                 '{:3.2f}\degree'.format(np.rad2deg(event['dec'])),
32 changes: 26 additions & 6 deletions fast_response/scripts/combine_results_kafka.py
@@ -8,7 +8,7 @@
 import matplotlib as mpl
 mpl.use('agg')
 import matplotlib.pyplot as plt
-import io, time, os, glob
+import io, time, os, glob, subprocess
 import urllib.request, urllib.error, urllib.parse
 import argparse
 import json, pickle
@@ -301,15 +301,12 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):
     collected_results['observation_livetime'] = 1000

     ### COLLECT RESULTS ###
-    #additional_website_params = {}
+    send_notif = False

     if uml_results_finished:
         with open(uml_results_path, 'rb') as f:
             uml_results = pickle.load(f)
-        #for key in ['skymap_path', 'analysisid']:
-        #    if key in uml_results.keys():
-        #        additional_website_params[key] = uml_results[key]

     if llama_results_finished:
         with open(llama_results_path, 'r') as f:
             llama_results = json.load(f)
@@ -323,6 +320,9 @@
     if uml_results_finished and llama_results_finished:
         collected_results['pval_generic'] = round(uml_results['p'],4)
         collected_results['pval_bayesian'] = round(llama_results['p_value'],4)
+        if (collected_results['pval_generic']<0.01) or (collected_results['pval_bayesian']<0.01):
+            send_notif=True
+
         if (collected_results['pval_generic']<0.1) or (collected_results['pval_bayesian']<0.1):
             uml_ontime = format_ontime_events_uml(uml_results['coincident_events'], event_mjd)
             llama_ontime = format_ontime_events_llama(llama_results['single_neutrino'])
@@ -351,6 +351,9 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):
     elif uml_results_finished:
         collected_results['pval_generic'] = round(uml_results['p'],4)
         collected_results['pval_bayesian'] = 'null'
+        if collected_results['pval_generic']<0.01:
+            send_notif=True
+
         if collected_results['pval_generic'] <0.1:
             uml_ontime = format_ontime_events_uml(uml_results['coincident_events'], event_mjd)
             coinc_events=[]
@@ -383,6 +386,9 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):
     elif llama_results_finished:
         collected_results['pval_generic'] = 'null'
         collected_results['pval_bayesian'] = round(llama_results['p_value'],4)
+        if collected_results['pval_bayesian']<0.01:
+            send_notif=True
+
         if collected_results['pval_bayesian']<0.1:
             llama_ontime = format_ontime_events_llama(llama_results['single_neutrino'])
             coinc_events=[]
@@ -410,6 +416,9 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):

     if (collected_results['n_events_coincident'] == 0) and ('coincident_events' in collected_results.keys()):
         c = collected_results.pop('coincident_events')
+    if ('most_likely_direction' in collected_results.keys()):
+        if (collected_results['n_events_coincident'] == 0):
+            c = collected_results.pop('most_likely_direction')
     if ('most_likely_direction' in collected_results.keys()):
         try:
             if (collected_results['pval_generic']>0.1):
@@ -458,6 +467,17 @@ def parse_notice(record, wait_for_llama=False, heartbeat=False):
         except:
             logger.warning('Failed to push to public webpage.')

+        if send_notif:
+            sender_script = os.path.join(os.environ.get('FAST_RESPONSE_SCRIPTS'),
+                '../slack_posters/lvk_email_sms_notif.py')
+            try:
+                subprocess.call([sender_script, '--path_to_gcn',
+                    os.path.join(save_location, f'{name}_collected_results.json')])
+                logger.info('Sent alert to ROC for p<0.01')
+            except:
+                logger.warning('Failed to send email/SMS notification.')
+        else:
+            logger.info('p>0.01: no email/sms sent')
     else:
         with open(os.path.join(save_location, f'mocks/{name}_collected_results.json'),'w') as f:
             json.dump(collected_results, f, indent = 6)
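
Note: subprocess.call is handed the notifier script as argv[0] directly, with no explicit interpreter. That works only if the script is executable and starts with a shebang, which is why convert_moc_to_healpix.py below gains mode 100755 and a '#!/usr/bin/env python' line. Hedged illustration (path hypothetical):

import subprocess

# equivalent, interpreter-explicit form that does not need the exec bit:
subprocess.call(['python', '/path/to/lvk_email_sms_notif.py',
                 '--path_to_gcn', 'results.json'])
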
2 changes: 2 additions & 0 deletions fast_response/scripts/convert_moc_to_healpix.py
file mode changed: 100644 → 100755 (now executable)
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+
 import numpy as np
 import matplotlib as mpl
 mpl.use('agg')
35 changes: 35 additions & 0 deletions fast_response/scripts/push_internal_public.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+
+import pandas as pd
+from fast_response.web_utils import update_internal_public
+import argparse, glob, os, sys
+
+parser = argparse.ArgumentParser(description='Fast Response Followup')
+parser.add_argument('--alert_gcn', type=int, default=None,
+    help='GCN circular number for the initial alert, or (run_id, event_id) to link notice')
+parser.add_argument('--fra_circular', type=int, default=None,
+    help='GCN circular number for FRA circular')
+parser.add_argument('--alert_name', type=str, default=None,
+    help='Alert identifier. Format - track: IceCube-YYMMDDA, cascade: IceCube-Cascade YYMMDDA')
+parser.add_argument('--save_location', type=str, default=None,
+    help='Path to saved FRA results (default uses FAST_RESPONSE_OUTPUT env variable)')
+args = parser.parse_args()
+
+output = os.environ.get('FAST_RESPONSE_OUTPUT') if args.save_location is None else args.save_location
+name = args.alert_name.replace(' ','_')
+
+results_1000s = glob.glob(os.path.join(output, '*{}_1.0e+03_s/*.pickle'.format(name)))
+results_2d = glob.glob(os.path.join(output, '*{}_1.7e+05_s/*.pickle'.format(name)))
+
+if len(results_1000s) != 1 or len(results_2d) != 1:
+    print('Results pickle files not found. Check output location and re-run')
+    sys.exit()
+
+with open(results_1000s[0], 'rb') as f:
+    analysis_1000 = pd.read_pickle(f)
+with open(results_2d[0], 'rb') as fi:
+    analysis_2d = pd.read_pickle(fi)
+
+update_internal_public(analysis_1000, analysis_2d,
+    alert_gcn=args.alert_gcn, fra_circular=args.fra_circular)
+print('done.')