Merge pull request #13 from eqcorrscan/streamer-process
Move Streamers from Threads to Processes
calum-chamberlain authored Apr 21, 2021
2 parents aae9169 + c193d63 commit 1101a79
Showing 21 changed files with 813 additions and 383 deletions.
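Note: the headline change runs the streaming client in a separate process rather than a thread. The streamer diffs themselves are among the files not shown below, but the pattern is roughly the following sketch; the class and attribute names are illustrative only, not the actual rt_eqcorrscan API. The practical consequence, visible throughout this diff, is that state shared with the streamer (buffers, event caches) must live in process-safe containers rather than plain attributes.

import multiprocessing
import threading


class Streamer:
    """Illustrative only: the thread -> process move in miniature."""

    def __init__(self):
        # Manager-backed state is visible to both parent and worker
        # processes; a plain list would be copied on fork and diverge.
        self._manager = multiprocessing.Manager()
        self.buffer = self._manager.list()

    def _collect(self):
        """Receive packets and append them to the shared buffer."""
        ...

    def background_run_threaded(self):
        # Old model: the worker shares memory with the caller and
        # contends for the GIL with the correlation workload.
        self._worker = threading.Thread(target=self._collect, daemon=True)
        self._worker.start()

    def background_run_process(self):
        # New model: the worker gets its own interpreter and memory
        # space; only managed objects (self.buffer) stay visible to both.
        self._worker = multiprocessing.Process(
            target=self._collect, daemon=True)
        self._worker.start()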
4 changes: 2 additions & 2 deletions .github/test_conda_env.yml
@@ -11,8 +11,8 @@ dependencies:
  - h5py
  - pyyaml
  - pyfftw>=0.12.0
- - pyproj<2
- - EQcorrscan>=0.4.1
+ - pyproj>2.1
+ - EQcorrscan>=0.4.3
  - urllib3>=1.24.3
  - obsplus>=0.1.0
  - pytest>=2.0.0
6 changes: 6 additions & 0 deletions .github/workflows/runtests.yml
@@ -48,6 +48,12 @@ jobs:
  export CI="true"
  py.test -n 2 --dist=loadscope --cov-report=xml
+ # - name: run streamer tests
+ # shell: bash -l {0}
+ # run: |
+ # export CI="true"
+ # py.test tests/streaming_tests -v --cov-report=xml
+
  - name: upload coverage
  uses: codecov/codecov-action@v1
  with:
4 changes: 2 additions & 2 deletions requirements.txt
@@ -6,7 +6,7 @@ obspy>=1.0.3
  h5py
  pyyaml
  pyfftw>=0.12.0
- pyproj<2
+ pyproj>2.1
  codecov
  EQcorrscan>=0.4.2
  urllib3>=1.24.3
@@ -17,4 +17,4 @@ pytest-pep8
  pytest-xdist
  pytest-rerunfailures
  pytest-mpl
- pillow>=6.2.3 # not directly required, pinned by Snyk to avoid a vulnerability
+ pillow>=6.2.3 # not directly required, pinned by Snyk to avoid a vulnerability
9 changes: 1 addition & 8 deletions rt_eqcorrscan/config/config.py
@@ -71,6 +71,7 @@ class RTMatchFilterConfig(_ConfigAttribDict):
"save_waveforms": True,
"plot_detections": False,
"max_correlation_cores": None,
"local_wave_bank": None,
}
readonly = []

@@ -126,7 +127,6 @@ class StreamingConfig(_ConfigAttribDict):
"rt_client_url": "link.geonet.org.nz",
"rt_client_type": "seedlink",
"buffer_capacity": 300.,
"local_wave_bank": None,
}
readonly = []
rt_client_base = "rt_eqcorrscan.streaming.clients"
@@ -135,12 +135,6 @@
  def __init__(self, *args, **kwargs):
  super().__init__(*args, **kwargs)

- @property
- def _local_wave_bank(self):
- if isinstance(self.local_wave_bank, str):
- return WaveBank(self.local_wave_bank)
- return None
-
  @property
  def rt_client_module(self):
  return importlib.import_module(
@@ -167,7 +161,6 @@ def get_streaming_client(self):
  rt_client = _client_module.RealTimeClient(
  server_url=self.rt_client_url,
  buffer_capacity=self.buffer_capacity,
- wavebank=self._local_wave_bank,
  **kwargs)
  except Exception as e:
  Logger.error(e)
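Note: the local_wave_bank option moves from StreamingConfig to RTMatchFilterConfig, and the streaming client no longer opens the WaveBank itself. A minimal sketch of the new layout, assuming the _ConfigAttribDict subclasses accept their defaults as keyword arguments (the __init__(*args, **kwargs) signature above suggests they do):

from rt_eqcorrscan.config.config import RTMatchFilterConfig, StreamingConfig

# The wave-bank path now lives alongside the matched-filter options...
rt_match_filter = RTMatchFilterConfig(local_wave_bank="waveforms")

# ...while the streaming config only describes the real-time client.
streaming = StreamingConfig(
    rt_client_url="link.geonet.org.nz",
    rt_client_type="seedlink",
    buffer_capacity=300.)

The path is handed to the tribe in real_time_match.py below (wavebank=config.rt_match_filter.local_wave_bank), so opening the bank becomes the tribe's job rather than the client's, plausibly to avoid passing an open bank object across the new process boundary.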
3 changes: 2 additions & 1 deletion rt_eqcorrscan/console_scripts/real_time_match.py
@@ -112,7 +112,8 @@ def run_real_time_matched_filter(**kwargs):
  tribe=tribe, inventory=inventory, rt_client=rt_client,
  detect_interval=config.rt_match_filter.detect_interval,
  plot=config.rt_match_filter.plot, name=tribe_name,
- plot_options=config.plot)
+ plot_options=config.plot,
+ wavebank=config.rt_match_filter.local_wave_bank)
  try:
  real_time_tribe._speed_up = config.streaming.speed_up
  except AttributeError:
8 changes: 6 additions & 2 deletions rt_eqcorrscan/event_trigger/catalog_listener.py
@@ -310,8 +310,12 @@ def run(
  else:
  self.template_bank.put_events(new_events)
  # Putting the events in the bank clears the catalog.
- self.extend(event_info)
- Logger.debug("Old events current state: {0}".format(
+ Logger.info(f"Putting events into old_events: \n{event_info}")
+ # Try looping... extend seems unstable?
+ for ev_info in event_info:
+ self.append(ev_info)
+ # self.extend(event_info)
+ Logger.info("Old events current state: {0}".format(
  self.old_events))
  self.previous_time = now
  # Sleep in steps to make death responsive
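Note: the swap from one extend call to per-item append calls (with the old call left commented out) makes more sense once old_events lives behind a multiprocessing proxy rather than a plain list shared between threads. A sketch of the two patterns, assuming a Manager-backed list; the data values are made up:

import multiprocessing

manager = multiprocessing.Manager()
old_events = manager.list()
lock = manager.Lock()

new_events = [("smi:local/event1", 1619000000.0),
              ("smi:local/event2", 1619000060.0)]

# Single proxy call for the whole batch:
with lock:
    old_events.extend(new_events)

# The commit's alternative: one proxy round-trip per item, so a failure
# part-way through cannot leave the shared list half-updated, and each
# item can be validated individually (the new Listener.append below
# asserts its argument is an EventInfo).
with lock:
    for event_info in new_events:
        old_events.append(event_info)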
10 changes: 8 additions & 2 deletions rt_eqcorrscan/event_trigger/listener.py
@@ -59,6 +59,11 @@ def extend(self, events: Union[EventInfo, List[EventInfo]]):
  with self.lock:
  self._old_events.extend(events)

+ def append(self, other: EventInfo):
+ assert isinstance(other, EventInfo)
+ with self.lock:
+ self._old_events.append(other)
+
  def _remove_old_events(self, endtime: UTCDateTime) -> None:
  """
  Expire old events from the cache.
@@ -72,10 +77,11 @@ def _remove_old_events(self, endtime: UTCDateTime) -> None:
  if len(self.old_events) == 0:
  return
  time_diffs = np.array([endtime - tup[1] for tup in self.old_events])
- filt = time_diffs <= self.keep
+ filt = time_diffs >= self.keep
  # Need to remove in-place, without creating a new list
  for i, old_event in enumerate(list(self.old_events)):
- if not filt[i]:
+ if filt[i]:
+ Logger.info(f"Removing event {old_event} which is {time_diffs[i]} old, older than {self.keep}")
  self.remove_old_event(old_event)

  def background_run(self, *args, **kwargs):
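Note: the filter flip is worth a second look. The old code computed which events to keep (time_diffs <= self.keep) and removed where the mask was False; the new code computes which to remove directly. The two differ only for an event exactly self.keep seconds old, which the new code now expires. A worked example with made-up numbers:

import numpy as np

keep = 3600.0  # seconds of history to retain
# endtime - event_time for four cached events:
time_diffs = np.array([120.0, 3599.0, 3600.0, 7200.0])

filt = time_diffs >= keep     # True where the event has aged out
print(filt.tolist())          # [False, False, True, True]
# The old logic removed where not (time_diffs <= keep), i.e. > keep only:
print((~(time_diffs <= keep)).tolist())  # [False, False, False, True]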
21 changes: 15 additions & 6 deletions rt_eqcorrscan/event_trigger/triggers.py
@@ -2,6 +2,7 @@
  Standardised trigger functions for use with a Reactor.
  """
  import logging
+ import numpy as np
  from typing import Union, List, Optional

  from obspy import UTCDateTime
@@ -42,6 +43,14 @@ def get_nearby_events(
  return sub_catalog


+ def _event_magnitude(event):
+ try:
+ magnitude = event.preferred_magnitude() or event.magnitudes[-1]
+ except IndexError:
+ return -9
+ return magnitude.mag
+
+
  def magnitude_rate_trigger_func(
  catalog: Catalog,
  magnitude_threshold: float = 5.5,
@@ -70,12 +79,12 @@ def magnitude_rate_trigger_func(
  The events that forced the trigger.
  """
  trigger_events = Catalog()
- for event in catalog:
- try:
- magnitude = event.preferred_magnitude() or event.magnitudes[0]
- except IndexError:
- continue
- if magnitude.mag >= magnitude_threshold:
+ # Sort by magnitude
+ magnitudes = np.array([_event_magnitude(ev) for ev in catalog])
+ sorted_indices = np.argsort(magnitudes)[::-1]
+ for event_index in sorted_indices:
+ event = catalog[event_index]
+ if magnitudes[event_index] >= magnitude_threshold:
  trigger_events.events.append(event)
  for event in catalog:
  sub_catalog = get_nearby_events(event, catalog, radius=rate_bin)
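Note: the rewrite walks events in descending magnitude order, so the largest triggering events are appended first, and _event_magnitude returns -9 for events with no magnitudes (they sort last and fail any sensible threshold) instead of being skipped mid-loop. The fallback also changes from the first magnitude (magnitudes[0]) to the last (magnitudes[-1]). A toy demonstration of the ordering, with made-up values:

import numpy as np

# Toy magnitudes; -9 stands in for "event has no magnitude".
magnitudes = np.array([4.2, 6.1, -9.0, 5.7])
sorted_indices = np.argsort(magnitudes)[::-1]   # largest first
print(sorted_indices.tolist())                  # [1, 3, 0, 2]

magnitude_threshold = 5.5
triggering = [int(i) for i in sorted_indices
              if magnitudes[i] >= magnitude_threshold]
print(triggering)                               # [1, 3]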
35 changes: 22 additions & 13 deletions rt_eqcorrscan/plotting/plot_buffer.py
@@ -7,7 +7,7 @@
  import datetime as dt
  import asyncio

- from pyproj import Proj, transform
+ from pyproj import Proj, Transformer

  from bokeh.document import Document
  from bokeh.plotting import figure
@@ -195,6 +195,7 @@ def define_plot(
  will be real-time
  """
  # Set up the data source
+ Logger.info("Getting stream to define plot")
  stream = rt_client.stream.copy().split().detrend()
  if lowcut and highcut:
  stream.filter("bandpass", freqmin=lowcut, freqmax=highcut)
@@ -208,6 +209,7 @@
  else:
  title = "Raw streaming data"
  stream.merge()
+ Logger.info(f"Have the stream: \n{stream}")

  template_lats, template_lons, template_alphas, template_ids = (
  [], [], [], [])
@@ -218,20 +220,21 @@
  except IndexError:
  continue
  template_lats.append(origin.latitude)
- template_lons.append(origin.longitude)
+ template_lons.append(origin.longitude % 360)
  template_alphas.append(0)
  template_ids.append(template.event.resource_id.id.split("/")[-1])

  station_lats, station_lons, station_ids = ([], [], [])
  for network in inventory:
  for station in network:
  station_lats.append(station.latitude)
- station_lons.append(station.longitude)
+ station_lons.append(station.longitude % 360)
  station_ids.append(station.code)

  # Get plot bounds in web mercator
- wgs_84 = Proj(init='epsg:4326')
- wm = Proj(init='epsg:3857')
+ Logger.info("Defining map")
+ transformer = Transformer.from_crs(
+ "epsg:4326", "epsg:3857", always_xy=True)
  try:
  min_lat, min_lon, max_lat, max_lon = (
  min(template_lats + station_lats),
@@ -242,20 +245,21 @@
  Logger.error(e)
  Logger.info("Setting map bounds to NZ")
  min_lat, min_lon, max_lat, max_lon = (-47., 165., -34., 179.9)
- bottom_left = transform(wgs_84, wm, min_lon, min_lat)
- top_right = transform(wgs_84, wm, max_lon, max_lat)
+ Logger.info(f"Map bounds: {min_lon}, {min_lat} - {max_lon}, {max_lat}")
+ bottom_left = transformer.transform(min_lon, min_lat)
+ top_right = transformer.transform(max_lon, max_lat)
  map_x_range = (bottom_left[0], top_right[0])
  map_y_range = (bottom_left[1], top_right[1])

  template_x, template_y = ([], [])
  for lon, lat in zip(template_lons, template_lats):
- _x, _y = transform(wgs_84, wm, lon, lat)
+ _x, _y = transformer.transform(lon, lat)
  template_x.append(_x)
  template_y.append(_y)

  station_x, station_y = ([], [])
  for lon, lat in zip(station_lons, station_lats):
- _x, _y = transform(wgs_84, wm, lon, lat)
+ _x, _y = transformer.transform(lon, lat)
  station_x.append(_x)
  station_y.append(_y)
@@ -267,6 +271,7 @@
  'y': station_y, 'x': station_x,
  'lats': station_lats, 'lons': station_lons, 'id': station_ids})

+ Logger.info("Allocated data sources")
  trace_sources = {}
  trace_data_range = {}
  # Allocate empty arrays
@@ -282,6 +287,7 @@
  trace_data_range.update({channel: (data.min(), data.max())})

  # Set up the map to go on the left side
+ Logger.info("Adding features to map")
  map_plot = figure(
  title="Template map", x_range=map_x_range, y_range=map_y_range,
  x_axis_type="mercator", y_axis_type="mercator", **map_options)
@@ -296,6 +302,7 @@
x="x", y="y", size=10, source=station_source, color="blue", alpha=1.0)

# Set up the trace plots
Logger.info("Setting up streaming plot")
trace_plots = []
if not offline:
now = dt.datetime.utcnow()
@@ -327,6 +334,7 @@
  p1.xaxis.formatter = datetick_formatter

  # Add detection lines
+ Logger.info("Adding detection artists")
  detection_source = _get_pick_times(detections, channels[0])
  detection_source.update(
  {"pick_values": [[
@@ -437,10 +445,11 @@ def update():
  _update_template_alphas(
  detections, tribe, decay=plot_length, now=now,
  datastream=template_source)
-
+ Logger.info("Adding callback")
  doc.add_periodic_callback(update, update_interval)
  doc.title = "EQcorrscan Real-time plotter"
  doc.add_root(plots)
+ Logger.info("Plot defined")


  def _update_template_alphas(
@@ -464,8 +473,8 @@ def _update_template_alphas(
  datastream
  Data stream to update
  """
- wgs_84 = Proj(init='epsg:4326')
- wm = Proj(init='epsg:3857')
+ transformer = Transformer.from_crs(
+ "epsg:4326", "epsg:3857", always_xy=True)
  template_lats, template_lons, template_alphas, template_ids = (
  [], [], [], [])
  template_x, template_y = ([], [])
@@ -479,7 +488,7 @@
  template_lons.append(origin.longitude)

  template_ids.append(template.event.resource_id.id.split("/")[-1])
- _x, _y = transform(wgs_84, wm, origin.longitude, origin.latitude)
+ _x, _y = transformer.transform(origin.longitude, origin.latitude)
  template_x.append(_x)
  template_y.append(_y)
  template_detections = [
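Note: these changes are the visible cost of the pyproj>2.1 pin. The module-level transform(p1, p2, x, y) call was deprecated in pyproj 2 and later removed, and it is slow because it rebuilds the transformation pipeline on every call; a Transformer is built once and reused. always_xy=True pins the argument order to (lon, lat) regardless of the axis order the CRS declares. The new % 360 wraps longitudes into [0, 360) so New Zealand data spanning the antimeridian plots contiguously. A standalone sketch of the same calls:

from pyproj import Transformer

# Build once, reuse for every point.
transformer = Transformer.from_crs(
    "epsg:4326", "epsg:3857", always_xy=True)

lon, lat = 178.5, -41.0                 # offshore New Zealand
x, y = transformer.transform(lon, lat)  # web-mercator metres

# Without the wrap, -179.5 and +179.5 sit at opposite map edges;
# wrapped into [0, 360) they are neighbours (180.5 vs 179.5).
wrapped = -179.5 % 360                  # 180.5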
33 changes: 31 additions & 2 deletions rt_eqcorrscan/reactor/reactor.py
@@ -150,20 +150,44 @@ def run(self, max_run_length: float = None) -> None:
  max_run_length
  Maximum run length in seconds, if None, will run indefinitely.
  """
+ # Get the original keep value - we will over-write this temporarily
+ listener_keep = deepcopy(self.listener.keep)
+ if self._listener_kwargs.get("starttime"):
+ # Set to keep at least until the starttime
+ _keep_len = (UTCDateTime.now() -
+ self._listener_kwargs["starttime"]) + 3600
+ self.listener.keep = max(_keep_len, listener_keep)
+ Logger.info(f"Setting listener keep to {self.listener.keep}")
+ # Try a get and put to make sure that threads have the same memory space...
+ old_events = self.listener.old_events
+ self.listener.old_events = old_events
  # Run the listener!
  self.listener.background_run(**self._listener_kwargs)
  self._run_start = UTCDateTime.now()
  # Query the catalog in the listener every so often and check
  self._running = True
+ first_iteration = True
  while self._running:
  old_events = deepcopy(self.listener.old_events)
+ Logger.info(f"Old events from the listener has {len(old_events)} events")
+ # Get these locally to avoid accessing shared memory multiple times
  if len(old_events) > 0:
- working_ids = list(zip(*old_events))[0]
+ working_ids = [_[0] for _ in old_events]
  working_cat = self.template_database.get_events(
  eventid=working_ids)
+ if len(working_ids) and not len(working_cat):
+ Logger.warning("Error getting events from database, getting individually")
+ working_cat = Catalog()
+ for working_id in working_ids:
+ try:
+ working_cat += self.template_database.get_events(
+ eventid=working_id)
+ except Exception as e:
+ Logger.error(f"Could not read {working_id} due to {e}")
+ continue
  else:
  working_cat = []
- Logger.debug("Currently analysing a catalog of {0} events".format(
+ Logger.info("Currently analysing a catalog of {0} events".format(
  len(working_cat)))
  self.process_new_events(new_events=working_cat)
  Logger.debug("Finished processing new events")
@@ -176,6 +200,11 @@ def run(self, max_run_length: float = None) -> None:
Logger.debug(f"Sleeping for {self.sleep_step} seconds")
time.sleep(self.sleep_step)
Logger.debug("Waking up")
if first_iteration and len(working_cat):
# Revert keep to original value
Logger.info(f"Reverting keep to {listener_keep}")
self.listener.keep = listener_keep
first_iteration = False

def check_running_tribes(self) -> None:
"""
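Note: two idioms here follow from the thread-to-process move. The "get and put" round-trip of old_events exercises both the getter and setter of the listener's shared state before the worker starts, which only does anything useful if that state is backed by a process-safe container; below is a sketch of the kind of property this implies (names hypothetical, not the actual listener implementation). The listener_keep juggling is simpler: keep is raised so the listener retains events back to a requested starttime, then restored once the first catalog has been processed.

import multiprocessing


class SketchListener:
    """Hypothetical sketch of old_events state shared across processes."""

    def __init__(self):
        self._manager = multiprocessing.Manager()
        self._old_events = self._manager.list()

    @property
    def old_events(self):
        # Materialise the proxy contents for the caller.
        return list(self._old_events)

    @old_events.setter
    def old_events(self, events):
        # Write back through the proxy so all processes see the update.
        self._old_events[:] = events


listener = SketchListener()
# The reactor's round-trip: a read followed by a write-back, confirming
# both directions work before the listener process is started.
events = listener.old_events
listener.old_events = events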
(Diffs for the remaining 11 changed files are not shown here.)
