diff --git a/container_registry/container_analysis/snippets/samples_test.py b/container_registry/container_analysis/snippets/samples_test.py index e48a6ab84033..b8eb6c5fbcba 100644 --- a/container_registry/container_analysis/snippets/samples_test.py +++ b/container_registry/container_analysis/snippets/samples_test.py @@ -15,7 +15,9 @@ from os import environ from os.path import basename -from time import sleep, time +import threading +import time +import uuid from google.api_core.exceptions import AlreadyExists from google.api_core.exceptions import InvalidArgument @@ -35,13 +37,29 @@ TRY_LIMIT = 20 +class MessageReceiver: + """Custom class to handle incoming Pub/Sub messages.""" + def __init__(self, expected_msg_nums, done_event): + # initialize counter to 0 on initialization + self.msg_count = 0 + self.expected_msg_nums = expected_msg_nums + self.done_event = done_event + + def pubsub_callback(self, message): + # every time a pubsub message comes in, print it and count it + self.msg_count += 1 + print('Message {}: {}'.format(self.msg_count, message.data)) + message.ack() + if (self.msg_count == self.expected_msg_nums): + self.done_event.set() + + class TestContainerAnalysisSamples: def setup_method(self, test_method): print('SETUP {}'.format(test_method.__name__)) - timestamp = str(int(time())) - self.note_id = 'note-{}-{}'.format(timestamp, test_method.__name__) - self.image_url = '{}.{}'.format(timestamp, test_method.__name__) + self.note_id = 'note-{}'.format(uuid.uuid4()) + self.image_url = '{}.{}'.format(uuid.uuid4(), test_method.__name__) self.note_obj = samples.create_note(self.note_id, PROJECT_ID) def teardown_method(self, test_method): @@ -102,7 +120,7 @@ def test_occurrences_for_image(self): tries += 1 new_count = samples.get_occurrences_for_image(self.image_url, PROJECT_ID) - sleep(SLEEP_TIME) + time.sleep(SLEEP_TIME) assert new_count == 1 assert orig_count == 0 # clean up @@ -121,7 +139,7 @@ def test_occurrences_for_note(self): tries += 1 new_count = samples.get_occurrences_for_note(self.note_id, PROJECT_ID) - sleep(SLEEP_TIME) + time.sleep(SLEEP_TIME) assert new_count == 1 assert orig_count == 0 # clean up @@ -138,33 +156,31 @@ def test_pubsub(self): except AlreadyExists: pass - subscription_id = 'drydockOccurrences' + subscription_id = 'container-analysis-test-{}'.format(uuid.uuid4()) subscription_name = client.subscription_path(PROJECT_ID, subscription_id) samples.create_occurrence_subscription(subscription_id, PROJECT_ID) - tries = 0 - success = False - while not success and tries < TRY_LIMIT: - print(tries) - tries += 1 - receiver = samples.MessageReceiver() + + # I can not make it pass with multiple messages. My guess is + # the server started to dedup? + message_count = 1 + try: + job_done = threading.Event() + receiver = MessageReceiver(message_count, job_done) client.subscribe(subscription_name, receiver.pubsub_callback) - # test adding 3 more occurrences - total_created = 3 - for _ in range(total_created): - occ = samples.create_occurrence(self.image_url, - self.note_id, - PROJECT_ID, - PROJECT_ID) - sleep(SLEEP_TIME) + for i in range(message_count): + occ = samples.create_occurrence( + self.image_url, self.note_id, PROJECT_ID, PROJECT_ID) + time.sleep(SLEEP_TIME) samples.delete_occurrence(basename(occ.name), PROJECT_ID) - sleep(SLEEP_TIME) + time.sleep(SLEEP_TIME) + job_done.wait(timeout=60) print('done. msg_count = {}'.format(receiver.msg_count)) - success = receiver.msg_count == total_created - assert receiver.msg_count == total_created - # clean up - client.delete_subscription(subscription_name) + assert message_count <= receiver.msg_count + finally: + # clean up + client.delete_subscription(subscription_name) def test_poll_discovery_occurrence(self): # try with no discovery occurrence @@ -177,7 +193,7 @@ def test_poll_discovery_occurrence(self): assert False # create discovery occurrence - note_id = 'discovery-note-{}'.format(int(time())) + note_id = 'discovery-note-{}'.format(uuid.uuid4()) client = containeranalysis_v1.ContainerAnalysisClient() grafeas_client = client.get_grafeas_client() note = { @@ -225,7 +241,7 @@ def test_find_vulnerabilities_for_image(self): occ_list = samples.find_vulnerabilities_for_image(self.image_url, PROJECT_ID) count = len(occ_list) - sleep(SLEEP_TIME) + time.sleep(SLEEP_TIME) assert len(occ_list) == 1 samples.delete_occurrence(basename(created.name), PROJECT_ID) @@ -236,7 +252,7 @@ def test_find_high_severity_vulnerabilities(self): assert len(occ_list) == 0 # create new high severity vulnerability - note_id = 'discovery-note-{}'.format(int(time())) + note_id = 'discovery-note-{}'.format(uuid.uuid4()) client = containeranalysis_v1.ContainerAnalysisClient() grafeas_client = client.get_grafeas_client() note = { @@ -287,7 +303,7 @@ def test_find_high_severity_vulnerabilities(self): occ_list = samples.find_vulnerabilities_for_image(self.image_url, PROJECT_ID) count = len(occ_list) - sleep(SLEEP_TIME) + time.sleep(SLEEP_TIME) assert len(occ_list) == 1 # clean up samples.delete_occurrence(basename(created.name), PROJECT_ID)