-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* bump all product versions to 2 * bump version * fix version label * pin common CONF version for end2end test * ignore VERS_CFG in end2end testing * add test for TM folder observing * bump watchdog==6.0.0 * integrate review fixes
- Loading branch information
Showing
5 changed files
with
69 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,7 +28,7 @@ install_requires = | |
scp | ||
spiceypy | ||
sunpy>=4.0.1 | ||
watchdog | ||
watchdog==6.0.0 | ||
requests | ||
|
||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
import re | ||
import time | ||
import shutil | ||
|
||
import pytest | ||
from watchdog.observers import Observer | ||
|
||
from stixcore.processing.pipeline import GFTSFileHandler | ||
from stixcore.util.logging import get_logger | ||
|
||
logger = get_logger(__name__) | ||
|
||
|
||
@pytest.fixture | ||
def out_dir(tmp_path): | ||
return tmp_path | ||
|
||
|
||
def processing_tm_file(path, **args): | ||
logger.info(f"start processing tm file: {path}") | ||
time.sleep(2) | ||
logger.info(f"stop processing tm file: {path}") | ||
|
||
|
||
@pytest.fixture | ||
def gfts_manager(): | ||
return GFTSFileHandler(processing_tm_file, re.compile(r'.*test_[0-9]*.tm$'), name="w-dog-test") | ||
|
||
|
||
def test_gfts_manager_watchdog(out_dir, gfts_manager): | ||
(out_dir / "test_1.tm").touch() | ||
(out_dir / "test_2.tm").touch() | ||
(out_dir / "test_3.tm").touch() | ||
(out_dir / "test_4.tmp").touch() | ||
(out_dir / "test_5.tmp").touch() | ||
(out_dir / "test_6.tmp").touch() | ||
|
||
observer = Observer() | ||
observer.schedule(gfts_manager, out_dir, recursive=False) | ||
observer.start() | ||
|
||
assert gfts_manager.queue.qsize() == 0 | ||
|
||
# 3 files are in the base dir | ||
# simulate start with unprocessed | ||
gfts_manager.add_to_queue(out_dir.glob("*.tm")) | ||
assert gfts_manager.queue.qsize() == 3 | ||
|
||
# now some tm in coming in | ||
time.sleep(1) | ||
shutil.move((out_dir / "test_4.tmp"), (out_dir / "test_4.tm")) | ||
time.sleep(1) | ||
shutil.move((out_dir / "test_5.tmp"), (out_dir / "test_5.tm")) | ||
time.sleep(1) | ||
shutil.move((out_dir / "test_6.tmp"), (out_dir / "test_6.tm")) | ||
time.sleep(1) | ||
|
||
openfiles = gfts_manager.queue.qsize() | ||
assert openfiles > 2 | ||
time.sleep(20) | ||
assert gfts_manager.queue.qsize() < openfiles | ||
observer.stop() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters