Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Command State Range Checking for COSMOS 6 #1670

Merged
merged 1 commit into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions openc3/lib/openc3/packets/commands.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# GNU Affero General Public License for more details.

# Modified by OpenC3, Inc.
# All changes Copyright 2022, OpenC3, Inc.
# All changes Copyright 2024, OpenC3, Inc.
# All Rights Reserved
#
# This file may also be used under the terms of a commercial license
Expand Down Expand Up @@ -308,12 +308,23 @@ def set_parameters(command, params, range_checking)
item = command.get_item(item_upcase)
range_check_value = value

# Convert from state to value if possible
if item.states and item.states[value.to_s.upcase]
range_check_value = item.states[value.to_s.upcase]
end

if range_checking
if item.states
if item.states[value.to_s.upcase]
range_check_value = item.states[value.to_s.upcase]
else
unless item.states.values.include?(value)
if command.raw
# Raw commands report missing value maps
raise "Command parameter '#{command.target_name} #{command.packet_name} #{item_upcase}' = #{value.to_s.upcase} not one of #{item.states.values.join(', ')}"
else
# Normal commands report missing state maps
raise "Command parameter '#{command.target_name} #{command.packet_name} #{item_upcase}' = #{value.to_s.upcase} not one of #{item.states.keys.join(', ')}"
end
end
end
end

range = item.range
if range
# Perform Range Check on command parameter
Expand Down
2 changes: 1 addition & 1 deletion openc3/lib/openc3/packets/packet.rb
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ def write_item(item, value, value_type = :CONVERTED, buffer = @buffer)
super(item, value, :RAW, buffer)
rescue ArgumentError => e
if item.states and String === value and e.message =~ /invalid value for/
raise "Unknown state #{value} for #{item.name}"
raise "Unknown state #{value} for #{item.name}, must be one of #{item.states.keys.join(', ')}"
else
raise e
end
Expand Down
19 changes: 15 additions & 4 deletions openc3/python/openc3/packets/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,22 @@ def _set_parameters(self, command, params, range_checking):
item = command.get_item(item_upcase)
range_check_value = value

# Convert from state to value if possible
if item.states is not None and item.states.get(str(value).upper()) is not None:
range_check_value = item.states[value.upper()]

if range_checking:
if item.states:
if item.states.get(str(value).upper()) is not None:
range_check_value = item.states[str(value).upper()]
else:
if value not in item.states.values():
if command.raw:
# Raw commands report missing value maps
raise RuntimeError(
f"Command parameter '{command.target_name} {command.packet_name} {item_upcase}' = {value} not one of {', '.join(map(str, item.states.values()))}"
)
else:
# Normal commands report missing state maps
raise RuntimeError(
f"Command parameter '{command.target_name} {command.packet_name} {item_upcase}' = {value} not one of {', '.join(item.states.keys())}")

