Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python TemplateAccessor and CmdResponseProtocol #1684

Merged
merged 4 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions openc3/python/openc3/accessors/template_accessor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# Copyright 2024 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# This file may also be used under the terms of a commercial license
# if purchased from OpenC3, Inc.

from openc3.accessors.accessor import Accessor
import re


class TemplateAccessor(Accessor):
def __init__(self, packet, left_char="<", right_char=">"):
super().__init__(packet)
self.left_char = left_char
self.right_char = right_char
self.configured = False

def configure(self):
if self.configured:
return

escaped_left_char = self.left_char
if self.left_char == "(":
escaped_left_char = f"\\{self.left_char}"
escaped_right_char = self.right_char
if self.right_char == ")":
escaped_right_char = f"\\{self.right_char}"

# Convert the template into a Regexp for reading each item
template = self.packet.template[:]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could just add .decode() here and do it once

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

template_items = re.compile(f"{escaped_left_char}.*?{escaped_right_char}", re.X).findall(template.decode())
escaped_read_template = re.escape(template.decode())

self.item_keys = []
for item in template_items:
self.item_keys.append(item[1:-1])
# If they're using parens we have to escape them
# since we're working with the already escaped template
if self.left_char == "(":
item = f"\({item[1:]}"
if self.right_char == ")":
item = f"{item[0:-1]}\)"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At line 34 and 37 it is a double \\ but here a single \(. Is that correct? Ruby has double \\.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. One is for a regex and the other is for a string replace

escaped_read_template = escaped_read_template.replace(item, "(.*)")
self.read_regexp = re.compile(escaped_read_template, re.X)
self.configured = True

def read_item(self, item, buffer):
if item.data_type == "DERIVED":
return None
self.configure()

# Scan the response for all the variables in brackets <VARIABLE>
values = self.read_regexp.match(buffer.decode())
if values is not None:
values = values.groups()
if values is None or (len(values) != len(self.item_keys)):
num_items = 0
if values is not None:
num_items = len(values)
raise RuntimeError(
f"Unexpected number of items found in buffer= {num_items}, Expected= {len(self.item_keys)}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ruby code is buffer: and Expected:

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

)
else:
for i, value in enumerate(values):
item_key = self.item_keys[i]
if item_key == item.key:
return Accessor.convert_to_type(value, item)

raise RuntimeError(f"Response does not include key {item.key}: {buffer}")

def read_items(self, items, buffer):
result = {}
self.configure()

# Scan the response for all the variables in brackets <VARIABLE>
values = self.read_regexp.match(buffer.decode())
if values is not None:
values = values.groups()
if values is None or (len(values) != len(self.item_keys)):
num_items = 0
if values is not None:
num_items = len(values)
raise RuntimeError(
f"Unexpected number of items found in buffer= {num_items}, Expected= {len(self.item_keys)}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change = to :

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

)
else:
for item in items:
if item.data_type == "DERIVED":
result[item.name] = None
continue
try:
index = self.item_keys.index(item.key)
result[item.name] = Accessor.convert_to_type(values[index], item)
except ValueError:
raise RuntimeError(f"Unknown item with key {item.key} requested")

return result

def write_item(self, item, value, buffer):
if item.data_type == "DERIVED":
return None
self.configure()

updated_buffer = buffer.decode().replace(f"{self.left_char}{item.key}{self.right_char}", str(value)).encode()

if buffer == updated_buffer:
raise RuntimeError(f"Key {item.key} not found in template")
buffer[0:] = updated_buffer
return value

def write_items(self, items, values, buffer):
self.configure()
for index, item in enumerate(items):
if item.data_type == "DERIVED":
continue
updated_buffer = (
buffer.decode().replace(f"{self.left_char}{item.key}{self.right_char}", str(values[index])).encode()
)

if buffer == updated_buffer:
raise RuntimeError(f"Key {item.key} not found in template")
buffer[0:] = updated_buffer
return values

# If this is set it will enforce that buffer data is encoded
# in a specific encoding
def enforce_encoding(self):
return None

# This affects whether the Packet class enforces the buffer
# length at all. Set to false to remove any correlation between
# buffer length and defined sizes of items in COSMOS
def enforce_length(self):
return False

# This sets the short_buffer_allowed flag in the Packet class
# which allows packets that have a buffer shorter than the defined size.
# Note that the buffer is still resized to the defined length
def enforce_short_buffer_allowed(self):
return True

# If this is true it will enforce that COSMOS DERIVED items must have a
# write_conversion to be written
def enforce_derived_write_conversion(self, _item):
return True
113 changes: 113 additions & 0 deletions openc3/python/openc3/interfaces/protocols/cmd_response_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Copyright 2024 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# This file may also be used under the terms of a commercial license
# if purchased from OpenC3, Inc.

from openc3.system.system import System
from openc3.config.config_parser import ConfigParser
from openc3.utilities.logger import Logger
from openc3.interfaces.protocols.protocol import Protocol
from queue import SimpleQueue, Empty
import time


# Protocol that waits for a response for any commands with a defined response packet.
# The response packet is identified but not defined by the protocol.
class CmdResponseProtocol(Protocol):
# @param response_timeout [Float] Number of seconds to wait before timing out
# when waiting for a response
# @param response_polling_period [Float] Number of seconds to wait between polling
# for a response
# @param raise_exceptions [String] Whether to raise exceptions when errors
# occur in the protocol like unexpected responses or response timeouts.
# @param allow_empty_data [true/false/nil] See Protocol#initialize
def __init__(
self, response_timeout=5.0, response_polling_period=0.02, raise_exceptions=False, allow_empty_data=None
):
super().__init__(allow_empty_data)
self.response_timeout = ConfigParser.handle_none(response_timeout)
if self.response_timeout:
self.response_timeout = float(response_timeout)
self.response_polling_period = float(response_polling_period)
self.raise_exceptions = ConfigParser.handle_true_false(raise_exceptions)
self.write_block_queue = SimpleQueue()
self.response_packet = None

def connect_reset(self):
super().connect_reset()
try:
while self.write_block_queue.qsize != 0:
self.write_block_queue.get_nowait()
except Empty:
pass

def disconnect_reset(self):
super().disconnect_reset()
self.write_block_queue.put(None) # Unblock the write block queue

def read_packet(self, packet):
if self.response_packet is not None:
# Grab the response packet specified in the command
result_packet = System.telemetry.packet(self.response_packet[0], self.response_packet[1]).clone()
result_packet.buffer = packet.buffer
result_packet.received_time = None
result_packet.stored = packet.stored
result_packet.extra = packet.extra

# Release the write
self.write_block_queue.put(None)

# This returns the fully identified and defined packet
# Received time is handled by the interface microservice
return result_packet
else:
return packet

def write_packet(self, packet):
# Setup the response packet (if there is one)
# This primes waiting for the response in post_write_interface
self.response_packet = packet.response

return packet

def post_write_interface(self, packet, data, extra=None):
if self.response_packet is not None:
if self.response_timeout:
response_timeout_time = time.time() + self.response_timeout
else:
response_timeout_time = None

# Block the write until the response is received
while True:
try:
self.write_block_queue.get_nowait()
break
except Empty:
time.sleep(self.response_polling_period)
if response_timeout_time is None:
continue
if response_timeout_time and time.time() < response_timeout_time:
continue
interface_name = ""
if self.interface is not None:
interface_name = self.interface.name
self.handle_error(f"{interface_name}: Timeout waiting for response")

self.response_packet = None
return super().post_write_interface(packet, data, extra)

def handle_error(self, msg):
Logger.error(msg)
if self.raise_exceptions:
raise RuntimeError(msg)
Loading
Loading