diff --git a/.github/test_conda_env.yml b/.github/test_conda_env.yml
index 5f7b15e..7f993c4 100644
--- a/.github/test_conda_env.yml
+++ b/.github/test_conda_env.yml
@@ -11,8 +11,8 @@ dependencies:
   - h5py
   - pyyaml
   - pyfftw>=0.12.0
-  - pyproj<2
-  - EQcorrscan>=0.4.1
+  - pyproj>2.1
+  - EQcorrscan>=0.4.3
   - urllib3>=1.24.3
   - obsplus>=0.1.0
   - pytest>=2.0.0
diff --git a/.github/workflows/runtests.yml b/.github/workflows/runtests.yml
index cdd1b5e..f3c2544 100644
--- a/.github/workflows/runtests.yml
+++ b/.github/workflows/runtests.yml
@@ -48,6 +48,12 @@ jobs:
           export CI="true"
           py.test -n 2 --dist=loadscope --cov-report=xml

+#      - name: run streamer tests
+#        shell: bash -l {0}
+#        run: |
+#          export CI="true"
+#          py.test tests/streaming_tests -v --cov-report=xml
+
       - name: upload coverage
         uses: codecov/codecov-action@v1
         with:
diff --git a/requirements.txt b/requirements.txt
index 132de75..4e261c4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,7 +6,7 @@ obspy>=1.0.3
 h5py
 pyyaml
 pyfftw>=0.12.0
-pyproj<2
+pyproj>2.1
 codecov
 EQcorrscan>=0.4.2
 urllib3>=1.24.3
@@ -17,4 +17,4 @@ pytest-pep8
 pytest-xdist
 pytest-rerunfailures
 pytest-mpl
-pillow>=6.2.3 # not directly required, pinned by Snyk to avoid a vulnerability
\ No newline at end of file
+pillow>=6.2.3 # not directly required, pinned by Snyk to avoid a vulnerability
diff --git a/rt_eqcorrscan/plotting/plot_buffer.py b/rt_eqcorrscan/plotting/plot_buffer.py
index a32dc72..b249091 100644
--- a/rt_eqcorrscan/plotting/plot_buffer.py
+++ b/rt_eqcorrscan/plotting/plot_buffer.py
@@ -7,7 +7,7 @@
 import datetime as dt
 import asyncio

-from pyproj import Proj, transform
+from pyproj import Proj, Transformer

 from bokeh.document import Document
 from bokeh.plotting import figure
@@ -195,6 +195,7 @@ def define_plot(
         will be real-time
     """
     # Set up the data source
+    Logger.info("Getting stream to define plot")
     stream = rt_client.stream.copy().split().detrend()
     if lowcut and highcut:
         stream.filter("bandpass", freqmin=lowcut, freqmax=highcut)
@@ -208,6 +209,7 @@ def define_plot(
     else:
         title = "Raw streaming data"
     stream.merge()
+    Logger.info(f"Have the stream: \n{stream}")

     template_lats, template_lons, template_alphas, template_ids = (
         [], [], [], [])
@@ -218,7 +220,7 @@ def define_plot(
         except IndexError:
             continue
         template_lats.append(origin.latitude)
-        template_lons.append(origin.longitude)
+        template_lons.append(origin.longitude % 360)
         template_alphas.append(0)
         template_ids.append(template.event.resource_id.id.split("/")[-1])

@@ -226,12 +228,13 @@ def define_plot(
     for network in inventory:
         for station in network:
             station_lats.append(station.latitude)
-            station_lons.append(station.longitude)
+            station_lons.append(station.longitude % 360)
             station_ids.append(station.code)

     # Get plot bounds in web mercator
-    wgs_84 = Proj(init='epsg:4326')
-    wm = Proj(init='epsg:3857')
+    Logger.info("Defining map")
+    transformer = Transformer.from_crs(
+        "epsg:4326", "epsg:3857", always_xy=True)
     try:
         min_lat, min_lon, max_lat, max_lon = (
             min(template_lats + station_lats),
@@ -242,20 +245,21 @@ def define_plot(
         Logger.error(e)
         Logger.info("Setting map bounds to NZ")
         min_lat, min_lon, max_lat, max_lon = (-47., 165., -34., 179.9)
-    bottom_left = transform(wgs_84, wm, min_lon, min_lat)
-    top_right = transform(wgs_84, wm, max_lon, max_lat)
+    Logger.info(f"Map bounds: {min_lon}, {min_lat} - {max_lon}, {max_lat}")
+    bottom_left = transformer.transform(min_lon, min_lat)
+    top_right = transformer.transform(max_lon, max_lat)
     map_x_range = (bottom_left[0], top_right[0])
     map_y_range = (bottom_left[1], top_right[1])

     template_x, template_y = ([], [])
     for lon, lat in zip(template_lons, template_lats):
-        _x, _y = transform(wgs_84, wm, lon, lat)
+        _x, _y = transformer.transform(lon, lat)
         template_x.append(_x)
         template_y.append(_y)

     station_x, station_y = ([], [])
     for lon, lat in zip(station_lons, station_lats):
-        _x, _y = transform(wgs_84, wm, lon, lat)
+        _x, _y = transformer.transform(lon, lat)
         station_x.append(_x)
         station_y.append(_y)

@@ -267,6 +271,7 @@ def define_plot(
         'y': station_y, 'x': station_x,
         'lats': station_lats, 'lons': station_lons,
         'id': station_ids})
+    Logger.info("Allocated data sources")
     trace_sources = {}
     trace_data_range = {}
     # Allocate empty arrays
@@ -282,6 +287,7 @@ def define_plot(
         trace_data_range.update({channel: (data.min(), data.max())})

     # Set up the map to go on the left side
+    Logger.info("Adding features to map")
     map_plot = figure(
         title="Template map", x_range=map_x_range, y_range=map_y_range,
         x_axis_type="mercator", y_axis_type="mercator", **map_options)
@@ -296,6 +302,7 @@ def define_plot(
         x="x", y="y", size=10, source=station_source,
         color="blue", alpha=1.0)
     # Set up the trace plots
+    Logger.info("Setting up streaming plot")
     trace_plots = []
     if not offline:
         now = dt.datetime.utcnow()
@@ -327,6 +334,7 @@ def define_plot(
     p1.xaxis.formatter = datetick_formatter

     # Add detection lines
+    Logger.info("Adding detection artists")
     detection_source = _get_pick_times(detections, channels[0])
     detection_source.update(
         {"pick_values": [[
@@ -437,10 +445,11 @@ def update():
         _update_template_alphas(
             detections, tribe, decay=plot_length, now=now,
             datastream=template_source)
-
+    Logger.info("Adding callback")
     doc.add_periodic_callback(update, update_interval)
     doc.title = "EQcorrscan Real-time plotter"
     doc.add_root(plots)
+    Logger.info("Plot defined")


 def _update_template_alphas(
@@ -464,8 +473,8 @@ def _update_template_alphas(
     datastream
         Data stream to update
     """
-    wgs_84 = Proj(init='epsg:4326')
-    wm = Proj(init='epsg:3857')
+    transformer = Transformer.from_crs(
+        "epsg:4326", "epsg:3857", always_xy=True)
     template_lats, template_lons, template_alphas, template_ids = (
         [], [], [], [])
     template_x, template_y = ([], [])
@@ -479,7 +488,7 @@ def _update_template_alphas(
         template_lons.append(origin.longitude)
         template_ids.append(template.event.resource_id.id.split("/")[-1])

-        _x, _y = transform(wgs_84, wm, origin.longitude, origin.latitude)
+        _x, _y = transformer.transform(origin.longitude, origin.latitude)
         template_x.append(_x)
         template_y.append(_y)
     template_detections = [
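Note for reviewers: the plot_buffer.py changes above are the pyproj 1.x to 2.x migration. `Proj(init='epsg:...')` and the module-level `transform()` call are deprecated in the 2.x series in favour of a reusable `Transformer`, which is also faster because the projection pipeline is built once rather than on every call. A minimal sketch of the new pattern (coordinates are illustrative):

    from pyproj import Transformer

    # Build the transformer once and reuse it for every point
    transformer = Transformer.from_crs("epsg:4326", "epsg:3857", always_xy=True)
    # always_xy=True fixes (lon, lat) ordering regardless of CRS axis conventions
    x, y = transformer.transform(174.78, -41.29)  # roughly Wellington, NZ
    print(x, y)

The `always_xy=True` flag is what lets the call sites above keep passing longitude first. If memory serves, `always_xy` arrived in pyproj 2.2, so the `pyproj>2.1` pins may be worth tightening to `>=2.2`.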
diff --git a/rt_eqcorrscan/rt_match_filter.py b/rt_eqcorrscan/rt_match_filter.py
index 816463e..3687b03 100644
--- a/rt_eqcorrscan/rt_match_filter.py
+++ b/rt_eqcorrscan/rt_match_filter.py
@@ -426,6 +426,16 @@ def _start_streaming(self):
         else:
             Logger.info("Real-time streaming already running")

+    def _runtime_check(self, run_start, max_run_length):
+        run_time = UTCDateTime.now() - run_start
+        Logger.info(
+            f"Run time: {run_time:.2f}s, max_run_length: {max_run_length}s")
+        if max_run_length and run_time > max_run_length:
+            Logger.critical("Hit maximum run time, stopping.")
+            self.stop()
+            return False
+        return True
+
     def run(
         self,
         threshold: float,
@@ -598,8 +608,14 @@ def run(
                 start_time = UTCDateTime.now()
                 st = self.rt_client.stream.split().merge()
                 if self.has_wavebank:
-                    self._access_wavebank(
-                        method="put_waveforms", timeout=120., stream=st)
+                    st = _check_stream_is_int(st)
+                    try:
+                        self._access_wavebank(
+                            method="put_waveforms", timeout=120.,
+                            stream=st)
+                    except Exception as e:
+                        Logger.error(
+                            f"Could not write to wavebank due to {e}")
                 last_data_received = self.rt_client.last_data
                 # Split to remove trailing mask
                 if len(st) == 0:
@@ -673,6 +689,9 @@ def run(
                         Logger.error("Out of memory, stopping this detector")
                         self.stop()
                         break
+                    if not self._runtime_check(
+                            run_start=run_start, max_run_length=max_run_length):
+                        break
                     Logger.info(
                         "Waiting for {0:.2f}s and hoping this gets "
                         "better".format(self.detect_interval))
@@ -709,9 +728,8 @@ def run(
                 self._wait(
                     wait=(self.detect_interval - run_time) / self._speed_up,
                     detection_kwargs=detection_kwargs)
-                if max_run_length and UTCDateTime.now() > run_start + max_run_length:
-                    Logger.critical("Hit maximum run time, stopping.")
-                    self.stop()
+                if not self._runtime_check(
+                        run_start=run_start, max_run_length=max_run_length):
                     break
                 if minimum_rate and UTCDateTime.now() > run_start + self._min_run_length:
                     _rate = average_rate(
@@ -732,6 +750,9 @@ def run(
         except Exception as e:
             Logger.critical(f"Uncaught error: {e}")
             Logger.error(traceback.format_exc())
+            if not self._runtime_check(
+                    run_start=run_start, max_run_length=max_run_length):
+                break
         finally:
             Logger.critical("Stopping")
             self.stop()
@@ -1226,12 +1247,7 @@ def _write_detection(
             Logger.error(f"Could not write plot due to {e}")
         fig.clf()
     if save_waveform:
-        st = st.split()
-        for tr in st:
-            if tr.data.dtype == numpy.int32 and \
-                    tr.data.dtype.type != numpy.int32:
-                # Ensure data are int32, see https://github.com/obspy/obspy/issues/2683
-                tr.data = tr.data.astype(numpy.int32)
+        st = _check_stream_is_int(st)
         try:
             st.write(f"{_filename}.ms", format="MSEED")
         except Exception as e:
@@ -1239,6 +1255,18 @@ def _write_detection(
     return fig


+def _check_stream_is_int(st):
+    st = st.split()
+    for tr in st:
+        # Ensure data are int32, see https://github.com/obspy/obspy/issues/2683
+        if tr.data.dtype == numpy.int32 and \
+                tr.data.dtype.type != numpy.int32:
+            tr.data = tr.data.astype(numpy.int32, subok=False)
+        if tr.data.dtype.type == numpy.intc:
+            tr.data = tr.data.astype(numpy.int32, subok=False)
+    return st
+
+
 def _numpy_len(arr: Union[numpy.ndarray, numpy.ma.MaskedArray]) -> int:
     """
     Convenience function to return the length of a numpy array.
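A note on `_check_stream_is_int`, which backs the obspy/obspy#2683 workaround above: NumPy dtypes can compare equal while their scalar types differ (on Windows, `numpy.intc` and `numpy.int32` map to different C types), and it is the scalar type that trips obspy's MiniSEED encoding lookup. A standalone sketch of the distinction; the second print may be False on Windows but True elsewhere:

    import numpy as np

    data = np.arange(10, dtype=np.intc)
    # dtype equality compares kind and size, so this is True on all platforms:
    print(data.dtype == np.int32)
    # ...but the scalar *type* can differ (C 'int' vs C 'long' on Windows):
    print(data.dtype.type is np.int32)
    # An explicit astype normalises to a plain int32 ndarray:
    data = data.astype(np.int32, subok=False)
    print(data.dtype.type is np.int32)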
diff --git a/rt_eqcorrscan/streaming/clients/obspy.py b/rt_eqcorrscan/streaming/clients/obspy.py
index c0f0d66..cad74dd 100644
--- a/rt_eqcorrscan/streaming/clients/obspy.py
+++ b/rt_eqcorrscan/streaming/clients/obspy.py
@@ -134,10 +134,11 @@ def run(self) -> None:
                 kill = self._killer_queue.get(block=False)
             except Empty:
                 kill = False
+            Logger.info(f"Kill status: {kill}")
             if kill:
                 Logger.warning("Termination called, stopping collect loop")
                 self.on_terminate()
-                return
+                break
             _query_start = UTCDateTime.now()
             st = Stream()
             query_passed = True
@@ -175,9 +176,8 @@ def run(self) -> None:
         return

     def stop(self) -> None:
-        self._stop_called = True
-        self.streaming = False
-        self.started = False
+        Logger.info("STOP!")
+        self._stop_called, self.started = True, False


 if __name__ == "__main__":
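The `return` to `break` change in the collect loop means the code after the loop still runs on termination; the kill signal itself is a poison-pill queue polled without blocking. A minimal sketch of that pattern, with hypothetical names rather than the package's API:

    import time
    from queue import Queue, Empty

    def collect_loop(killer_queue: Queue) -> None:
        while True:
            try:
                kill = killer_queue.get(block=False)  # non-blocking poll
            except Empty:
                kill = False
            if kill:
                break  # fall through to the shared tidy-up below the loop
            time.sleep(0.1)  # stand-in for "collect one chunk of data"
        # this line runs on termination too, which `return` would have skipped
        print("collect loop terminated cleanly")

    killer_queue = Queue(maxsize=1)
    killer_queue.put(True)  # poison before starting, so the loop exits at once
    collect_loop(killer_queue)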
diff --git a/rt_eqcorrscan/streaming/clients/seedlink.py b/rt_eqcorrscan/streaming/clients/seedlink.py
index c4b32ef..31c2ed2 100644
--- a/rt_eqcorrscan/streaming/clients/seedlink.py
+++ b/rt_eqcorrscan/streaming/clients/seedlink.py
@@ -111,9 +111,10 @@ def run(self):
                 kill = False
             if kill:
                 Logger.warning(
-                    "Run termination called - poison received is set.")
+                    "Run termination called - poison received.")
                 self.on_terminate()
-                return
+                self._stop_called = True
+                break

             if data == SLPacket.SLTERMINATE:
                 Logger.warning("Received Terminate request from host")
@@ -145,6 +146,7 @@ def run(self):

         # If we get to here, stop has been called so we can terminate
         self.on_terminate()
+        self.streaming = False
         return

     def copy(self, empty_buffer: bool = True):
@@ -215,7 +217,6 @@ def stop(self) -> None:
         Logger.info("Closing connection")
         self.close()
         Logger.info("Stopped Streamer")
-        self.streaming = False

     def on_seedlink_error(self):  # pragma: no cover
         """ Cope with seedlink errors."""
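Moving `self.streaming = False` out of `stop()` and into the end of `run()` shifts ownership of the flag to the collect loop itself: `stop()` only requests shutdown, and the flag is cleared once termination has actually completed, which is what `background_stop()` (in streaming.py, below) waits on. A toy illustration of that ownership rule, using threads for brevity:

    import threading
    import time

    class Streamer:
        def __init__(self):
            self.streaming = False
            self._stop_called = False

        def run(self):
            self.streaming = True
            while not self._stop_called:
                time.sleep(0.05)  # stand-in for packet collection
            # only the loop clears the flag, once its clean-up is done
            self.streaming = False

        def stop(self):
            self._stop_called = True  # a request, not a statement of fact

    streamer = Streamer()
    thread = threading.Thread(target=streamer.run)
    thread.start()
    time.sleep(0.1)
    streamer.stop()
    while streamer.streaming:  # safe to poll: the loop owns the flag
        time.sleep(0.05)
    thread.join()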
diff --git a/rt_eqcorrscan/streaming/streaming.py b/rt_eqcorrscan/streaming/streaming.py
index 38939f8..ae7b5c5 100644
--- a/rt_eqcorrscan/streaming/streaming.py
+++ b/rt_eqcorrscan/streaming/streaming.py
@@ -8,9 +8,10 @@
     GPL v3.0
 """

-import multiprocessing
 import logging
+import time
 import numpy as np
+import warnings

 from abc import ABC, abstractmethod
 from typing import Union
@@ -20,6 +21,18 @@

 from rt_eqcorrscan.streaming.buffers import Buffer

+import platform
+if platform.system() != "Linux":
+    warnings.warn("Currently Process-based streaming is only supported on "
+                  "Linux, defaulting to Thread-based streaming - you may run "
+                  "into issues when detecting frequently")
+    import threading as multiprocessing
+    from queue import Queue
+    from threading import Thread as Process
+else:
+    import multiprocessing
+    from multiprocessing import Queue, Process
+
 Logger = logging.getLogger(__name__)

@@ -65,16 +78,17 @@ def __init__(
         # Queues for communication
         # Outgoing data
-        self._stream_queue = multiprocessing.Queue(maxsize=1)
+        self._stream_queue = Queue(maxsize=1)
         # Incoming data - no limit on size, just empty it!
-        self._incoming_queue = multiprocessing.Queue()
+        self._incoming_queue = Queue()
         # Quereyable attributes to get a view of the size of the buffer
-        self._last_data_queue = multiprocessing.Queue(maxsize=1)
-        self._buffer_full_queue = multiprocessing.Queue(maxsize=1)
+        self._last_data_queue = Queue(maxsize=1)
+        self._buffer_full_queue = Queue(maxsize=1)
         # Poison!
-        self._killer_queue = multiprocessing.Queue(maxsize=1)
+        self._killer_queue = Queue(maxsize=1)
+        self._dead_queue = Queue(maxsize=1)

         # Private attributes for properties
         self.__stream = Stream()
@@ -161,11 +175,13 @@ def last_data(self, timestamp: UTCDateTime):
         try:
             self._last_data_queue.put(timestamp, block=False)
         except Full:
+            Logger.debug("_last_data is full")
             # Empty it!
             try:
                 self._last_data_queue.get(block=False)
             except Empty:
                 # Just in case the state changed...
+                Logger.debug("_last_data is empty :(")
                 pass
             try:
                 self._last_data_queue.put(timestamp, timeout=10)
@@ -239,12 +255,6 @@ def restart(self):
         """
         Disconnect and reconnect and restart the Streaming Client.
         """

-    def _bg_run(self):
-        while self.streaming:
-            self.run()
-        Logger.info("Running stopped, streaming set to False")
-        return
-
     def _clear_killer(self):
         """ Clear the killer Queue. """
         while True:
@@ -252,12 +262,28 @@ def _clear_killer(self):
                 self._killer_queue.get(block=False)
             except Empty:
                 break
+        while True:
+            try:
+                self._dead_queue.get(block=False)
+            except Empty:
+                break
+
+    def _bg_run(self):
+        while self.streaming:
+            self.run()
+        Logger.info("Running stopped, busy set to False")
+        try:
+            self._dead_queue.get(block=False)
+        except Empty:
+            pass
+        self._dead_queue.put(True)
+        return

     def background_run(self):
         """Run the client in the background."""
         self.streaming, self.started, self.can_add_streams = True, True, False
         self._clear_killer()  # Clear the kill queue
-        streaming_process = multiprocessing.Process(
+        streaming_process = Process(
             target=self._bg_run, name="StreamProcess")
         # streaming_process.daemon = True
         streaming_process.start()
@@ -275,21 +301,20 @@ def background_stop(self):
         Logger.debug(f"Stream on termination: {st}")
         self._killer_queue.put(True)
         self.stop()
-        for process in self.processes:
-            Logger.info("Joining process")
-            process.join(5)
-            if process.exitcode:
-                Logger.info("Process failed to join, terminating")
-                process.terminate()
-                Logger.info("Terminated")
-                process.join()
-            Logger.info("Process joined")
-        self.processes = []
-        self.streaming = False
         # Local buffer
         for tr in st:
-            Logger.debug("Adding trace to local buffer")
+            Logger.info("Adding trace to local buffer")
             self.buffer.add_stream(tr)
+        # Wait until streaming has stopped
+        Logger.debug(
+            f"Waiting for streaming to stop: status = {self.streaming}")
+        while self.streaming:
+            try:
+                self.streaming = not self._dead_queue.get(block=False)
+            except Empty:
+                time.sleep(1)
+                pass
+        Logger.debug("Streaming stopped")
         # Empty queues
         for queue in [self._incoming_queue, self._stream_queue,
                       self._killer_queue, self._last_data_queue]:
@@ -298,6 +323,18 @@ def background_stop(self):
                 queue.get(block=False)
             except Empty:
                 break
+        # join the processes
+        for process in self.processes:
+            Logger.info("Joining process")
+            process.join(5)
+            if hasattr(process, 'exitcode') and process.exitcode:
+                Logger.info("Process failed to join, terminating")
+                process.terminate()
+                Logger.info("Terminated")
+                process.join()
+            Logger.info("Process joined")
+        self.processes = []
+        self.streaming = False

     def on_data(self, trace: Trace):
         """
@@ -395,6 +432,13 @@ def on_error():  # pragma: no cover
         pass


+# def _bg_run(streamer: _StreamingClient):
+#     Logger.debug(streamer)
+#     streamer.run()
+#     Logger.info("Running stopped, streaming set to False")
+#     return
+
+
 if __name__ == "__main__":

     import doctest
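The new `_dead_queue` gives `background_stop()` a positive acknowledgement that `_bg_run` has really finished, rather than inferring it from process exit codes before the worker has cleaned up. Stripped of the class machinery, the handshake looks roughly like this (a sketch with hypothetical names, not the package API):

    import time
    from multiprocessing import Process, Queue
    from queue import Empty

    def bg_run(killer_queue: Queue, dead_queue: Queue) -> None:
        while True:  # the collect loop
            try:
                if killer_queue.get(block=False):
                    break  # poisoned
            except Empty:
                time.sleep(0.1)
        dead_queue.put(True)  # acknowledge: we really have stopped

    if __name__ == "__main__":
        killer, dead = Queue(maxsize=1), Queue(maxsize=1)
        worker = Process(target=bg_run, args=(killer, dead))
        worker.start()
        killer.put(True)  # ask the worker to stop...
        streaming = True
        while streaming:  # ...and wait for the acknowledgement
            try:
                streaming = not dead.get(block=False)
            except Empty:
                time.sleep(0.1)
        worker.join(5)  # joining is now safe and quick

The same code runs under the thread-based fallback because `queue.Queue` and `multiprocessing.Queue` share the `put`/`get`/`Empty` interface, which is what makes the platform-conditional import above transparent to the rest of the class.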
diff --git a/tests/streaming_tests/long_streamer_test.py b/tests/streaming_tests/long_streamer_test.py
index adb3399..96f1fe4 100644
--- a/tests/streaming_tests/long_streamer_test.py
+++ b/tests/streaming_tests/long_streamer_test.py
@@ -43,7 +43,6 @@ def setUpClass(cls):
             ("NZ", "SYZ", "HHZ"),
             ("NZ", "MQZ", "HHZ"),
         ]
-        cls.clients = []

     def run_streamer(self, rt_client, logger):
         for net, sta, chan in self.selectors:
@@ -52,32 +51,29 @@ def run_streamer(self, rt_client, logger):
         rt_client.background_run()

         sleepy_time = 0
-        while sleepy_time <= RUN_LENGTH:
-            time.sleep(SLEEP_INTERVAL)
-            now = UTCDateTime.now()
-            st = rt_client.stream.split().merge()
-            logger.info(f"Currently have the stream: \n{st}")
-            for tr in st:
-                if np.ma.is_masked(tr.data):
-                    # Check that the data are not super gappy.
-                    self.assertLess(tr.data.mask.sum(), len(tr.data) / 4)
-                # Check that data are recent.
-                self.assertLess(abs(now - tr.stats.endtime), 20.0)
-            sleepy_time += SLEEP_INTERVAL
-        rt_client.background_stop()
+        try:
+            while sleepy_time <= RUN_LENGTH:
+                time.sleep(SLEEP_INTERVAL)
+                now = UTCDateTime.now()
+                st = rt_client.stream.split().merge()
+                logger.info(f"Currently (at {now}) have the stream: \n{st}")
+                for tr in st:
+                    if np.ma.is_masked(tr.data):
+                        # Check that the data are not super gappy.
+                        self.assertLess(tr.data.mask.sum(), len(tr.data) / 4)
+                    # Check that data are recent.
+                    self.assertLess(abs(now - rt_client.last_data), 60.0)
+                    self.assertLess(abs(now - tr.stats.endtime), 120.0)
+                sleepy_time += SLEEP_INTERVAL
+        finally:  # We MUST stop the streamer even if we fail.
+            rt_client.background_stop()

     def test_no_wavebank(self):
         """ Run without a wavebank. """
         logger = logging.getLogger("NoWaveBank")
         rt_client = self.rt_client.copy()
-        self.clients.append(rt_client)
         self.run_streamer(rt_client=rt_client, logger=logger)

-    @classmethod
-    def tearDownClass(cls):
-        for rt_client in cls.clients:
-            rt_client.background_stop()
-

 if __name__ == "__main__":
     unittest.main()
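The try/finally added to `run_streamer` replaces the removed `tearDownClass`, which only ran after the whole class and left streamers running when a test failed mid-loop. `unittest.TestCase.addCleanup` is an alternative route to the same guarantee; a self-contained toy showing that registered cleanups fire whether the assertions pass or fail:

    import unittest

    class CleanupDemo(unittest.TestCase):
        def test_cleanup_always_fires(self):
            log = []
            # registered immediately after "starting" the resource, so it
            # runs even if the assertions below fail or raise
            self.addCleanup(log.append, "stopped")
            self.assertEqual(log, [])

    if __name__ == "__main__":
        unittest.main()

try/finally stops the streamer at a precise point in the helper, while addCleanup defers to the end of the test; for a helper that loops for minutes either is a reasonable choice, as long as the streamer always stops.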
diff --git a/tests/streaming_tests/obspy_simulate_test.py b/tests/streaming_tests/obspy_simulate_test.py
index 59b5dc9..9eed0b5 100644
--- a/tests/streaming_tests/obspy_simulate_test.py
+++ b/tests/streaming_tests/obspy_simulate_test.py
@@ -4,13 +4,18 @@

 import unittest
 import time
+import logging

 from obspy import UTCDateTime
 from obspy.clients.fdsn import Client

 from rt_eqcorrscan.streaming.clients.obspy import RealTimeClient

-SLEEP_STEP = 60
+SLEEP_STEP = 20
+
+logging.basicConfig(
+    level="INFO",
+    format="%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s")


 class FDSNTest(unittest.TestCase):
@@ -24,22 +29,19 @@ def setUpClass(cls):

     def test_background_streaming(self):
         rt_client = self.rt_client.copy()
-        rt_client.select_stream(net="NZ", station="FOZ", selector="HHZ")
+        rt_client.select_stream(net="NZ", station="JCZ", selector="HHZ")
         rt_client.background_run()
+        try:
+            self.assertFalse(rt_client.buffer_full)
+        except Exception as e:
+            rt_client.background_stop()
+            raise e
         time.sleep(SLEEP_STEP)
         rt_client.background_stop()
+        self.assertTrue(rt_client.buffer_full)
         self.assertEqual(rt_client.buffer_length,
                          rt_client.buffer_capacity)

-    def test_full_buffer(self):
-        rt_client = self.rt_client.copy()
-        rt_client.select_stream(net="NZ", station="FOZ", selector="HHZ")
-        rt_client.background_run()
-        self.assertFalse(rt_client.buffer_full)
-        time.sleep(SLEEP_STEP)
-        rt_client.background_stop()
-        self.assertTrue(rt_client.buffer_full)
-
     def test_always_started(self):
         rt_client = self.rt_client.copy()
         rt_client.start()
diff --git a/tests/streaming_tests/seedlink_test.py b/tests/streaming_tests/seedlink_test.py
index 8225d2e..df4f1b4 100644
--- a/tests/streaming_tests/seedlink_test.py
+++ b/tests/streaming_tests/seedlink_test.py
@@ -6,95 +6,83 @@
 import time
 import shutil
 import os
-import numpy as np
+import pytest

 from obspy import Stream
-from obsplus import WaveBank

 from rt_eqcorrscan.streaming.clients.seedlink import RealTimeClient

 import logging

-logging.basicConfig(level="DEBUG")
+logging.basicConfig(
+    level="INFO",
+    format="%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s")
+
+SLEEP_STEP = 30
+
+# Note: Must always have try: finally: to stop the streamer to avoid
+# continuous running on fail!


 class SeedLinkTest(unittest.TestCase):
-    def setUp(self):
-        self.rt_client = RealTimeClient(
+    @classmethod
+    def setUpClass(cls):
+        cls.rt_client = RealTimeClient(
             server_url="link.geonet.org.nz", buffer_capacity=10.)

+    @pytest.mark.flaky(reruns=1)
     def test_background_streaming(self):
         rt_client = self.rt_client.copy()
         rt_client.select_stream(net="NZ", station="FOZ", selector="HHZ")
         rt_client.background_run()
-        time.sleep(30)
-        rt_client.background_stop()
-        self.assertEqual(rt_client.buffer_length,
-                         rt_client.buffer_capacity)
-
+        time.sleep(SLEEP_STEP)
+        try:
+            self.assertEqual(rt_client.buffer_length,
+                             rt_client.buffer_capacity)
+        finally:
+            rt_client.background_stop()
+
+    @pytest.mark.flaky(reruns=1)
     def test_full_buffer(self):
         rt_client = self.rt_client.copy()
         rt_client.select_stream(net="NZ", station="FOZ", selector="HHZ")
         rt_client.clear_buffer()
         rt_client.background_run()
-        self.assertFalse(rt_client.buffer_full)
-        time.sleep(30)
-        rt_client.background_stop()
-        self.assertTrue(rt_client.buffer_full)
-
+        try:
+            self.assertFalse(rt_client.buffer_full)
+            time.sleep(SLEEP_STEP)
+            self.assertTrue(rt_client.buffer_full)
+        finally:
+            rt_client.background_stop()
+
+    @pytest.mark.flaky(reruns=1)
     def test_can_add_streams(self):
         rt_client = self.rt_client.copy()
         self.assertTrue(rt_client.can_add_streams)
         rt_client.select_stream(net="NZ", station="FOZ", selector="HHZ")
         rt_client.background_run()
-        self.assertFalse(rt_client.can_add_streams)
-        rt_client.background_stop()
+        try:
+            self.assertFalse(rt_client.can_add_streams)
+        finally:
+            rt_client.background_stop()
         self.assertFalse(rt_client.can_add_streams)
         rt_client = self.rt_client.copy(empty_buffer=False)
         self.assertTrue(rt_client.can_add_streams)

+    @pytest.mark.flaky(reruns=1)
     def test_get_stream(self):
+        initial_sleep = 10
         rt_client = self.rt_client.copy()
         rt_client.select_stream(net="NZ", station="FOZ", selector="HHZ")
         rt_client.background_run()
-        time.sleep(10)
+        time.sleep(initial_sleep)
         stream = rt_client.stream
-        self.assertIsInstance(stream, Stream)
-        time.sleep(20)
+        try:
+            self.assertIsInstance(stream, Stream)
+            time.sleep(SLEEP_STEP - initial_sleep)
+        finally:
+            rt_client.background_stop()
         stream2 = rt_client.stream
         self.assertNotEqual(stream, stream2)
-        rt_client.background_stop()
-
-    # def test_wavebank_integration(self):
-    #     rt_client = self.rt_client.copy()
-    #     rt_client.select_stream(net="NZ", station="FOZ", selector="HHZ")
-    #     rt_client.wavebank = WaveBank(base_path="test_wavebank")
-    #     rt_client.background_run()
-    #     time.sleep(30)
-    #     rt_client.background_stop()
-    #     self.assertTrue(rt_client.buffer_full)  # Need a full buffer to work
-    #     wavebank_traces = rt_client.wavebank.get_waveforms()
-    #     wavebank_stream = wavebank_traces.merge()
-    #     buffer_stream = rt_client.stream
-    #     wavebank_stream.sort()
-    #     buffer_stream.sort()
-    #     self.assertEqual(buffer_stream[0].id, wavebank_stream[0].id)
-    #     print(buffer_stream[0])
-    #     print(wavebank_stream[0])
-    #     self.assertLessEqual(
-    #         abs(buffer_stream[0].stats.endtime -
-    #             wavebank_stream[0].stats.endtime),
-    #         buffer_stream[0].stats.delta * 10)
-    #     endtime = min(buffer_stream[0].stats.endtime,
-    #                   wavebank_stream[0].stats.endtime)
-    #     starttime = max(buffer_stream[0].stats.starttime,
-    #                     wavebank_stream[0].stats.starttime)
-    #     self.assertTrue(
-    #         np.all(wavebank_stream.slice(
-    #             starttime=starttime, endtime=endtime)[0].data ==
-    #                buffer_stream.slice(
-    #             starttime=starttime, endtime=endtime)[0].data))
-    #     shutil.rmtree("test_wavebank")


 class SeedLinkThreadedTests(unittest.TestCase):
@@ -116,11 +104,11 @@ def test_read_write_from_multiple_threads(self):
         rt_client.background_run()
         tic, toc = time.time(), time.time()
         st = Stream()
-        while toc - tic < 20.0:
+        while toc - tic < SLEEP_STEP:
             st = rt_client.stream
             toc = time.time()
         rt_client.background_stop()
-        assert len(st) != 0
+        self.assertNotEqual(len(st), 0)
         # If we got here without error, then we should be safe.

     def test_add_trace_from_mainprocess(self):
@@ -129,28 +117,48 @@ def test_add_trace_from_mainprocess(self):
         rt_client = RealTimeClient(
             server_url="link.geonet.org.nz", buffer_capacity=600.)
         rt_client.select_stream(net="NZ", station="FOZ", selector="HHZ")
         rt_client.background_run()
-        time.sleep(20)
-        st = rt_client.stream.split()
-        assert len(st) > 0, "Empty Stream, cannot perform test"
+        # Wait for some data to perform the test
+        tic, toc = time.time(), time.time()
+        while toc - tic < SLEEP_STEP:
+            time.sleep(1)
+            st = rt_client.stream.split()
+            if len(st):
+                break
+            toc = time.time()
+        else:
+            rt_client.background_stop()  # Must stop the streamer!
+            raise NotImplementedError("Did not accumulate any data")
         tr = st[0]
         tr.stats.starttime -= 100
         rt_client.on_data(tr)
-        time.sleep(20)
-        rt_client.background_stop()
+        time.sleep(SLEEP_STEP)
         st = rt_client.stream.split().merge()
-        assert len(st) == 1, "More than one trace in stream!"
-        self.assertLess(st[0].stats.starttime - tr.stats.starttime, 0.01)
-        self.assertGreater(st[0].stats.endtime, tr.stats.endtime)
+        try:
+            self.assertEqual(len(st), 1)  # "More than one trace in stream!"
+            self.assertLess(st[0].stats.starttime - tr.stats.starttime, 0.01)
+            self.assertGreater(st[0].stats.endtime, tr.stats.endtime)
+        finally:
+            rt_client.background_stop()

     def test_add_data_not_streaming(self):
         rt_client = RealTimeClient(
             server_url="link.geonet.org.nz", buffer_capacity=600.)
         rt_client.select_stream(net="NZ", station="FOZ", selector="HHZ")
         rt_client.background_run()
-        time.sleep(30)
+        # Wait for some data to perform the test
+        tic, toc = time.time(), time.time()
+        while toc - tic < SLEEP_STEP:
+            time.sleep(1)
+            st = rt_client.stream.split()
+            if len(st):
+                break
+            toc = time.time()
+        else:
+            rt_client.background_stop()  # Must stop the streamer!
+            raise NotImplementedError("Did not accumulate any data")
+        # Stop the streamer here.
         rt_client.background_stop()
-        st = rt_client.stream.split().merge()
-        self.assertGreater(len(st), 0)
+
         tr = st[0].copy()
         tr.stats.starttime -= 100
         # Try to add the trace.