minimum = item.minimum
maximum = item.maximum
if minimum is not None and maximum is not None:
Expand Down
2 changes: 1 addition & 1 deletion openc3/python/openc3/packets/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ def write_item(self, item, value, value_type="CONVERTED", buffer=None):
super().write_item(item, value, "RAW", buffer)
except ValueError as error:
if item.states and isinstance(value, str) and "invalid literal for" in repr(error):
raise ValueError(f"Unknown state {value} for {item.name}") from error
raise ValueError(f"Unknown state {value} for {item.name}, must be one of f{', '.join(item.states.keys())}") from error
else:
raise error
case "FORMATTED" | "WITH_UNITS":
Expand Down
227 changes: 168 additions & 59 deletions openc3/python/test/packets/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,25 @@ def setUp(self):
tf.write("#\n")
tf.write('COMMAND tgt1 pkt1 LITTLE_ENDIAN "TGT1 PKT1 Description"\n')
tf.write(' APPEND_ID_PARAMETER item1 8 UINT 1 1 1 "Item1"\n')
tf.write(' APPEND_PARAMETER item2 8 UINT 0 254 2 "Item2"\n')
tf.write(' APPEND_PARAMETER item3 8 UINT 0 254 3 "Item3"\n')
tf.write(' APPEND_PARAMETER item4 8 UINT 0 254 4 "Item4"\n')
tf.write(' APPEND_PARAMETER item2 8 UINT 0 200 2 "Item2"\n')
tf.write(' APPEND_PARAMETER item3 8 UINT 0 200 3 "Item3"\n')
tf.write(' APPEND_PARAMETER item4 8 UINT 0 200 4 "Item4"\n')
tf.write('COMMAND tgt1 pkt2 LITTLE_ENDIAN "TGT1 PKT2 Description"\n')
tf.write(' APPEND_ID_PARAMETER item1 8 UINT 2 2 2 "Item1"\n')
tf.write(' APPEND_PARAMETER item2 8 UINT 0 255 2 "Item2"\n')
tf.write(' STATE BAD1 0 HAZARDOUS "Hazardous"\n')
tf.write(" STATE BAD2 1 HAZARDOUS\n")
tf.write(" STATE GOOD 2 DISABLE_MESSAGES\n")
tf.write(' APPEND_PARAMETER item3 32 FLOAT 0 1 0 "Item3"\n')
tf.write(' STATE S1 0.0\n')
tf.write(' STATE S2 0.25\n')
tf.write(' STATE S3 0.5\n')
tf.write(' STATE S4 0.75\n')
tf.write(' STATE S5 1.0\n')
tf.write(' APPEND_PARAMETER item4 40 STRING "HELLO"\n')
tf.write(' STATE HI HELLO\n')
tf.write(' STATE WO WORLD\n')
tf.write(' STATE JA JASON\n')
tf.write('COMMAND tgt2 pkt3 LITTLE_ENDIAN "TGT2 PKT3 Description"\n')
tf.write(' HAZARDOUS "Hazardous"\n')
tf.write(' APPEND_ID_PARAMETER item1 8 UINT 3 3 3 "Item1"\n')
Expand All @@ -57,6 +67,10 @@ def setUp(self):
tf.write(' APPEND_ID_PARAMETER item1 8 UINT 5 5 5 "Item1"\n')
tf.write(' APPEND_PARAMETER item2 8 UINT 0 100 0 "Item2"\n')
tf.write(" POLY_WRITE_CONVERSION 0 2\n")
tf.write('COMMAND tgt2 pkt6 BIG_ENDIAN "TGT2 PKT6 Description"\n')
tf.write(' APPEND_ID_PARAMETER item1 16 UINT 6 6 6 "Item1"\n')
tf.write(' APPEND_PARAMETER item2 16 UINT MIN MAX 0 "Item2" LITTLE_ENDIAN\n')
tf.write(' APPEND_PARAMETER item3 16 UINT MIN MAX 0 "Item3"\n')
tf.seek(0)

pc = PacketConfig()
Expand Down Expand Up @@ -84,10 +98,11 @@ def test_packets_returns_all_packets_target_tgt1(self):

def test_packets_returns_all_packets_target_tgt2(self):
pkts = self.cmd.packets("TGT2")
self.assertEqual(len(pkts), 3)
self.assertEqual(len(pkts), 4)
self.assertIn("PKT3", pkts.keys())
self.assertIn("PKT4", pkts.keys())
self.assertIn("PKT5", pkts.keys())
self.assertIn("PKT6", pkts.keys())

def test_params_complains_about_non_existant_targets(self):
with self.assertRaisesRegex(RuntimeError, "Command target 'TGTX' does not exist"):
Expand Down Expand Up @@ -208,63 +223,166 @@ def test_identifies_tgt2_pkt1(self):
self.assertEqual(pkt.read("item2"), 2)

def test_build_cmd_complains_about_non_existant_targets(self):
with self.assertRaisesRegex(RuntimeError, "Command target 'TGTX' does not exist"):
self.cmd.build_cmd("tgtX", "pkt1")
for range_checking in [True, False]:
for raw in [True, False]:
with self.assertRaisesRegex(RuntimeError, "Command target 'TGTX' does not exist"):
self.cmd.build_cmd("tgtX", "pkt1", {}, range_checking, raw)

