Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve ownership tests #55

Merged
merged 5 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions interoperability_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def run_subscriber_shape_main(
produced_code_index: int,
subscriber_index: int,
samples_sent: "list[multiprocessing.Queue]",
last_sample_saved: "list[multiprocessing.Queue]",
verbosity: bool,
timeout: int,
file: tempfile.TemporaryFile,
Expand All @@ -56,6 +57,9 @@ def run_subscriber_shape_main(
samples_sent <<in>>: list of multiprocessing Queues with the samples
the Publishers send. Element 1 of the list is for
Publisher 1, etc.
last_sample_saved <<in>>: list of multiprocessing Queues with the last
sample saved on samples_sent for each Publisher. Element 1 of
the list is for Publisher 1, etc.
verbosity <<in>>: print debug information.
timeout <<in>>: time pexpect waits until it matches a pattern.
file <<inout>>: temporal file to save shape_main application output.
Expand Down Expand Up @@ -157,7 +161,7 @@ def run_subscriber_shape_main(
# to the Subscriber. By default it does not check
# anything and returns ReturnCode.OK.
produced_code[produced_code_index] = check_function(
child_sub, samples_sent, timeout)
child_sub, samples_sent, last_sample_saved, timeout)

subscriber_finished.set() # set subscriber as finished
log_message(f'Subscriber {subscriber_index}: Waiting for Publishers to '
Expand All @@ -176,6 +180,7 @@ def run_publisher_shape_main(
produced_code_index: int,
publisher_index: int,
samples_sent: multiprocessing.Queue,
last_sample_saved: multiprocessing.Queue,
verbosity: bool,
timeout: int,
file: tempfile.TemporaryFile,
Expand All @@ -197,6 +202,8 @@ def run_publisher_shape_main(
publisher it is 1, for the second 2, etc.
samples_sent <<out>>: this variable contains the samples
the Publisher sends.
last_sample_saved <<out>>: this variable contains the last sample
saved on samples_sent.
verbosity <<in>>: print debug information.
timeout <<in>>: time pexpect waits until it matches a pattern.
file <<inout>>: temporal file to save shape_main application output.
Expand Down Expand Up @@ -296,12 +303,14 @@ def run_publisher_shape_main(
produced_code[produced_code_index] = ReturnCode.OK
log_message(f'Publisher {publisher_index}: Sending '
'samples', verbosity)
last_sample = ''
for x in range(0, MAX_SAMPLES_SAVED, 1):
# At this point, at least one sample has been printed
# Therefore, that sample is added to samples_sent.
pub_string = re.search('[0-9]+ [0-9]+ \[[0-9]+\]',
child_pub.before + child_pub.after)
samples_sent.put(pub_string.group(0))
last_sample = pub_string.group(0)
samples_sent.put(last_sample)
index = child_pub.expect([
'\[[0-9]+\]', # index = 0
'on_offered_deadline_missed()', # index = 1
Expand All @@ -314,6 +323,7 @@ def run_publisher_shape_main(
elif index == 2:
produced_code[produced_code_index] = ReturnCode.DATA_NOT_SENT
break
last_sample_saved.put(last_sample)
else:
produced_code[produced_code_index] = ReturnCode.OK

Expand Down Expand Up @@ -395,6 +405,7 @@ def run_test(
return_codes = manager.list(range(num_entities))
samples_sent = [] # used for storing the samples the Publishers send.
# It is a list with one Queue for each Publisher.
last_sample_saved = [] # used for storing the last value sent by each Publisher.

# list of multiprocessing Events used as semaphores to control the end of
# the processes, one for each entity.
Expand All @@ -419,6 +430,7 @@ def run_test(
if ('-P ' in element or element.endswith('-P')):
publishers_finished.append(multiprocessing.Event())
samples_sent.append(multiprocessing.Queue())
last_sample_saved.append(multiprocessing.Queue())
elif ('-S ' in element or element.endswith('-S')):
subscribers_finished.append(multiprocessing.Event())
else:
Expand All @@ -438,6 +450,7 @@ def run_test(
'produced_code_index':i,
'publisher_index':publisher_number+1,
'samples_sent':samples_sent[publisher_number],
'last_sample_saved':last_sample_saved[publisher_number],
'verbosity':verbosity,
'timeout':timeout,
'file':temporary_file[i],
Expand All @@ -461,6 +474,7 @@ def run_test(
'produced_code_index':i,
'subscriber_index':subscriber_number+1,
'samples_sent':samples_sent,
'last_sample_saved':last_sample_saved,
'verbosity':verbosity,
'timeout':timeout,
'file':temporary_file[i],
Expand Down
2 changes: 1 addition & 1 deletion rtps_test_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,5 @@ def remove_ansi_colors(text):
cleaned_str = ansi_escape.sub('', text)
return cleaned_str

def no_check(child_sub, samples_sent, timeout):
def no_check(child_sub, samples_sent, last_sample_saved, timeout):
return ReturnCode.OK
51 changes: 43 additions & 8 deletions test_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
# is received in order, or that OWNERSHIP works properly, etc...
MAX_SAMPLES_READ = 500

def test_ownership_receivers(child_sub, samples_sent, timeout):
def test_ownership_receivers(child_sub, samples_sent, last_sample_saved, timeout):

"""
This function is used by test cases that have two publishers and one subscriber.
Expand All @@ -63,6 +63,9 @@ def test_ownership_receivers(child_sub, samples_sent, timeout):
samples_sent: list of multiprocessing Queues with the samples
the publishers send. Element 1 of the list is for
publisher 1, etc.
last_sample_saved: list of multiprocessing Queues with the last
sample saved on samples_sent for each Publisher. Element 1 of
the list is for Publisher 1, etc.
timeout: time pexpect waits until it matches a pattern.

This functions assumes that the subscriber has already received samples
Expand All @@ -76,6 +79,9 @@ def test_ownership_receivers(child_sub, samples_sent, timeout):
list_data_received_first = []
max_samples_received = MAX_SAMPLES_READ
samples_read = 0
list_samples_processed = []
last_first_sample = '';
last_second_sample = '';

while(samples_read < max_samples_received):
# take the topic, color, position and size of the ShapeType.
Expand Down Expand Up @@ -110,16 +116,39 @@ def test_ownership_receivers(child_sub, samples_sent, timeout):
except queue.Empty:
pass

# Take the last sample published by each publisher from their queues
# ('last_sample_saved[i]') and save them local variables.
try:
last_first_sample = last_sample_saved[0].get(block=False)
except queue.Empty:
pass

try:
last_second_sample = last_sample_saved[1].get(block=False)
except queue.Empty:
pass

# Determine to which publisher the current sample belong to
if sub_string.group(0) in list_data_received_second:
current_sample_from_publisher = 2
elif sub_string.group(0) in list_data_received_first:
current_sample_from_publisher = 1
else:
# If the sample is not in any queue, wait a bit and continue
# If the sample is not in any queue, break the loop if the
# the last sample for any publisher has already been processed.
if last_first_sample in list_samples_processed:
break
if last_second_sample in list_samples_processed:
break
print(f'Last samples: {last_first_sample}, {last_second_sample}')
# Otherwise, wait a bit and continue
time.sleep(0.1)
continue

# Keep all samples processed in a single list, so we can check whether
# the last sample published by any publisher has already been processed
list_samples_processed.append(sub_string.group(0))

# If the app hit this point, it is because the previous subscriber
# sample has been already read. Then, we can process the next sample
# read by the subscriber.
Expand Down Expand Up @@ -172,7 +201,7 @@ def test_ownership_receivers(child_sub, samples_sent, timeout):
print(f'Samples read: {samples_read}')
return ReturnCode.RECEIVING_FROM_ONE

def test_color_receivers(child_sub, samples_sent, timeout):
def test_color_receivers(child_sub, samples_sent, last_sample_saved, timeout):

"""
This function is used by test cases that have two publishers and one
Expand All @@ -182,6 +211,7 @@ def test_color_receivers(child_sub, samples_sent, timeout):

child_sub: child program generated with pexpect
samples_sent: not used
last_sample_saved: not used
timeout: time pexpect waits until it matches a pattern.
"""
sub_string = re.search('\w\s+(\w+)\s+[0-9]+ [0-9]+ \[[0-9]+\]',
Expand Down Expand Up @@ -217,13 +247,14 @@ def test_color_receivers(child_sub, samples_sent, timeout):
print(f'Samples read: {samples_read}')
return ReturnCode.RECEIVING_FROM_ONE

def test_reliability_order(child_sub, samples_sent, timeout):
def test_reliability_order(child_sub, samples_sent, last_sample_saved, timeout):
"""
This function tests reliability, it checks whether the subscriber receives
the samples in order.

child_sub: child program generated with pexpect
samples_sent: not used
last_sample_saved: not used
timeout: not used
"""

Expand Down Expand Up @@ -267,7 +298,7 @@ def test_reliability_order(child_sub, samples_sent, timeout):
return produced_code


def test_reliability_no_losses(child_sub, samples_sent, timeout):
def test_reliability_no_losses(child_sub, samples_sent, last_sample_saved, timeout):
"""
This function tests RELIABLE reliability, it checks whether the subscriber
receives the samples in order and with no losses.
Expand All @@ -276,6 +307,7 @@ def test_reliability_no_losses(child_sub, samples_sent, timeout):
samples_sent: list of multiprocessing Queues with the samples
the publishers send. Element 1 of the list is for
publisher 1, etc.
last_sample_saved: not used
timeout: time pexpect waits until it matches a pattern.
"""

Expand Down Expand Up @@ -352,7 +384,7 @@ def test_reliability_no_losses(child_sub, samples_sent, timeout):
return produced_code


def test_durability_volatile(child_sub, samples_sent, timeout):
def test_durability_volatile(child_sub, samples_sent, last_sample_saved, timeout):
"""
This function tests the volatile durability, it checks that the sample the
subscriber receives is not the first one. The publisher application sends
Expand All @@ -365,6 +397,7 @@ def test_durability_volatile(child_sub, samples_sent, timeout):

child_sub: child program generated with pexpect
samples_sent: not used
last_sample_saved: not used
timeout: not used
"""

Expand All @@ -387,7 +420,7 @@ def test_durability_volatile(child_sub, samples_sent, timeout):

return produced_code

def test_durability_transient_local(child_sub, samples_sent, timeout):
def test_durability_transient_local(child_sub, samples_sent, last_sample_saved, timeout):
"""
This function tests the TRANSIENT_LOCAL durability, it checks that the
sample the subscriber receives is the first one. The publisher application
Expand All @@ -396,6 +429,7 @@ def test_durability_transient_local(child_sub, samples_sent, timeout):

child_sub: child program generated with pexpect
samples_sent: not used
last_sample_saved: not used
timeout: not used
"""

Expand All @@ -416,14 +450,15 @@ def test_durability_transient_local(child_sub, samples_sent, timeout):
return produced_code


def test_deadline_missed(child_sub, samples_sent, timeout):
def test_deadline_missed(child_sub, samples_sent, last_sample_saved, timeout):
"""
This function tests whether the subscriber application misses the requested
deadline or not. This is needed in case the subscriber application receives
some samples and then missed the requested deadline.

child_sub: child program generated with pexpect
samples_sent: not used
last_sample_saved: not used
timeout: time pexpect waits until it matches a pattern
"""

Expand Down