Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use factory function for publisher creation in trollstalker and switch to Watchdog #152

Merged
merged 5 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
run: |
python -m pip install \
--no-deps --pre --upgrade \
numpy; \
"watchdog!=4.0.0" numpy; \
python -m pip install \
--no-deps --upgrade \
git+https://github.com/pytroll/posttroll \
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ dependencies:
- paramiko
- responses
- netifaces
- watchdog
- watchdog!=4.0.0
- s3fs
- pyinotify
- requests
Expand Down
92 changes: 58 additions & 34 deletions pytroll_collectors/tests/test_trollstalker.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
"""Tests for trollstalker."""
import os
import time
from threading import Thread
import pytest

from posttroll.message import Message
from pytroll_collectors.trollstalker import main, stop
from pytroll_collectors.trollstalker import start_observer, stop_observer


def create_config_file(dir_to_watch, tmp_path):
LAG_SECONDS = 0.02


@pytest.fixture
def dir_to_watch(tmp_path):
"""Define a dir to watch."""
dir_to_watch = tmp_path / "to_watch"
return dir_to_watch


@pytest.fixture
def config_file(tmp_path, dir_to_watch):
"""Create a config file for trollstalker."""
config = """# This config is used in Trollstalker.

Expand All @@ -16,10 +27,9 @@ def create_config_file(dir_to_watch, tmp_path):
directory=""" + os.fspath(dir_to_watch) + """
filepattern={path}hrpt_{platform_name}_{start_time:%Y%m%d_%H%M}_{orbit_number:05d}.l1b
instruments=avhrr/3,mhs,amsu-b,amsu-a,hirs/3,hirs/4
#stalker_log_config=/usr/local/etc/pytroll/trollstalker_logging.ini
loglevel=DEBUG
event_names=IN_CLOSE_WRITE,IN_MOVED_TO
posttroll_port=0
loglevel=WARNING
posttroll_port=12234
nameservers=false
alias_platform_name = noaa18:NOAA-18|noaa19:NOAA-19
history=10"""
config_file = tmp_path / "config.ini"
Expand All @@ -28,45 +38,59 @@ def create_config_file(dir_to_watch, tmp_path):
return config_file


def test_trollstalker(tmp_path, caplog):
"""Test trollstalker functionality."""
dir_to_watch = tmp_path / "to_watch"
os.makedirs(dir_to_watch)
@pytest.fixture
def messages_from_observer(config_file):
"""Create an observer and yield the messages it published."""
from posttroll.testing import patched_publisher
with patched_publisher() as messages:
obs = start_observer(["-c", os.fspath(config_file), "-C", "noaa_hrpt"])
time.sleep(LAG_SECONDS)
yield messages
stop_observer(obs)

config_file = create_config_file(dir_to_watch, tmp_path)

thread = Thread(target=main, args=[["-c", os.fspath(config_file), "-C", "noaa_hrpt"]])
thread.start()
time.sleep(.5)
trigger_file = dir_to_watch / "hrpt_noaa18_20230524_1017_10101.l1b"
def test_trollstalker(messages_from_observer, dir_to_watch):
"""Test trollstalker functionality."""
subdir_to_watch = dir_to_watch / "new_dir"
os.mkdir(subdir_to_watch)

trigger_file = subdir_to_watch / "hrpt_noaa18_20230524_1017_10101.l1b"
with open(trigger_file, "w") as fd:
fd.write("hej")
time.sleep(.5)
stop()
thread.join()
assert "Publishing message pytroll://HRPT/l1b/dev/mystation file " in caplog.text
for line in caplog.text.split("\n"):
if "Publishing message" in line:
message = Message(rawstr=line.split("Publishing message ")[1])
time.sleep(LAG_SECONDS)
message = messages_from_observer[0]
assert message.startswith("pytroll://HRPT/l1b/dev/mystation file ")
message = Message(rawstr=message)

assert message.data['platform_name'] == "NOAA-18"
assert message.data['uri'] == os.fspath(trigger_file)


def test_trollstalker_directory_does_not_exist(tmp_path):
def test_trollstalker_monitored_directory_is_created(messages_from_observer, dir_to_watch):
"""Test that monitored directories are created."""
dir_to_watch = tmp_path / "to_watch"
trigger_file = dir_to_watch / "hrpt_noaa18_20230524_1017_10101.l1b"
with open(trigger_file, "w") as fd:
fd.write("hej")
time.sleep(LAG_SECONDS)
assert os.path.exists(dir_to_watch)

config_file = create_config_file(dir_to_watch, tmp_path)

thread = Thread(target=main, args=[["-c", os.fspath(config_file), "-C", "noaa_hrpt"]])
thread.start()
time.sleep(.5)
trigger_file = dir_to_watch / "hrpt_noaa18_20230524_1017_10101.l1b"
def test_trollstalker_handles_moved_files(messages_from_observer, dir_to_watch, tmp_path):
"""Test that trollstalker detects moved files."""
filename = "hrpt_noaa18_20230524_1017_10101.l1b"
trigger_file = tmp_path / filename
with open(trigger_file, "w") as fd:
fd.write("hej")
time.sleep(.5)
stop()
thread.join()
os.rename(trigger_file, dir_to_watch / filename)
time.sleep(LAG_SECONDS)
assert len(messages_from_observer) == 1
assert messages_from_observer[0].startswith("pytroll://HRPT/l1b/dev/mystation file ")

assert os.path.exists(dir_to_watch)

def test_event_names_are_deprecated(config_file):
"""Test that trollstalker detects moved files."""
with open(config_file, "a") as fd:
fd.write("\nevent_names=IN_CLOSE_WRITE,IN_MOVED_TO,IN_CREATE\n")
with pytest.deprecated_call():
obs = start_observer(["-c", os.fspath(config_file), "-C", "noaa_hrpt"])
stop_observer(obs)
Loading
Loading