Skip to content

Commit

Permalink
Merge pull request #66 from us-irs/extend-space-packet-abstract-class
Browse files Browse the repository at this point in the history
Extend space packet abstract class
  • Loading branch information
robamu authored Dec 22, 2023
2 parents d97ffaf + 7d804a9 commit 5bb4970
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 60 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,21 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

# [unreleased]

## Changed

- Extended `AbstractSpacePacket` with the following abstract properties:
- `ccsds_version`
- `packet_id`
- `packet_seq_control`
- The following properties were added but use the abstract properties:
- `packet_type`
- `sec_header_flag`
- `seq_flags`

## Fixed

- Metadata PDU typing correction.
- More robust `__eq__` implementations which check the type compared against.

# [v0.21.0] 2023-11-10

Expand Down
101 changes: 76 additions & 25 deletions spacepackets/ccsds/spacepacket.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ def raw(self) -> int:
def empty(cls):
return cls(seq_flags=SequenceFlags.CONTINUATION_SEGMENT, seq_count=0)

def __eq__(self, other: object) -> bool:
if isinstance(other, PacketSeqCtrl):
return self.raw() == other.raw()
return False

@classmethod
def from_raw(cls, raw: int):
return cls(
Expand Down Expand Up @@ -107,6 +112,11 @@ def __str__(self):
def raw(self) -> int:
return self.ptype << 12 | self.sec_header_flag << 11 | self.apid

def __eq__(self, other: object) -> bool:
if isinstance(other, PacketId):
return self.raw() == other.raw()
return False

@classmethod
def from_raw(cls, raw: int) -> PacketId:
return cls(
Expand All @@ -119,14 +129,39 @@ def from_raw(cls, raw: int) -> PacketId:
class AbstractSpacePacket(ABC):
@property
@abstractmethod
def apid(self) -> int:
def ccsds_version(self) -> int:
pass

@property
@abstractmethod
def seq_count(self) -> int:
def packet_id(self) -> PacketId:
pass

@property
@abstractmethod
def packet_seq_control(self) -> PacketSeqCtrl:
pass

@property
def packet_type(self) -> PacketType:
return self.packet_id.ptype

@property
def apid(self) -> int:
return self.packet_id.apid

@property
def sec_header_flag(self) -> bool:
return self.packet_id.sec_header_flag

@property
def seq_count(self) -> int:
return self.packet_seq_control.seq_count

@property
def seq_flags(self) -> SequenceFlags:
return self.packet_seq_control.seq_flags

@abstractmethod
def pack(self) -> bytearray:
pass
Expand Down Expand Up @@ -159,7 +194,7 @@ def __init__(
19
>>> sph.packet_id
PacketId(ptype=<PacketType.TC: 1>, sec_header_flag=False, apid=66)
>>> sph.psc
>>> sph.packet_seq_control
PacketSeqCtrl(seq_flags=<SequenceFlags.UNSEGMENTED: 3>, seq_count=0)
:param packet_type: 0 for Telemetery, 1 for Telecommands
Expand All @@ -179,11 +214,11 @@ def __init__(
"Invalid data length value, exceeds maximum value of"
f" {pow(2, 16) - 1} or negative"
)
self.ccsds_version = ccsds_version
self.packet_id = PacketId(
self._ccsds_version = ccsds_version
self._packet_id = PacketId(
ptype=packet_type, sec_header_flag=sec_header_flag, apid=apid
)
self.psc = PacketSeqCtrl(seq_flags=seq_flags, seq_count=seq_count)
self._psc = PacketSeqCtrl(seq_flags=seq_flags, seq_count=seq_count)
self.data_len = data_len

@classmethod
Expand All @@ -210,45 +245,57 @@ def pack(self) -> bytearray:
header = bytearray()
packet_id_with_version = self.ccsds_version << 13 | self.packet_id.raw()
header.extend(struct.pack("!H", packet_id_with_version))
header.extend(struct.pack("!H", self.psc.raw()))
header.extend(struct.pack("!H", self._psc.raw()))
header.extend(struct.pack("!H", self.data_len))
return header

@property
def packet_type(self):
def ccsds_version(self) -> int:
return self._ccsds_version

@property
def packet_id(self) -> PacketId:
return self._packet_id

@property
def packet_type(self) -> PacketType:
return self.packet_id.ptype

@packet_type.setter
def packet_type(self, packet_type):
self.packet_id.ptype = packet_type

@property
def apid(self):
return self.packet_id.apid
def apid(self) -> int:
return self._packet_id.apid

@property
def sec_header_flag(self):
return self.packet_id.sec_header_flag
def packet_seq_control(self) -> PacketSeqCtrl:
return self._psc

@property
def sec_header_flag(self) -> bool:
return self._packet_id.sec_header_flag

@sec_header_flag.setter
def sec_header_flag(self, value):
self.packet_id.sec_header_flag = value
self._packet_id.sec_header_flag = value

@property
def seq_count(self):
return self.psc.seq_count
return self._psc.seq_count

@seq_count.setter
def seq_count(self, seq_cnt):
self.psc.seq_count = seq_cnt
self._psc.seq_count = seq_cnt

@property
def seq_flags(self):
return self.psc.seq_flags
return self._psc.seq_flags

@seq_flags.setter
def seq_flags(self, value):
self.psc.seq_flags = value
self._psc.seq_flags = value

@property
def header_len(self) -> int:
Expand Down Expand Up @@ -299,8 +346,10 @@ def __repr__(self):
f" sec_header_flag={self.sec_header_flag!r}, seq_flags={self.seq_flags!r})"
)

def __eq__(self, other: SpacePacketHeader):
return self.pack() == other.pack()
def __eq__(self, other: object):
if isinstance(other, SpacePacketHeader):
return self.pack() == other.pack()
return False


class SpacePacket:
Expand Down Expand Up @@ -357,12 +406,14 @@ def seq_count(self):
def sec_header_flag(self):
return self.sp_header.sec_header_flag

def __eq__(self, other: SpacePacket):
return (
self.sp_header == other.sp_header
and self.sec_header == other.sec_header
and self.user_data == other.user_data
)
def __eq__(self, other: object):
if isinstance(other, SpacePacket):
return (
self.sp_header == other.sp_header
and self.sec_header == other.sec_header
and self.user_data == other.user_data
)
return False


def get_space_packet_id_bytes(
Expand Down
13 changes: 13 additions & 0 deletions spacepackets/ecss/pus_17_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Optional

from spacepackets import SpacePacketHeader
from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl
from spacepackets.ccsds.time import CcsdsTimeProvider
from spacepackets.ecss.conf import FETCH_GLOBAL_APID
from spacepackets.ecss.defs import PusService
Expand Down Expand Up @@ -42,6 +43,18 @@ def __init__(
def sp_header(self) -> SpacePacketHeader:
return self.pus_tm.space_packet_header

@property
def ccsds_version(self) -> int:
return self.pus_tm.ccsds_version

@property
def packet_id(self) -> PacketId:
return self.pus_tm.packet_id

@property
def packet_seq_control(self) -> PacketSeqCtrl:
return self.pus_tm.packet_seq_control

@property
def service(self) -> int:
return self.pus_tm.service
Expand Down
13 changes: 13 additions & 0 deletions spacepackets/ecss/pus_1_verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Optional

from spacepackets.ccsds import SpacePacketHeader
from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl
from spacepackets.ccsds.time import CcsdsTimeProvider
from spacepackets.ecss import PusTelecommand
from spacepackets.ecss.conf import FETCH_GLOBAL_APID
Expand Down Expand Up @@ -185,6 +186,18 @@ def unpack(cls, data: bytes, params: UnpackParams) -> Service1Tm:
def time_provider(self) -> Optional[CcsdsTimeProvider]:
return self.pus_tm.time_provider

@property
def ccsds_version(self) -> int:
return self.pus_tm.space_packet_header.ccsds_version

@property
def packet_seq_control(self) -> PacketSeqCtrl:
return self.pus_tm.space_packet_header.packet_seq_control

@property
def packet_id(self) -> PacketId:
return self.pus_tm.space_packet_header.packet_id

@property
def sp_header(self) -> SpacePacketHeader:
return self.pus_tm.space_packet_header
Expand Down
2 changes: 1 addition & 1 deletion spacepackets/ecss/req_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def from_sp_header(cls, header: SpacePacketHeader) -> RequestId:
return cls(
ccsds_version=header.ccsds_version,
tc_packet_id=header.packet_id,
tc_psc=header.psc,
tc_psc=header._psc,
)

def pack(self) -> bytes:
Expand Down
30 changes: 19 additions & 11 deletions spacepackets/ecss/tc.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ def __repr__(self):
f" subservice={self.subservice!r}, ack_flags={self.ack_flags!r} "
)

def __eq__(self, other: PusTcDataFieldHeader):
return self.pack() == other.pack()
def __eq__(self, other: object):
if isinstance(other, PusTcDataFieldHeader):
return self.pack() == other.pack()
return False

@classmethod
def get_header_size(cls):
Expand Down Expand Up @@ -232,19 +234,21 @@ def __str__(self):
f" {self.apid:#05x}, SSC {self.sp_header.seq_count}"
)

def __eq__(self, other: PusTelecommand):
return (
self.sp_header == other.sp_header
and self.pus_tc_sec_header == other.pus_tc_sec_header
and self._app_data == other._app_data
)
def __eq__(self, other: object):
if isinstance(other, PusTelecommand):
return (
self.sp_header == other.sp_header
and self.pus_tc_sec_header == other.pus_tc_sec_header
and self._app_data == other._app_data
)
return False

def to_space_packet(self) -> SpacePacket:
"""Retrieve the generic CCSDS space packet representation. This also calculates the CRC16
before converting the PUS TC to a generic Space Packet"""
self.calc_crc()
user_data = bytearray(self._app_data)
user_data.extend(self._crc16)
user_data.extend(self._crc16) # type: ignore
return SpacePacket(self.sp_header, self.pus_tc_sec_header.pack(), user_data)

def calc_crc(self):
Expand Down Expand Up @@ -343,6 +347,10 @@ def source_id(self) -> int:
def source_id(self, source_id: int):
self.pus_tc_sec_header.source_id = source_id

@property
def ccsds_version(self) -> int:
return self.sp_header.ccsds_version

@property
def seq_count(self) -> int:
return self.sp_header.seq_count
Expand All @@ -356,8 +364,8 @@ def packet_id(self) -> PacketId:
return self.sp_header.packet_id

@property
def packet_seq_ctrl(self) -> PacketSeqCtrl:
return self.sp_header.psc
def packet_seq_control(self) -> PacketSeqCtrl:
return self.sp_header._psc

@property
def app_data(self) -> bytes:
Expand Down
Loading

0 comments on commit 5bb4970

Please sign in to comment.