def test_build_cmd_complains_about_non_existant_packets(self):
with self.assertRaisesRegex(RuntimeError, "Command packet 'TGT1 PKTX' does not exist"):
self.cmd.build_cmd("tgt1", "pktX")
for range_checking in [True, False]:
for raw in [True, False]:
with self.assertRaisesRegex(RuntimeError, "Command packet 'TGT1 PKTX' does not exist"):
self.cmd.build_cmd("tgt1", "pktX", {}, range_checking, raw)

def test_build_cmd_complains_about_non_existant_items(self):
with self.assertRaisesRegex(AttributeError, "Packet item 'TGT1 PKT1 ITEMX' does not exist"):
self.cmd.build_cmd("tgt1", "pkt1", {"itemX": 1})
for range_checking in [True, False]:
for raw in [True, False]:
with self.assertRaisesRegex(AttributeError, "Packet item 'TGT1 PKT1 ITEMX' does not exist"):
self.cmd.build_cmd("tgt1", "pkt1", {"itemX": 1}, range_checking, raw)

def test_build_cmd_creates_a_populated_command_packet_with_default_values(self):
cmd = self.cmd.build_cmd("TGT1", "PKT1")
self.assertEqual(cmd.read("item1"), 1)
self.assertEqual(cmd.read("item2"), 2)
self.assertEqual(cmd.read("item3"), 3)
self.assertEqual(cmd.read("item4"), 4)
def test_build_cmd_complains_about_missing_required_parameters(self):
for range_checking in [True, False]:
for raw in [True, False]:
with self.assertRaisesRegex(RuntimeError, "Required command parameter 'TGT2 PKT3 ITEM2' not given"):
self.cmd.build_cmd("tgt2", "pkt3", {}, range_checking, raw)

def test_creates_a_command_packet_with_mixed_endianness(self):
for range_checking in [True, False]:
for raw in [True, False]:
items = { "ITEM2": 0xABCD, "ITEM3": 0x6789 }
cmd = self.cmd.build_cmd("TGT2", "PKT6", items, range_checking, raw)
self.assertEqual(cmd.read("item1"), 6)
self.assertEqual(cmd.read("item2"), 0xABCD)
self.assertEqual(cmd.read("item3"), 0x6789)
self.assertEqual(cmd.buffer, b"\x00\x06\xCD\xAB\x67\x89")

def test_build_cmd_complains_about_out_of_range_item_values(self):
with self.assertRaisesRegex(
RuntimeError,
"Command parameter 'TGT1 PKT1 ITEM2' = 1000 not in valid range of 0 to 254",
):
self.cmd.build_cmd("tgt1", "pkt1", {"item2": 1000})

def test_build_cmd_handles_string_values(self):
with self.assertRaisesRegex(
RuntimeError,
"Command parameter 'TGT1 PKT1 ITEM2' = '10' not in valid range of 0 to 254",
):
self.cmd.build_cmd("tgt1", "pkt1", {"item2": "10"})

def test_build_cmd_ignores_out_of_range_item_values_if_requested(self):
cmd = self.cmd.build_cmd("tgt1", "pkt1", {"item2": 255}, False)
self.assertEqual(cmd.read("item1"), 1)
self.assertEqual(cmd.read("item2"), 255)
self.assertEqual(cmd.read("item3"), 3)
self.assertEqual(cmd.read("item4"), 4)
def test_build_cmd_resets_the_buffer_size(self):
for range_checking in [True, False]:
for raw in [True, False]:
packet = self.cmd.packet("TGT1", "PKT1")
packet.buffer = b"\x00" * (packet.defined_length + 1)
self.assertEqual(len(packet.buffer), 5)
items = {"ITEM2": 10}
cmd = self.cmd.build_cmd("TGT1", "PKT1", items, range_checking, raw)
self.assertEqual(cmd.read("ITEM2"), 10)
self.assertEqual(len(cmd.buffer), 4)

