-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #73 from us-irs/add-seq-count-abstractions
Add Sequence Counter Module
- Loading branch information
Showing
7 changed files
with
189 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
Sequence Count Module | ||
======================== | ||
|
||
.. automodule:: spacepackets.seqcount | ||
:members: | ||
:undoc-members: | ||
:show-inheritance: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
"""This module provides generic sequence counter abstractions and implementation which are commonly | ||
needed when working with space packet protocols. | ||
""" | ||
from abc import abstractmethod, ABC | ||
from pathlib import Path | ||
|
||
|
||
class ProvidesSeqCount(ABC): | ||
@property | ||
@abstractmethod | ||
def max_bit_width(self) -> int: | ||
pass | ||
|
||
@max_bit_width.setter | ||
@abstractmethod | ||
def max_bit_width(self, width: int): | ||
pass | ||
|
||
@abstractmethod | ||
def get_and_increment(self) -> int: | ||
"""Contract: Retrieve the current sequence count and then increment it. The first call | ||
should yield 0""" | ||
raise NotImplementedError( | ||
"Please use a concrete class implementing this method" | ||
) | ||
|
||
def __next__(self): | ||
return self.get_and_increment() | ||
|
||
|
||
class FileSeqCountProvider(ProvidesSeqCount): | ||
"""Sequence count provider which uses a disk file to store the current sequence count | ||
in a non-volatile way. The first call with the next built-in or using the base | ||
class :py:meth:`current` call will yield a 0 | ||
""" | ||
|
||
def __init__(self, max_bit_width: int, file_name: Path = Path("seqcnt.txt")): | ||
self.file_name = file_name | ||
self._max_bit_width = max_bit_width | ||
if not self.file_name.exists(): | ||
self.create_new() | ||
|
||
@property | ||
def max_bit_width(self) -> int: | ||
return self._max_bit_width | ||
|
||
@max_bit_width.setter | ||
def max_bit_width(self, width: int): | ||
self._max_bit_width = width | ||
|
||
def create_new(self): | ||
with open(self.file_name, "w") as file: | ||
file.write("0\n") | ||
|
||
def current(self) -> int: | ||
if not self.file_name.exists(): | ||
raise FileNotFoundError(f"{self.file_name} file does not exist") | ||
with open(self.file_name) as file: | ||
return self.check_count(file.readline()) | ||
|
||
def get_and_increment(self) -> int: | ||
if not self.file_name.exists(): | ||
raise FileNotFoundError(f"{self.file_name} file does not exist") | ||
with open(self.file_name, "r+") as file: | ||
curr_seq_cnt = self.check_count(file.readline()) | ||
file.seek(0) | ||
file.write(f"{self._increment_with_rollover(curr_seq_cnt)}\n") | ||
return curr_seq_cnt | ||
|
||
def check_count(self, line: str) -> int: | ||
line = line.rstrip() | ||
if not line.isdigit(): | ||
raise ValueError("Sequence count file content is invalid") | ||
curr_seq_cnt = int(line) | ||
if curr_seq_cnt < 0 or curr_seq_cnt > pow(2, self.max_bit_width) - 1: | ||
raise ValueError("Sequence count in file has invalid value") | ||
return curr_seq_cnt | ||
|
||
def _increment_with_rollover(self, seq_cnt: int) -> int: | ||
"""CCSDS Sequence count has maximum size of 14 bit. Rollover after that size by default""" | ||
if seq_cnt >= pow(2, self.max_bit_width) - 1: | ||
return 0 | ||
else: | ||
return seq_cnt + 1 | ||
|
||
|
||
class PusFileSeqCountProvider(FileSeqCountProvider): | ||
def __init__(self, file_name: Path = Path("seqcnt.txt")): | ||
super().__init__(max_bit_width=14, file_name=file_name) | ||
|
||
|
||
class SeqCountProvider(ProvidesSeqCount): | ||
def __init__(self, bit_width: int): | ||
self.count = 0 | ||
self._max_bit_width = bit_width | ||
|
||
@property | ||
def max_bit_width(self) -> int: | ||
return self._max_bit_width | ||
|
||
@max_bit_width.setter | ||
def max_bit_width(self, width: int): | ||
self._max_bit_width = width | ||
|
||
def get_and_increment(self) -> int: | ||
curr_count = self.count | ||
self.count += 1 | ||
return curr_count |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import os | ||
from pathlib import Path | ||
import platform | ||
from unittest import TestCase | ||
|
||
from spacepackets.seqcount import PusFileSeqCountProvider | ||
from tempfile import NamedTemporaryFile | ||
|
||
|
||
class TestSeqCount(TestCase): | ||
def setUp(self) -> None: | ||
self.file_name = Path("seq_cnt.txt") | ||
|
||
def test_basic(self): | ||
if platform.system() != "Windows": | ||
with NamedTemporaryFile("w+t") as file: | ||
file.write("0\n") | ||
file.seek(0) | ||
seq_cnt_provider = PusFileSeqCountProvider(Path(file.name)) | ||
seq_cnt = seq_cnt_provider.current() | ||
self.assertEqual(seq_cnt, 0) | ||
# The first call will start at 0 | ||
self.assertEqual(next(seq_cnt_provider), 0) | ||
self.assertEqual(seq_cnt_provider.get_and_increment(), 1) | ||
file.seek(0) | ||
file.write(f"{pow(2, 14) - 1}\n") | ||
file.flush() | ||
# Assert rollover | ||
self.assertEqual(next(seq_cnt_provider), pow(2, 14) - 1) | ||
self.assertEqual(next(seq_cnt_provider), 0) | ||
|
||
def test_with_real_file(self): | ||
seq_cnt_provider = PusFileSeqCountProvider(self.file_name) | ||
self.assertTrue(self.file_name.exists()) | ||
self.assertEqual(seq_cnt_provider.current(), 0) | ||
self.assertEqual(next(seq_cnt_provider), 0) | ||
pass | ||
|
||
def test_file_deleted_runtime(self): | ||
seq_cnt_provider = PusFileSeqCountProvider(self.file_name) | ||
self.assertTrue(self.file_name.exists()) | ||
os.remove(self.file_name) | ||
with self.assertRaises(FileNotFoundError): | ||
next(seq_cnt_provider) | ||
with self.assertRaises(FileNotFoundError): | ||
seq_cnt_provider.current() | ||
|
||
def test_faulty_file_entry(self): | ||
if platform.system() != "Windows": | ||
with NamedTemporaryFile("w+t") as file: | ||
file.write("-1\n") | ||
file.seek(0) | ||
seq_cnt_provider = PusFileSeqCountProvider(Path(file.name)) | ||
with self.assertRaises(ValueError): | ||
next(seq_cnt_provider) | ||
file.write(f"{pow(2, 15)}\n") | ||
file.seek(0) | ||
file.flush() | ||
seq_cnt_provider = PusFileSeqCountProvider(Path(file.name)) | ||
with self.assertRaises(ValueError): | ||
next(seq_cnt_provider) | ||
|
||
def tearDown(self) -> None: | ||
if self.file_name.exists(): | ||
os.remove(self.file_name) |