Skip to content

Commit

Permalink
Merge
Browse files Browse the repository at this point in the history
  • Loading branch information
calum-chamberlain committed Apr 21, 2021
2 parents eafa0ca + 8ca2459 commit c193d63
Show file tree
Hide file tree
Showing 11 changed files with 250 additions and 156 deletions.
4 changes: 2 additions & 2 deletions .github/test_conda_env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ dependencies:
- h5py
- pyyaml
- pyfftw>=0.12.0
- pyproj<2
- EQcorrscan>=0.4.1
- pyproj>2.1
- EQcorrscan>=0.4.3
- urllib3>=1.24.3
- obsplus>=0.1.0
- pytest>=2.0.0
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/runtests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ jobs:
export CI="true"
py.test -n 2 --dist=loadscope --cov-report=xml
# - name: run streamer tests
# shell: bash -l {0}
# run: |
# export CI="true"
# py.test tests/streaming_tests -v --cov-report=xml

- name: upload coverage
uses: codecov/codecov-action@v1
with:
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ obspy>=1.0.3
h5py
pyyaml
pyfftw>=0.12.0
pyproj<2
pyproj>2.1
codecov
EQcorrscan>=0.4.2
urllib3>=1.24.3
Expand All @@ -17,4 +17,4 @@ pytest-pep8
pytest-xdist
pytest-rerunfailures
pytest-mpl
pillow>=6.2.3 # not directly required, pinned by Snyk to avoid a vulnerability
pillow>=6.2.3 # not directly required, pinned by Snyk to avoid a vulnerability
35 changes: 22 additions & 13 deletions rt_eqcorrscan/plotting/plot_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import datetime as dt
import asyncio

from pyproj import Proj, transform
from pyproj import Proj, Transformer