def test_build_cmd_creates_a_populated_command_packet_with_default_values(self):
for range_checking in [True, False]:
for raw in [True, False]:
cmd = self.cmd.build_cmd("TGT1", "PKT1", {}, range_checking, raw)
self.assertEqual(cmd.read("item1"), 1)
self.assertEqual(cmd.read("item2"), 2)
self.assertEqual(cmd.read("item3"), 3)
self.assertEqual(cmd.read("item4"), 4)

def test_build_cmd_creates_a_command_packet_with_override_item_values(self):
items = {"ITEM2": 10, "ITEM4": 11}
cmd = self.cmd.build_cmd("TGT1", "PKT1", items)
self.assertEqual(cmd.read("item1"), 1)
self.assertEqual(cmd.read("item2"), 10)
self.assertEqual(cmd.read("item3"), 3)
self.assertEqual(cmd.read("item4"), 11)
for range_checking in [True, False]:
for raw in [True, False]:
items = {"ITEM2": 10, "ITEM4": 11}
cmd = self.cmd.build_cmd("TGT1", "PKT1", items, range_checking, raw)
self.assertEqual(cmd.read("item1"), 1)
self.assertEqual(cmd.read("item2"), 10)
self.assertEqual(cmd.read("item3"), 3)
self.assertEqual(cmd.read("item4"), 11)

def test_build_cmd_creates_a_command_packet_with_override_item_value_states(self):
items = {"ITEM2": "GOOD"}
cmd = self.cmd.build_cmd("TGT1", "PKT2", items)
self.assertEqual(cmd.read("item1"), 2)
self.assertEqual(cmd.read("item2"), "GOOD")
self.assertEqual(cmd.read("ITEM2", "RAW"), 2)
for range_checking in [True, False]:
for raw in [True, False]:
if raw:
items = {"ITEM2": 2, "ITEM3": 0.5, "ITEM4": "WORLD"}
else:
# Converted (not raw) can take either states or values
items = {"ITEM2": 2, "ITEM3": "S3", "ITEM4": "WO"}
cmd = self.cmd.build_cmd("TGT1", "PKT2", items, range_checking, raw)
self.assertEqual(cmd.read("item1"), 2)
self.assertEqual(cmd.read("item2"), "GOOD")
self.assertEqual(cmd.read("ITEM2", "RAW"), 2)
self.assertEqual(cmd.read("item3"), "S3")
self.assertEqual(cmd.read("ITEM3", "RAW"), 0.5)
self.assertEqual(cmd.read("item4"), "WO")
self.assertEqual(cmd.read("ITEM4", "RAW"), "WORLD")

def test_build_cmd_complains_about_missing_required_parameters(self):
with self.assertRaisesRegex(RuntimeError, "Required command parameter 'TGT2 PKT3 ITEM2' not given"):
self.cmd.build_cmd("tgt2", "pkt3")
def test_build_cmd_complains_about_out_of_range_item_values(self):
for raw in [True, False]:
with self.assertRaisesRegex(
RuntimeError,
"Command parameter 'TGT1 PKT1 ITEM2' = 255 not in valid range of 0 to 200",
):
self.cmd.build_cmd("tgt1", "pkt1", {"item2": 255}, True, raw)

def test_build_cmd_complains_about_out_of_range_item_states(self):
for raw in [True, False]:
items = { "ITEM2": 3, "ITEM3": 0.0, "ITEM4": "WORLD" }
if raw:
with self.assertRaisesRegex(
RuntimeError,
"Command parameter 'TGT1 PKT2 ITEM2' = 3 not one of 0, 1, 2",
):
self.cmd.build_cmd("tgt1", "pkt2", items, True, raw)
else:
with self.assertRaisesRegex(
RuntimeError,
"Command parameter 'TGT1 PKT2 ITEM2' = 3 not one of BAD1, BAD2, GOOD",
):
self.cmd.build_cmd("tgt1", "pkt2", items, True, raw)

items = { "ITEM2": 0, "ITEM3": 2.0, "ITEM4": "WORLD" }
if raw:
with self.assertRaisesRegex(
RuntimeError,
"Command parameter 'TGT1 PKT2 ITEM3' = 2.0 not one of 0.0, 0.25, 0.5, 0.75, 1.0",
):
self.cmd.build_cmd("tgt1", "pkt2", items, True, raw)
else:
with self.assertRaisesRegex(
RuntimeError,
"Command parameter 'TGT1 PKT2 ITEM3' = 2.0 not one of S1, S2, S3, S4, S5",
):
self.cmd.build_cmd("tgt1", "pkt2", items, True, raw)

items = { "ITEM2": 0, "ITEM3": 0.0, "ITEM4": "TESTY" }
if raw:
with self.assertRaisesRegex(
RuntimeError,
"Command parameter 'TGT1 PKT2 ITEM4' = TESTY not one of HELLO, WORLD, JASON",
):
self.cmd.build_cmd("tgt1", "pkt2", items, True, raw)
else:
with self.assertRaisesRegex(
RuntimeError,
"Command parameter 'TGT1 PKT2 ITEM4' = TESTY not one of HI, WO, JA",
):
self.cmd.build_cmd("tgt1", "pkt2", items, True, raw)

def test_build_cmd_ignores_about_out_of_range_item_values(self):
for raw in [True, False]:
cmd = self.cmd.build_cmd("tgt1", "pkt1", {"item2": 255}, False, raw)
self.assertEqual(cmd.read("item1"), 1)
self.assertEqual(cmd.read("item2"), 255)
self.assertEqual(cmd.read("item3"), 3)
self.assertEqual(cmd.read("item4"), 4)


def test_build_cmd_ignores_out_of_range_item_states(self):
for raw in [True, False]:
items = { "ITEM2": 3, "ITEM3": 0.0, "ITEM4": "WORLD" }
cmd = self.cmd.build_cmd("tgt1", "pkt2", items, False, raw)
self.assertEqual(cmd.read("item2", 'RAW'), 3)
self.assertEqual(cmd.read("item3", 'RAW'), 0.0)
self.assertEqual(cmd.read("item4", 'RAW'), 'WORLD')

items = { "ITEM2": 0, "ITEM3": 2.0, "ITEM4": "WORLD" }
cmd = self.cmd.build_cmd("tgt1", "pkt2", items, False, raw)
self.assertEqual(cmd.read("item2", 'RAW'), 0)
self.assertEqual(cmd.read("item3", 'RAW'), 2.0)
self.assertEqual(cmd.read("item4", 'RAW'), 'WORLD')

items = { "ITEM2": 0, "ITEM3": 0.0, "ITEM4": "TESTY" }
cmd = self.cmd.build_cmd("tgt1", "pkt2", items, False, raw)
self.assertEqual(cmd.read("item2", 'RAW'), 0)
self.assertEqual(cmd.read("item3", 'RAW'), 0.0)
self.assertEqual(cmd.read("item4", 'RAW'), 'TESTY')

def test_build_cmd_supports_building_raw_commands(self):
items = {"ITEM2": 10}
Expand All @@ -276,15 +394,6 @@ def test_build_cmd_supports_building_raw_commands(self):
self.assertEqual(cmd.raw, True)
self.assertEqual(cmd.read("ITEM2"), 10)

def test_build_cmd_resets_the_buffer_size(self):
packet = self.cmd.packet("TGT1", "PKT1")
packet.buffer = b"\x00" * (packet.defined_length + 1)
self.assertEqual(len(packet.buffer), 5)
items = {"ITEM2": 10}
cmd = self.cmd.build_cmd("TGT1", "PKT1", items)
self.assertEqual(cmd.read("ITEM2"), 10)
self.assertEqual(len(cmd.buffer), 4)

def test_format_creates_a_string_representation_of_a_command(self):
pkt = self.cmd.packet("TGT1", "PKT1")
self.assertEqual(
Expand Down
1 change: 0 additions & 1 deletion openc3/python/test/packets/test_packet_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,6 @@ def test_complains_if_there_are_too_many_parameters(self):
case _:
tf.write(f"{keyword} extra\n")
tf.seek(0)
print(keyword)
with self.assertRaisesRegex(ConfigParser.Error, f"Too many parameters for {keyword}"):
self.pc.process_file(tf.name, "TGT1")
tf.close()
Expand Down
Loading
Loading