from bokeh.document import Document
from bokeh.plotting import figure
Expand Down Expand Up @@ -195,6 +195,7 @@ def define_plot(
will be real-time
"""
# Set up the data source
Logger.info("Getting stream to define plot")
stream = rt_client.stream.copy().split().detrend()
if lowcut and highcut:
stream.filter("bandpass", freqmin=lowcut, freqmax=highcut)
Expand All @@ -208,6 +209,7 @@ def define_plot(
else:
title = "Raw streaming data"
stream.merge()
Logger.info(f"Have the stream: \n{stream}")

template_lats, template_lons, template_alphas, template_ids = (
[], [], [], [])
Expand All @@ -218,20 +220,21 @@ def define_plot(
except IndexError:
continue
template_lats.append(origin.latitude)
template_lons.append(origin.longitude)
template_lons.append(origin.longitude % 360)
template_alphas.append(0)
template_ids.append(template.event.resource_id.id.split("/")[-1])

station_lats, station_lons, station_ids = ([], [], [])
for network in inventory:
for station in network:
station_lats.append(station.latitude)
station_lons.append(station.longitude)
station_lons.append(station.longitude % 360)
station_ids.append(station.code)

# Get plot bounds in web mercator
wgs_84 = Proj(init='epsg:4326')
wm = Proj(init='epsg:3857')
Logger.info("Defining map")
transformer = Transformer.from_crs(
"epsg:4326", "epsg:3857", always_xy=True)
try:
min_lat, min_lon, max_lat, max_lon = (
min(template_lats + station_lats),
Expand All @@ -242,20 +245,21 @@ def define_plot(
Logger.error(e)
Logger.info("Setting map bounds to NZ")
min_lat, min_lon, max_lat, max_lon = (-47., 165., -34., 179.9)
bottom_left = transform(wgs_84, wm, min_lon, min_lat)
top_right = transform(wgs_84, wm, max_lon, max_lat)
Logger.info(f"Map bounds: {min_lon}, {min_lat} - {max_lon}, {max_lat}")
bottom_left = transformer.transform(min_lon, min_lat)
top_right = transformer.transform(max_lon, max_lat)
map_x_range = (bottom_left[0], top_right[0])
map_y_range = (bottom_left[1], top_right[1])

template_x, template_y = ([], [])
for lon, lat in zip(template_lons, template_lats):
_x, _y = transform(wgs_84, wm, lon, lat)
_x, _y = transformer.transform(lon, lat)
template_x.append(_x)
template_y.append(_y)

station_x, station_y = ([], [])
for lon, lat in zip(station_lons, station_lats):
_x, _y = transform(wgs_84, wm, lon, lat)
_x, _y = transformer.transform(lon, lat)
station_x.append(_x)
station_y.append(_y)

Expand All @@ -267,6 +271,7 @@ def define_plot(
'y': station_y, 'x': station_x,
'lats': station_lats, 'lons': station_lons, 'id': station_ids})

Logger.info("Allocated data sources")
trace_sources = {}
trace_data_range = {}
# Allocate empty arrays
Expand All @@ -282,6 +287,7 @@ def define_plot(
trace_data_range.update({channel: (data.min(), data.max())})

# Set up the map to go on the left side
Logger.info("Adding features to map")
map_plot = figure(
title="Template map", x_range=map_x_range, y_range=map_y_range,
x_axis_type="mercator", y_axis_type="mercator", **map_options)
Expand All @@ -296,6 +302,7 @@ def define_plot(
x="x", y="y", size=10, source=station_source, color="blue", alpha=1.0)

# Set up the trace plots
Logger.info("Setting up streaming plot")
trace_plots = []
if not offline:
now = dt.datetime.utcnow()
Expand Down Expand Up @@ -327,6 +334,7 @@ def define_plot(
p1.xaxis.formatter = datetick_formatter

# Add detection lines
Logger.info("Adding detection artists")
detection_source = _get_pick_times(detections, channels[0])
detection_source.update(
{"pick_values": [[
Expand Down Expand Up @@ -437,10 +445,11 @@ def update():
_update_template_alphas(
detections, tribe, decay=plot_length, now=now,
datastream=template_source)

Logger.info("Adding callback")
doc.add_periodic_callback(update, update_interval)
doc.title = "EQcorrscan Real-time plotter"
doc.add_root(plots)
Logger.info("Plot defined")


def _update_template_alphas(
Expand All @@ -464,8 +473,8 @@ def _update_template_alphas(
datastream
Data stream to update
"""
wgs_84 = Proj(init='epsg:4326')
wm = Proj(init='epsg:3857')
transformer = Transformer.from_crs(
"epsg:4326", "epsg:3857", always_xy=True)
template_lats, template_lons, template_alphas, template_ids = (
[], [], [], [])
template_x, template_y = ([], [])
Expand All @@ -479,7 +488,7 @@ def _update_template_alphas(
template_lons.append(origin.longitude)

template_ids.append(template.event.resource_id.id.split("/")[-1])
_x, _y = transform(wgs_84, wm, origin.longitude, origin.latitude)
_x, _y = transformer.transform(origin.longitude, origin.latitude)
template_x.append(_x)
template_y.append(_y)
template_detections = [
Expand Down
50 changes: 39 additions & 11 deletions rt_eqcorrscan/rt_match_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,16 @@ def _start_streaming(self):
else:
Logger.info("Real-time streaming already running")

def _runtime_check(self, run_start, max_run_length):
run_time = UTCDateTime.now() - run_start
Logger.info(
f"Run time: {run_time:.2f}s, max_run_length: {max_run_length:.2f}s")
if max_run_length and run_time > max_run_length:
Logger.critical("Hit maximum run time, stopping.")
self.stop()
return False
return True

def run(
self,
threshold: float,
Expand Down Expand Up @@ -598,8 +608,14 @@ def run(
start_time = UTCDateTime.now()
st = self.rt_client.stream.split().merge()
if self.has_wavebank:
self._access_wavebank(
method="put_waveforms", timeout=120., stream=st)
st = _check_stream_is_int(st)
try:
self._access_wavebank(
method="put_waveforms", timeout=120.,
stream=st)
except Exception as e:
Logger.error(
f"Could not write to wavebank due to {e}")
last_data_received = self.rt_client.last_data
# Split to remove trailing mask
if len(st) == 0:
Expand Down Expand Up @@ -673,6 +689,9 @@ def run(
Logger.error("Out of memory, stopping this detector")
self.stop()
break
if not self._runtime_check(
run_start=run_start, max_run_length=max_run_length):
break
Logger.info(
"Waiting for {0:.2f}s and hoping this gets "
"better".format(self.detect_interval))
Expand Down Expand Up @@ -709,9 +728,8 @@ def run(
self._wait(
wait=(self.detect_interval - run_time) / self._speed_up,
detection_kwargs=detection_kwargs)
if max_run_length and UTCDateTime.now() > run_start + max_run_length:
Logger.critical("Hit maximum run time, stopping.")
self.stop()
if not self._runtime_check(
run_start=run_start, max_run_length=max_run_length):
break
if minimum_rate and UTCDateTime.now() > run_start + self._min_run_length:
_rate = average_rate(
Expand All @@ -732,6 +750,9 @@ def run(
except Exception as e:
Logger.critical(f"Uncaught error: {e}")
Logger.error(traceback.format_exc())
if not self._runtime_check(
run_start=run_start, max_run_length=max_run_length):
break
finally:
Logger.critical("Stopping")
self.stop()
Expand Down Expand Up @@ -1226,19 +1247,26 @@ def _write_detection(
Logger.error(f"Could not write plot due to {e}")
fig.clf()
if save_waveform:
st = st.split()
for tr in st:
if tr.data.dtype == numpy.int32 and \
tr.data.dtype.type != numpy.int32:
# Ensure data are int32, see https://github.com/obspy/obspy/issues/2683
tr.data = tr.data.astype(numpy.int32)
st = _check_stream_is_int(st)
try:
st.write(f"{_filename}.ms", format="MSEED")
except Exception as e:
Logger.error(f"Could not write stream due to {e}")
return fig


def _check_stream_is_int(st):
st = st.split()
for tr in st:
# Ensure data are int32, see https://github.com/obspy/obspy/issues/2683
if tr.data.dtype == numpy.int32 and \
tr.data.dtype.type != numpy.int32:
tr.data = tr.data.astype(numpy.int32, subok=False)
if tr.data.dtype.type == numpy.intc:
tr.data = tr.data.astype(numpy.int32, subok=False)
return st


def _numpy_len(arr: Union[numpy.ndarray, numpy.ma.MaskedArray]) -> int:
"""
Convenience function to return the length of a numpy array.
Expand Down
8 changes: 4 additions & 4 deletions rt_eqcorrscan/streaming/clients/obspy.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,11 @@ def run(self) -> None:
kill = self._killer_queue.get(block=False)
except Empty:
kill = False
Logger.info(f"Kill status: {kill}")
if kill:
Logger.warning("Termination called, stopping collect loop")
self.on_terminate()
return
break
_query_start = UTCDateTime.now()
st = Stream()
query_passed = True
Expand Down Expand Up @@ -175,9 +176,8 @@ def run(self) -> None:
return

def stop(self) -> None:
self._stop_called = True
self.streaming = False
self.started = False
Logger.info("STOP!")
self._stop_called, self.started = True, False


if __name__ == "__main__":
Expand Down
7 changes: 4 additions & 3 deletions rt_eqcorrscan/streaming/clients/seedlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,10 @@ def run(self):
kill = False
if kill:
Logger.warning(
"Run termination called - poison received is set.")
"Run termination called - poison received.")
self.on_terminate()
return
self._stop_called = True
break

if data == SLPacket.SLTERMINATE:
Logger.warning("Received Terminate request from host")
Expand Down Expand Up @@ -145,6 +146,7 @@ def run(self):

# If we get to here, stop has been called so we can terminate
self.on_terminate()
self.streaming = False
return

def copy(self, empty_buffer: bool = True):
Expand Down Expand Up @@ -215,7 +217,6 @@ def stop(self) -> None:
Logger.info("Closing connection")
self.close()
Logger.info("Stopped Streamer")
self.streaming = False

def on_seedlink_error(self): # pragma: no cover
""" Cope with seedlink errors."""
Expand Down
Loading

0 comments on commit c193d63

Please sign in to comment.