From 2810fa11cdf8cfec05ccb9bb2e9f4f2fb122efdf Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Tue, 7 Nov 2023 18:10:25 +0200 Subject: [PATCH 01/18] small improvements --- src/node/waku_node.py | 13 +++++++------ src/steps/relay.py | 4 ++++ tests/relay/test_publish.py | 23 +++++++++++++++++------ 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 13fb3dd837..8477c8947f 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -16,12 +16,18 @@ def __init__(self, docker_image, docker_log_prefix=""): self._log_path = os.path.join(LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log") self._docker_manager = DockerManager(self._image_name) self._container = None + logger.debug("WakuNode instance initialized with log path %s", self._log_path) + + @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) + def start(self, **kwargs): + logger.debug("Starting Node...") + self._docker_manager.create_network() self._ext_ip = self._docker_manager.generate_random_ext_ip() self._ports = self._docker_manager.generate_ports() self._rest_port = self._ports[0] self._rpc_port = self._ports[1] self._websocket_port = self._ports[2] - logger.debug("WakuNode instance initialized with log path %s", self._log_path) + if PROTOCOL == "RPC": self._api = RPC(self._rpc_port, self._image_name) elif PROTOCOL == "REST": @@ -29,11 +35,6 @@ def __init__(self, docker_image, docker_log_prefix=""): else: raise ValueError(f"Unknown protocol: {PROTOCOL}") - @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) - def start(self, **kwargs): - logger.debug("Starting Node...") - self._docker_manager.create_network() - default_args = { "listen-address": "0.0.0.0", "rpc": "true", diff --git a/src/steps/relay.py b/src/steps/relay.py index 6ba381c596..d45c7d1a1f 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -26,6 +26,10 @@ def setup_nodes(self, request): @allure.step 
@retry(stop=stop_after_delay(20), wait=wait_fixed(0.5), reraise=True) + def check_published_message_reaches_peer_with_retry(self, message): + self.check_published_message_reaches_peer(message) + + @allure.step def check_published_message_reaches_peer(self, message): message.timestamp = int(time() * 1e9) self.node1.send_message(message, self.test_pubsub_topic) diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index 0e88560e82..f8504eb883 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -11,23 +11,34 @@ class TestRelayPublish(StepsRelay): def test_publish_with_various_payloads(self): failed_payloads = [] - for payload in SAMPLE_INPUTS: + for index, payload in enumerate(SAMPLE_INPUTS): logger.debug("Running test with payload %s", payload["description"]) message = MessageRpcQuery(payload=to_base64(payload["value"]), contentTopic=self.test_content_topic) try: - self.check_published_message_reaches_peer(message) + # The node may require a warmup period for message processing to stabilize. + # Therefore, we use the retry function for the first payload to account for this warmup. 
+ if index == 0: + self.check_published_message_reaches_peer_with_retry(message) + else: + self.check_published_message_reaches_peer(message) except Exception as e: - logger.error("Payload %s failed: %s", {payload["description"]}, {str(e)}) - failed_payloads.append(payload) + logger.error("Payload %s failed: %s", payload["description"], str(e)) + failed_payloads.append(payload["description"]) + assert not failed_payloads, f"Payloads failed: {failed_payloads}" def test_publish_with_various_content_topics(self): failed_content_topics = [] - for content_topic in SAMPLE_INPUTS: + for index, content_topic in enumerate(SAMPLE_INPUTS): logger.debug("Running test with content topic %s", content_topic["description"]) message = MessageRpcQuery(payload=to_base64(self.test_payload), contentTopic=content_topic["value"]) try: - self.check_published_message_reaches_peer(message) + # The node may require a warmup period for message processing to stabilize. + # Therefore, we use the retry function for the first payload to account for this warmup. 
+ if index == 0: + self.check_published_message_reaches_peer_with_retry(message) + else: + self.check_published_message_reaches_peer(message) except Exception as e: logger.error("ContentTopic %s failed: %s", {content_topic["description"]}, {str(e)}) failed_content_topics.append(content_topic) From dec5e1a4a77996b7b6347c4b0b8499e61a022818 Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Wed, 8 Nov 2023 13:38:28 +0200 Subject: [PATCH 02/18] minor adjustments --- .github/workflows/test.yml | 4 ++-- src/node/api_clients/base_client.py | 2 +- src/node/waku_node.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d6a9c08e7d..4125995326 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -11,7 +11,7 @@ on: node1: required: false type: string - default: "wakuorg/nwaku:deploy-wakuv2-test" + default: "wakuorg/nwaku:latest" node2: required: false type: string @@ -27,7 +27,7 @@ on: env: FORCE_COLOR: "1" - NODE_1: ${{ inputs.node1 || 'wakuorg/nwaku:deploy-wakuv2-test' }} + NODE_1: ${{ inputs.node1 || 'wakuorg/nwaku:latest' }} NODE_2: ${{ inputs.node2 || 'wakuorg/go-waku:latest' }} PROTOCOL: ${{ inputs.protocol || 'REST' }} diff --git a/src/node/api_clients/base_client.py b/src/node/api_clients/base_client.py index 6be538f815..d89ca9ed9b 100644 --- a/src/node/api_clients/base_client.py +++ b/src/node/api_clients/base_client.py @@ -11,7 +11,7 @@ class BaseClient(ABC): # useful when running tests in parallel, where occasional network-related errors such as # connection drops, timeouts, or temporary unavailability of a service can occur. Retrying # ensures that such intermittent issues don't cause the tests to fail outright. 
- @retry(stop=stop_after_delay(2), wait=wait_fixed(0.1), reraise=True) + @retry(stop=stop_after_delay(1), wait=wait_fixed(0.1), reraise=True) def make_request(self, method, url, headers=None, data=None): logger.debug("%s call: %s with payload: %s", method.upper(), url, data) response = requests.request(method.upper(), url, headers=headers, data=data) diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 8477c8947f..a0946e71f0 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -56,7 +56,7 @@ def start(self, **kwargs): if "go-waku" in self._docker_manager.image: go_waku_args = { - "min-relay-peers-to-publish": "0", + "min-relay-peers-to-publish": "1", "legacy-filter": "false", "log-level": "DEBUG", } From c5e506d981233dde81a38495d41aa40908badba2 Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Thu, 9 Nov 2023 17:56:56 +0200 Subject: [PATCH 03/18] new tests --- pytest.ini | 1 + requirements.txt | 2 + src/data_classes.py | 14 +--- src/node/api_clients/base_client.py | 2 +- src/node/api_clients/rest.py | 2 +- src/node/api_clients/rpc.py | 2 +- src/node/waku_node.py | 18 ++++ src/steps/relay.py | 39 ++++++--- src/test_data.py | 122 +++++++++++++++++++--------- tests/conftest.py | 11 ++- tests/relay/test_publish.py | 112 ++++++++++++++++++++----- 11 files changed, 236 insertions(+), 89 deletions(-) diff --git a/pytest.ini b/pytest.ini index 313afa0aa3..a876cae59a 100644 --- a/pytest.ini +++ b/pytest.ini @@ -5,3 +5,4 @@ log_cli = True log_file = log/test.log log_cli_format = %(asctime)s %(name)s %(levelname)s %(message)s log_file_format = %(asctime)s %(name)s %(levelname)s %(message)s +timeout = 300 diff --git a/requirements.txt b/requirements.txt index 38e3913adb..bab8931349 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,8 +6,10 @@ pre-commit pyright pytest pytest-instafail +pytest-timeout pytest-xdist pytest-rerunfailures python-dotenv requests tenacity +typeguard \ No newline at end of file diff --git a/src/data_classes.py 
b/src/data_classes.py index 4266a55ba9..6527250e0f 100644 --- a/src/data_classes.py +++ b/src/data_classes.py @@ -1,13 +1,6 @@ from dataclasses import dataclass, field from marshmallow_dataclass import class_schema -from typing import Optional - - -@dataclass -class MessageRpcQuery: - payload: str - contentTopic: str - timestamp: Optional[int] = None +from typing import Optional, Union @dataclass @@ -15,9 +8,10 @@ class MessageRpcResponse: payload: str contentTopic: str version: Optional[int] - timestamp: int + timestamp: Optional[int] ephemeral: Optional[bool] - rateLimitProof: Optional[dict] = field(default_factory=dict) + meta: Optional[str] + rateLimitProof: Optional[Union[dict, str]] = field(default_factory=dict) rate_limit_proof: Optional[dict] = field(default_factory=dict) diff --git a/src/node/api_clients/base_client.py b/src/node/api_clients/base_client.py index d89ca9ed9b..277c40ac41 100644 --- a/src/node/api_clients/base_client.py +++ b/src/node/api_clients/base_client.py @@ -11,7 +11,7 @@ class BaseClient(ABC): # useful when running tests in parallel, where occasional network-related errors such as # connection drops, timeouts, or temporary unavailability of a service can occur. Retrying # ensures that such intermittent issues don't cause the tests to fail outright. 
- @retry(stop=stop_after_delay(1), wait=wait_fixed(0.1), reraise=True) + @retry(stop=stop_after_delay(0.5), wait=wait_fixed(0.1), reraise=True) def make_request(self, method, url, headers=None, data=None): logger.debug("%s call: %s with payload: %s", method.upper(), url, data) response = requests.request(method.upper(), url, headers=headers, data=data) diff --git a/src/node/api_clients/rest.py b/src/node/api_clients/rest.py index 47ecbf79c9..68d43a81ad 100644 --- a/src/node/api_clients/rest.py +++ b/src/node/api_clients/rest.py @@ -24,7 +24,7 @@ def set_subscriptions(self, pubsub_topics): return self.rest_call("post", "relay/v1/subscriptions", json.dumps(pubsub_topics)) def send_message(self, message, pubsub_topic): - return self.rest_call("post", f"relay/v1/messages/{quote(pubsub_topic, safe='')}", json.dumps(asdict(message))) + return self.rest_call("post", f"relay/v1/messages/{quote(pubsub_topic, safe='')}", json.dumps(message)) def get_messages(self, pubsub_topic): get_messages_response = self.rest_call("get", f"relay/v1/messages/{quote(pubsub_topic, safe='')}") diff --git a/src/node/api_clients/rpc.py b/src/node/api_clients/rpc.py index 2ed32c5e94..7d79daa2b7 100644 --- a/src/node/api_clients/rpc.py +++ b/src/node/api_clients/rpc.py @@ -28,7 +28,7 @@ def set_subscriptions(self, pubsub_topics): return self.rpc_call("post_waku_v2_relay_v1_subscription", [pubsub_topics]) def send_message(self, message, pubsub_topic): - return self.rpc_call("post_waku_v2_relay_v1_message", [pubsub_topic, asdict(message)]) + return self.rpc_call("post_waku_v2_relay_v1_message", [pubsub_topic, message]) def get_messages(self, pubsub_topic): get_messages_response = self.rpc_call("get_waku_v2_relay_v1_messages", [pubsub_topic]) diff --git a/src/node/waku_node.py b/src/node/waku_node.py index a0946e71f0..c3c1d5ac5f 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -100,3 +100,21 @@ def send_message(self, message, pubsub_topic=DEFAULT_PUBSUBTOPIC): def get_messages(self, 
pubsub_topic=DEFAULT_PUBSUBTOPIC): return self._api.get_messages(pubsub_topic) + + @property + def image(self): + return self._image_name + + def type(self): + if self.is_nwaku(): + return "nwaku" + elif self.is_gowaku(): + return "gowaku" + else: + raise Exception("Unknown node type!!!") + + def is_nwaku(self): + return "nwaku" in self.image + + def is_gowaku(self): + return "go-waku" in self.image diff --git a/src/steps/relay.py b/src/steps/relay.py index d45c7d1a1f..f284f963d7 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -1,7 +1,9 @@ import logging +import math from time import sleep, time import pytest import allure +from src.libs.common import to_base64 from src.data_classes import message_rpc_response_schema from src.env_vars import NODE_1, NODE_2 from src.node.waku_node import WakuNode @@ -13,30 +15,43 @@ class StepsRelay: @pytest.fixture(scope="function", autouse=True) def setup_nodes(self, request): - self.node1 = WakuNode(NODE_1, request.cls.test_id) + self.node1 = WakuNode(NODE_1, "node1_" + request.cls.test_id) self.node1.start(relay="true", discv5_discovery="true", peer_exchange="true") enr_uri = self.node1.info()["enrUri"] - self.node2 = WakuNode(NODE_2, request.cls.test_id) + self.node2 = WakuNode(NODE_2, "node2_" + request.cls.test_id) self.node2.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=enr_uri, peer_exchange="true") - self.test_pubsub_topic = "test" - self.test_content_topic = "/test/1/waku-relay" + self.test_pubsub_topic = "/waku/2/rs/18/1" + self.test_content_topic = "/test/1/waku-relay/proto" self.test_payload = "Relay works!!" 
self.node1.set_subscriptions([self.test_pubsub_topic]) self.node2.set_subscriptions([self.test_pubsub_topic]) - @allure.step - @retry(stop=stop_after_delay(20), wait=wait_fixed(0.5), reraise=True) - def check_published_message_reaches_peer_with_retry(self, message): - self.check_published_message_reaches_peer(message) + @pytest.fixture(scope="function", autouse=True) + @retry(stop=stop_after_delay(40), wait=wait_fixed(1), reraise=True) + def wait_for_network_to_warm_up(self): + message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + try: + self.check_published_message_reaches_peer(message) + except Exception as ex: + raise Exception(f"WARM UP FAILED WITH: {ex}") @allure.step def check_published_message_reaches_peer(self, message): - message.timestamp = int(time() * 1e9) self.node1.send_message(message, self.test_pubsub_topic) sleep(0.1) get_messages_response = self.node2.get_messages(self.test_pubsub_topic) logger.debug("Got reponse from remote peer %s", get_messages_response) + assert get_messages_response, "Peer node couldn't find any messages" received_message = message_rpc_response_schema.load(get_messages_response[0]) - assert received_message.payload == message.payload - assert received_message.contentTopic == message.contentTopic - assert received_message.timestamp == message.timestamp + assert ( + received_message.payload == message["payload"] + ), f'Incorrect payload. Published {message["payload"]} Received {received_message.payload}' + assert ( + received_message.contentTopic == message["contentTopic"] + ), f'Incorrect contentTopic. Published {message["contentTopic"]} Received {received_message.contentTopic}' + if "timestamp" in message and message["timestamp"]: + assert_fail_message = f'Incorrect timestamp. 
Published {message["timestamp"]} Received {received_message.timestamp}' + if isinstance(message["timestamp"], float): + assert math.isclose(float(received_message.timestamp), message["timestamp"], rel_tol=1e-9), assert_fail_message + else: + assert str(received_message.timestamp) == str(message["timestamp"]), assert_fail_message diff --git a/src/test_data.py b/src/test_data.py index 0a777099ef..d4a4159d86 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -1,42 +1,84 @@ +from time import time +from datetime import datetime, timedelta + +NOW = datetime.now() + SAMPLE_INPUTS = [ - {"description": "A simple string.", "value": "Hello World!"}, - {"description": "An integer.", "value": "1234567890"}, - {"description": "A dictionary.", "value": '{"key": "value"}'}, - {"description": "Chinese characters.", "value": "这是一些中文"}, - {"description": "Emojis.", "value": "🚀🌟✨"}, - {"description": "Lorem ipsum text.", "value": "Lorem ipsum dolor sit amet"}, - {"description": "HTML content.", "value": "Hello"}, - {"description": "Cyrillic characters.", "value": "\u041f\u0440\u0438\u0432\u0435\u0442"}, - {"description": "Base64 encoded string.", "value": "Base64==dGVzdA=="}, - {"description": "Binary data.", "value": "d29ya2luZyB3aXRoIGJpbmFyeSBkYXRh: \x50\x51"}, - {"description": "Special characters with whitespace.", "value": "\t\nSpecial\tCharacters\n"}, - {"description": "Boolean false as a string.", "value": "False"}, - {"description": "A float number.", "value": "3.1415926535"}, - {"description": "A list.", "value": "[1, 2, 3, 4, 5]"}, - {"description": "Hexadecimal number as a string.", "value": "0xDEADBEEF"}, - {"description": "Email format.", "value": "user@example.com"}, - {"description": "URL format.", "value": "http://example.com"}, - {"description": "Date and time in ISO format.", "value": "2023-11-01T12:00:00Z"}, - {"description": "String with escaped quotes.", "value": '"Escaped" \\"quotes\\"'}, - {"description": "A regular expression.", "value": "Regular 
expression: ^[a-z0-9_-]{3,16}$"}, - {"description": "A very long string.", "value": "x" * 1000}, - {"description": "A JSON string.", "value": '{"name": "John", "age": 30, "city": "New York"}'}, - {"description": "A Unix path.", "value": "/usr/local/bin"}, - {"description": "A Windows path.", "value": "C:\\Windows\\System32"}, - {"description": "An SQL query.", "value": "SELECT * FROM users WHERE id = 1;"}, - {"description": "JavaScript code snippet.", "value": "function test() { console.log('Hello World'); }"}, - {"description": "A CSS snippet.", "value": "body { background-color: #fff; }"}, - {"description": "A Python one-liner.", "value": "print('Hello World')"}, - {"description": "An IP address.", "value": "192.168.1.1"}, - {"description": "A domain name.", "value": "www.example.com"}, - {"description": "A user agent string.", "value": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}, - {"description": "A credit card number.", "value": "1234-5678-9012-3456"}, - {"description": "A phone number.", "value": "+1234567890"}, - {"description": "A UUID.", "value": "123e4567-e89b-12d3-a456-426614174000"}, - {"description": "A hashtag.", "value": "#helloWorld"}, - {"description": "A Twitter handle.", "value": "@username"}, - {"description": "A password.", "value": "P@ssw0rd!"}, - {"description": "A date in common format.", "value": "01/11/2023"}, - {"description": "A time string.", "value": "12:00:00"}, - {"description": "A mathematical equation.", "value": "E = mc^2"}, + {"description": "A simple string", "value": "Hello World!"}, + {"description": "An integer", "value": "1234567890"}, + {"description": "A dictionary", "value": '{"key": "value"}'}, + {"description": "Chinese characters", "value": "这是一些中文"}, + {"description": "Emojis", "value": "🚀🌟✨"}, + {"description": "Lorem ipsum text", "value": "Lorem ipsum dolor sit amet"}, + {"description": "HTML content", "value": "Hello"}, + {"description": "Cyrillic characters", "value": "\u041f\u0440\u0438\u0432\u0435\u0442"}, + 
{"description": "Base64 encoded string", "value": "Base64==dGVzdA=="}, + {"description": "Binary data", "value": "d29ya2luZyB3aXRoIGJpbmFyeSBkYXRh: \x50\x51"}, + {"description": "Special characters with whitespace", "value": "\t\nSpecial\tCharacters\n"}, + {"description": "Boolean false as a string", "value": "False"}, + {"description": "A float number", "value": "3.1415926535"}, + {"description": "A list", "value": "[1, 2, 3, 4, 5]"}, + {"description": "Hexadecimal number as a string", "value": "0xDEADBEEF"}, + {"description": "Email format", "value": "user@example.com"}, + {"description": "URL format", "value": "http://example.com"}, + {"description": "Date and time in ISO format", "value": "2023-11-01T12:00:00Z"}, + {"description": "String with escaped quotes", "value": '"Escaped" \\"quotes\\"'}, + {"description": "A regular expression", "value": "Regular expression: ^[a-z0-9_-]{3,16}$"}, + {"description": "A very long string", "value": "x" * 1000}, + {"description": "A JSON string", "value": '{"name": "John", "age": 30, "city": "New York"}'}, + {"description": "A Unix path", "value": "/usr/local/bin"}, + {"description": "A Windows path", "value": "C:\\Windows\\System32"}, + {"description": "An SQL query", "value": "SELECT * FROM users WHERE id = 1;"}, + {"description": "JavaScript code snippet", "value": "function test() { console.log('Hello World'); }"}, + {"description": "A CSS snippet", "value": "body { background-color: #fff; }"}, + {"description": "A Python one-liner", "value": "print('Hello World')"}, + {"description": "An IP address", "value": "192.168.1.1"}, + {"description": "A domain name", "value": "www.example.com"}, + {"description": "A user agent string", "value": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}, + {"description": "A credit card number", "value": "1234-5678-9012-3456"}, + {"description": "A phone number", "value": "+1234567890"}, + {"description": "A UUID", "value": "123e4567-e89b-12d3-a456-426614174000"}, + {"description": "A 
hashtag", "value": "#helloWorld"}, + {"description": "A Twitter handle", "value": "@username"}, + {"description": "A password", "value": "P@ssw0rd!"}, + {"description": "A date in common format", "value": "01/11/2023"}, + {"description": "A time string", "value": "12:00:00"}, + {"description": "A mathematical equation", "value": "E = mc^2"}, +] + +INVALID_PAYLOADS = [ + {"description": "Unecoded text", "value": "Hello World!"}, + {"description": "A dictionary", "value": {"key": "YWFh"}}, + {"description": "An integer", "value": 1234567890}, + {"description": "A list", "value": ["YWFh"]}, + {"description": "A bool", "value": True}, +] + +INVALID_CONTENT_TOPICS = [ + {"description": "A dictionary", "value": {"key": "YWFh"}}, + {"description": "An integer", "value": 1234567890}, + {"description": "A list", "value": ["YWFh"]}, + {"description": "A bool", "value": True}, +] + + +SAMPLE_TIMESTAMPS = [ + {"description": "Now", "value": int(time() * 1e9), "valid_for": ["nwaku", "gowaku"]}, + { + "description": "Far future", + "value": int((NOW + timedelta(days=365 * 10)).timestamp() * 1e9), + "valid_for": ["nwaku", "gowaku"], + }, # 10 years from now + {"description": "Recent past", "value": int((NOW - timedelta(hours=1)).timestamp() * 1e9), "valid_for": ["nwaku", "gowaku"]}, # 1 hour ago + {"description": "Near future", "value": int((NOW + timedelta(hours=1)).timestamp() * 1e9), "valid_for": ["nwaku", "gowaku"]}, # 1 hour ahead + {"description": "Positive number", "value": 1, "valid_for": ["nwaku", "gowaku"]}, + {"description": "Negative number", "value": -1, "valid_for": ["nwaku", "gowaku"]}, + {"description": "DST change", "value": int(datetime(2020, 3, 8, 2, 0, 0).timestamp() * 1e9), "valid_for": ["nwaku", "gowaku"]}, # DST starts + {"description": "Timestamp as string number", "value": str(int(time() * 1e9)), "valid_for": ["gowaku"]}, + {"description": "Invalid large number", "value": 2**63, "valid_for": []}, + {"description": "Float number", "value": float(time() * 
1e9), "valid_for": ["gowaku"]}, + {"description": "Array instead of timestamp", "value": [int(time() * 1e9)], "valid_for": []}, + {"description": "Object instead of timestamp", "value": {"time": int(time() * 1e9)}, "valid_for": []}, + {"description": "ISO 8601 timestamp", "value": "2023-12-26T10:58:51", "valid_for": []}, + {"description": "Missing", "value": None, "valid_for": ["gowaku"]}, ] diff --git a/tests/conftest.py b/tests/conftest.py index e12f04200d..a1bb104655 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -48,7 +48,7 @@ def test_setup(request, test_id): @pytest.fixture(scope="function", autouse=True) def attach_logs_on_fail(request): yield - if request.node.rep_call.failed: + if hasattr(request.node, "rep_call") and request.node.rep_call.failed: logger.debug("Test failed, attempting to attach logs to the allure reports") for file in glob.glob(os.path.join(env_vars.LOG_DIR, request.cls.test_id + "*")): attach_allure_file(file) @@ -58,5 +58,12 @@ def attach_logs_on_fail(request): def close_open_nodes(): DS.waku_nodes = [] yield + crashed_containers = [] for node in DS.waku_nodes: - node.stop() + try: + node.stop() + except Exception as ex: + if "No such container" in str(ex): + crashed_containers.append(node.image) + logger.error("Failed to stop container because of error %s", ex) + assert not crashed_containers, f"Containers {crashed_containers} crashed during the test!!!" 
diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index f8504eb883..596037b7e1 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -1,45 +1,113 @@ import logging - +from time import time from src.libs.common import to_base64 -from src.data_classes import MessageRpcQuery from src.steps.relay import StepsRelay -from src.test_data import SAMPLE_INPUTS +from src.test_data import INVALID_CONTENT_TOPICS, INVALID_PAYLOADS, SAMPLE_INPUTS, SAMPLE_TIMESTAMPS logger = logging.getLogger(__name__) class TestRelayPublish(StepsRelay): - def test_publish_with_various_payloads(self): + def test_publish_with_valid_payloads(self): failed_payloads = [] - for index, payload in enumerate(SAMPLE_INPUTS): + for payload in SAMPLE_INPUTS: logger.debug("Running test with payload %s", payload["description"]) - message = MessageRpcQuery(payload=to_base64(payload["value"]), contentTopic=self.test_content_topic) + message = {"payload": to_base64(payload["value"]), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} try: - # The node may require a warmup period for message processing to stabilize. - # Therefore, we use the retry function for the first payload to account for this warmup. 
- if index == 0: - self.check_published_message_reaches_peer_with_retry(message) - else: - self.check_published_message_reaches_peer(message) + self.check_published_message_reaches_peer(message) except Exception as e: logger.error("Payload %s failed: %s", payload["description"], str(e)) failed_payloads.append(payload["description"]) - assert not failed_payloads, f"Payloads failed: {failed_payloads}" + def test_publish_with_invalid_payloads(self): + success_payloads = [] + for payload in INVALID_PAYLOADS: + logger.debug("Running test with payload %s", payload["description"]) + message = {"payload": payload["value"], "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + try: + self.node1.send_message(message, self.test_pubsub_topic) + success_payloads.append(payload) + except Exception as ex: + assert "Bad Request" in str(ex) + assert not success_payloads, f"Invalid Payloads that didn't failed: {success_payloads}" + + def test_publish_with_missing_payload(self): + message = {"contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + try: + self.node1.send_message(message, self.test_pubsub_topic) + raise AssertionError("Publish with missing payload worked!!!") + except Exception as ex: + if self.node1.is_nwaku(): + assert "Bad Request" in str(ex) + elif self.node1.is_gowaku(): + assert "Internal Server Error" in str(ex) + else: + raise Exception("Not implemented") + def test_publish_with_various_content_topics(self): failed_content_topics = [] - for index, content_topic in enumerate(SAMPLE_INPUTS): + for content_topic in SAMPLE_INPUTS: logger.debug("Running test with content topic %s", content_topic["description"]) - message = MessageRpcQuery(payload=to_base64(self.test_payload), contentTopic=content_topic["value"]) + message = {"payload": to_base64(self.test_payload), "contentTopic": content_topic["value"], "timestamp": int(time() * 1e9)} try: - # The node may require a warmup period for message processing to stabilize. 
- # Therefore, we use the retry function for the first payload to account for this warmup. - if index == 0: - self.check_published_message_reaches_peer_with_retry(message) - else: - self.check_published_message_reaches_peer(message) + self.check_published_message_reaches_peer(message) except Exception as e: - logger.error("ContentTopic %s failed: %s", {content_topic["description"]}, {str(e)}) + logger.error("ContentTopic %s failed: %s", content_topic["description"], str(e)) failed_content_topics.append(content_topic) assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}" + + def test_publish_with_invalid_content_topics(self): + success_content_topics = [] + for content_topic in INVALID_CONTENT_TOPICS: + logger.debug("Running test with contetn topic %s", content_topic["description"]) + message = {"payload": to_base64(self.test_payload), "contentTopic": content_topic["value"], "timestamp": int(time() * 1e9)} + try: + self.node1.send_message(message, self.test_pubsub_topic) + success_content_topics.append(content_topic) + except Exception as ex: + assert "Bad Request" in str(ex) + assert not success_content_topics, f"Invalid Content topics that didn't failed: {success_content_topics}" + + def test_publish_with_missing_content_topic(self): + message = {"payload": to_base64(self.test_payload), "timestamp": int(time() * 1e9)} + try: + self.node1.send_message(message, self.test_pubsub_topic) + raise AssertionError("Publish with missing content_topic worked!!!") + except Exception as ex: + if self.node1.is_nwaku(): + assert "Bad Request" in str(ex) + elif self.node1.is_gowaku(): + assert "Internal Server Error" in str(ex) + else: + raise Exception("Not implemented") + + def test_publish_with_valid_timestamps(self): + failed_timestamps = [] + for timestamp in SAMPLE_TIMESTAMPS: + if self.node1.type() in timestamp["valid_for"]: + logger.debug("Running test with timestamp %s", timestamp["description"]) + message = {"payload": 
to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": timestamp["value"]} + try: + self.check_published_message_reaches_peer(message) + except Exception as ex: + logger.error("Timestamp %s failed: %s", timestamp["description"], str(ex)) + failed_timestamps.append(timestamp) + assert not failed_timestamps, f"Timestamps failed: {failed_timestamps}" + + def test_publish_with_invalid_timestamps(self): + success_timestamps = [] + for timestamp in SAMPLE_TIMESTAMPS: + if self.node1.type() not in timestamp["valid_for"]: + logger.debug("Running test with timestamp %s", timestamp["description"]) + message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": timestamp["value"]} + try: + self.check_published_message_reaches_peer(message) + success_timestamps.append(timestamp) + except Exception as e: + pass + assert not success_timestamps, f"Invalid Timestamps that didn't failed: {success_timestamps}" + + def test_publish_with_no_timestamp(self): + message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic} + self.check_published_message_reaches_peer(message) From b9ed32a5ab095c9671c8693b87a302647b28e22c Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Thu, 9 Nov 2023 18:15:44 +0200 Subject: [PATCH 04/18] try with more runners --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4125995326..ef01d69da4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -49,7 +49,7 @@ jobs: - run: pip install -r requirements.txt - name: Run tests - run: pytest -n 3 --reruns 1 --alluredir=allure-results + run: pytest -n auto --reruns 1 --alluredir=allure-results - name: Get allure history if: always() From 806d7941f600cf40223595e2fece06d5a913da89 Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Thu, 9 Nov 2023 18:51:47 +0200 Subject: [PATCH 05/18] try with more 
runners2 --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ef01d69da4..4fbffe1be7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -49,7 +49,7 @@ jobs: - run: pip install -r requirements.txt - name: Run tests - run: pytest -n auto --reruns 1 --alluredir=allure-results + run: pytest -n 5 --reruns 3 --alluredir=allure-results - name: Get allure history if: always() From d50b612ece068be3adbd744dd3bcd4cdb2349ede Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Fri, 10 Nov 2023 09:30:47 +0200 Subject: [PATCH 06/18] tweaks for parallel run --- .github/workflows/test.yml | 2 +- src/node/waku_node.py | 4 +++- src/steps/relay.py | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4fbffe1be7..78c72a80ad 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -49,7 +49,7 @@ jobs: - run: pip install -r requirements.txt - name: Run tests - run: pytest -n 5 --reruns 3 --alluredir=allure-results + run: pytest -n 4 --reruns 2 --alluredir=allure-results - name: Get allure history if: always() diff --git a/src/node/waku_node.py b/src/node/waku_node.py index c3c1d5ac5f..1e2519d360 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -1,5 +1,6 @@ import os import logging +from time import sleep from tenacity import retry, stop_after_delay, wait_fixed from src.node.api_clients.rpc import RPC from src.node.api_clients.rest import REST @@ -71,6 +72,7 @@ def start(self, **kwargs): "Started container from image %s. 
RPC: %s REST: %s WebSocket: %s", self._image_name, self._rpc_port, self._rest_port, self._websocket_port ) DS.waku_nodes.append(self) + sleep(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly try: self.ensure_ready() except Exception as e: @@ -84,7 +86,7 @@ def stop(self): self._container.stop() logger.debug("Container stopped.") - @retry(stop=stop_after_delay(5), wait=wait_fixed(0.05), reraise=True) + @retry(stop=stop_after_delay(10), wait=wait_fixed(0.1), reraise=True) def ensure_ready(self): self.info() logger.debug("RPC service is ready.") diff --git a/src/steps/relay.py b/src/steps/relay.py index f284f963d7..4bfdc7ff73 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -27,7 +27,7 @@ def setup_nodes(self, request): self.node2.set_subscriptions([self.test_pubsub_topic]) @pytest.fixture(scope="function", autouse=True) - @retry(stop=stop_after_delay(40), wait=wait_fixed(1), reraise=True) + @retry(stop=stop_after_delay(120), wait=wait_fixed(1), reraise=True) def wait_for_network_to_warm_up(self): message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} try: From 49e5f8b40711ba163f4e1147b67184cb618131d3 Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Fri, 10 Nov 2023 16:08:27 +0200 Subject: [PATCH 07/18] new tests --- src/libs/common.py | 10 ++++- src/libs/custom_logger.py | 24 +++++++++++ src/node/api_clients/base_client.py | 4 +- src/node/api_clients/rest.py | 4 +- src/node/api_clients/rpc.py | 4 +- src/node/docker_mananger.py | 4 +- src/node/waku_node.py | 8 ++-- src/steps/relay.py | 47 ++++++++++++-------- src/test_data.py | 2 + tests/__init__.py | 4 -- tests/conftest.py | 12 ++++-- tests/relay/test_publish.py | 67 +++++++++++++++++++++++++++-- 12 files changed, 149 insertions(+), 41 deletions(-) create mode 100644 src/libs/custom_logger.py diff --git a/src/libs/common.py b/src/libs/common.py index 697e2881d3..67b37adf47 100644 --- 
a/src/libs/common.py +++ b/src/libs/common.py @@ -1,9 +1,10 @@ -import logging +from time import sleep +from src.libs.custom_logger import get_custom_logger import os import base64 import allure -logger = logging.getLogger(__name__) +logger = get_custom_logger(__name__) def bytes_to_hex(byte_array): @@ -26,3 +27,8 @@ def to_base64(input_data): def attach_allure_file(file): logger.debug("Attaching file %s", file) allure.attach.file(file, name=os.path.basename(file), attachment_type=allure.attachment_type.TEXT) + + +def delay(num_seconds): + logger.debug("Sleeping for %s seconds", num_seconds) + sleep(num_seconds) diff --git a/src/libs/custom_logger.py b/src/libs/custom_logger.py new file mode 100644 index 0000000000..989548c5bc --- /dev/null +++ b/src/libs/custom_logger.py @@ -0,0 +1,24 @@ +import logging + +max_log_line_length = 5000 + + +def log_length_filter(max_length): + class logLengthFilter(logging.Filter): + def filter(self, record): + if len(record.getMessage()) > max_length: + logging.getLogger(record.name).log( + record.levelno, f"Log line was discarded because it's longer than max_log_line_length={max_log_line_length}" + ) + return False + return True + + return logLengthFilter() + + +def get_custom_logger(name): + logging.getLogger("urllib3").setLevel(logging.WARNING) + logging.getLogger("docker").setLevel(logging.WARNING) + logger = logging.getLogger(name) + logger.addFilter(log_length_filter(max_log_line_length)) + return logger diff --git a/src/node/api_clients/base_client.py b/src/node/api_clients/base_client.py index 277c40ac41..3e994749c7 100644 --- a/src/node/api_clients/base_client.py +++ b/src/node/api_clients/base_client.py @@ -1,9 +1,9 @@ -import logging import requests from tenacity import retry, stop_after_delay, wait_fixed from abc import ABC, abstractmethod +from src.libs.custom_logger import get_custom_logger -logger = logging.getLogger(__name__) +logger = get_custom_logger(__name__) class BaseClient(ABC): diff --git 
a/src/node/api_clients/rest.py b/src/node/api_clients/rest.py index 68d43a81ad..ced0a6787d 100644 --- a/src/node/api_clients/rest.py +++ b/src/node/api_clients/rest.py @@ -1,10 +1,10 @@ -import logging +from src.libs.custom_logger import get_custom_logger import json from dataclasses import asdict from urllib.parse import quote from src.node.api_clients.base_client import BaseClient -logger = logging.getLogger(__name__) +logger = get_custom_logger(__name__) class REST(BaseClient): diff --git a/src/node/api_clients/rpc.py b/src/node/api_clients/rpc.py index 7d79daa2b7..522fc0e7db 100644 --- a/src/node/api_clients/rpc.py +++ b/src/node/api_clients/rpc.py @@ -1,9 +1,9 @@ -import logging +from src.libs.custom_logger import get_custom_logger import json from dataclasses import asdict from src.node.api_clients.base_client import BaseClient -logger = logging.getLogger(__name__) +logger = get_custom_logger(__name__) class RPC(BaseClient): diff --git a/src/node/docker_mananger.py b/src/node/docker_mananger.py index 4e76bf300b..00bd0cafb7 100644 --- a/src/node/docker_mananger.py +++ b/src/node/docker_mananger.py @@ -1,5 +1,5 @@ import os -import logging +from src.libs.custom_logger import get_custom_logger import random import threading import docker @@ -7,7 +7,7 @@ from docker.types import IPAMConfig, IPAMPool from docker.errors import NotFound -logger = logging.getLogger(__name__) +logger = get_custom_logger(__name__) class DockerManager: diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 1e2519d360..0c54ed3684 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -1,6 +1,6 @@ import os -import logging -from time import sleep +from src.libs.common import delay +from src.libs.custom_logger import get_custom_logger from tenacity import retry, stop_after_delay, wait_fixed from src.node.api_clients.rpc import RPC from src.node.api_clients.rest import REST @@ -8,7 +8,7 @@ from src.env_vars import LOG_DIR, DEFAULT_PUBSUBTOPIC, PROTOCOL from 
src.data_storage import DS -logger = logging.getLogger(__name__) +logger = get_custom_logger(__name__) class WakuNode: @@ -72,7 +72,7 @@ def start(self, **kwargs): "Started container from image %s. RPC: %s REST: %s WebSocket: %s", self._image_name, self._rpc_port, self._rest_port, self._websocket_port ) DS.waku_nodes.append(self) - sleep(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly + delay(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly try: self.ensure_ready() except Exception as e: diff --git a/src/steps/relay.py b/src/steps/relay.py index 4bfdc7ff73..4902dc43df 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -1,15 +1,15 @@ -import logging +from src.libs.custom_logger import get_custom_logger import math -from time import sleep, time +from time import time import pytest import allure -from src.libs.common import to_base64 +from src.libs.common import to_base64, delay from src.data_classes import message_rpc_response_schema from src.env_vars import NODE_1, NODE_2 from src.node.waku_node import WakuNode from tenacity import retry, stop_after_delay, wait_fixed -logger = logging.getLogger(__name__) +logger = get_custom_logger(__name__) class StepsRelay: @@ -23,6 +23,7 @@ def setup_nodes(self, request): self.test_pubsub_topic = "/waku/2/rs/18/1" self.test_content_topic = "/test/1/waku-relay/proto" self.test_payload = "Relay works!!" 
+ self.test_message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} self.node1.set_subscriptions([self.test_pubsub_topic]) self.node2.set_subscriptions([self.test_pubsub_topic]) @@ -36,22 +37,34 @@ def wait_for_network_to_warm_up(self): raise Exception(f"WARM UP FAILED WITH: {ex}") @allure.step - def check_published_message_reaches_peer(self, message): - self.node1.send_message(message, self.test_pubsub_topic) - sleep(0.1) - get_messages_response = self.node2.get_messages(self.test_pubsub_topic) + def check_published_message_reaches_peer(self, message, pubsub_topic=None, message_propagation_delay=0.1): + if not pubsub_topic: + pubsub_topic = self.test_pubsub_topic + self.node1.send_message(message, pubsub_topic) + + delay(message_propagation_delay) + get_messages_response = self.node2.get_messages(pubsub_topic) logger.debug("Got reponse from remote peer %s", get_messages_response) assert get_messages_response, "Peer node couldn't find any messages" received_message = message_rpc_response_schema.load(get_messages_response[0]) + self.assert_received_message(message, received_message) + + def assert_received_message(self, sent_message, received_message): + def assert_fail_message(field_name): + return f"Incorrect {field_name}. Published {sent_message[field_name]} Received {getattr(received_message, field_name)}" + assert ( - received_message.payload == message["payload"] - ), f'Incorrect payload. Published {message["payload"]} Received {received_message.payload}' + received_message.payload == sent_message["payload"] + ), f'Incorrect payload. Published {sent_message["payload"]} Received {received_message.payload}' assert ( - received_message.contentTopic == message["contentTopic"] - ), f'Incorrect contentTopic. Published {message["contentTopic"]} Received {received_message.contentTopic}' - if "timestamp" in message and message["timestamp"]: - assert_fail_message = f'Incorrect timestamp. 
Published {message["timestamp"]} Received {received_message.timestamp}' - if isinstance(message["timestamp"], float): - assert math.isclose(float(received_message.timestamp), message["timestamp"], rel_tol=1e-9), assert_fail_message + received_message.contentTopic == sent_message["contentTopic"] + ), f'Incorrect contentTopic. Published {sent_message["contentTopic"]} Received {received_message.contentTopic}' + if "timestamp" in sent_message and sent_message["timestamp"]: + if isinstance(sent_message["timestamp"], float): + assert math.isclose(float(received_message.timestamp), sent_message["timestamp"], rel_tol=1e-9), assert_fail_message("timestamp") else: - assert str(received_message.timestamp) == str(message["timestamp"]), assert_fail_message + assert str(received_message.timestamp) == str(sent_message["timestamp"]), assert_fail_message("timestamp") + if "version" in sent_message and sent_message["version"]: + assert str(received_message.version) == str(sent_message["version"]), assert_fail_message("version") + if "meta" in sent_message and sent_message["meta"]: + assert str(received_message.meta) == str(sent_message["meta"]), assert_fail_message("meta") diff --git a/src/test_data.py b/src/test_data.py index d4a4159d86..4b5f018c0c 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -47,6 +47,7 @@ ] INVALID_PAYLOADS = [ + {"description": "Empty string", "value": ""}, {"description": "Unecoded text", "value": "Hello World!"}, {"description": "A dictionary", "value": {"key": "YWFh"}}, {"description": "An integer", "value": 1234567890}, @@ -55,6 +56,7 @@ ] INVALID_CONTENT_TOPICS = [ + {"description": "Empty string", "value": ""}, {"description": "A dictionary", "value": {"key": "YWFh"}}, {"description": "An integer", "value": 1234567890}, {"description": "A list", "value": ["YWFh"]}, diff --git a/tests/__init__.py b/tests/__init__.py index d392987ae1..e69de29bb2 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,4 +0,0 @@ -import logging - 
-logging.getLogger("urllib3").setLevel(logging.WARNING) -logging.getLogger("docker").setLevel(logging.WARNING) diff --git a/tests/conftest.py b/tests/conftest.py index a1bb104655..9d60e7a08a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- import glob -import logging +from src.libs.custom_logger import get_custom_logger import os import pytest from datetime import datetime @@ -9,7 +9,7 @@ import src.env_vars as env_vars from src.data_storage import DS -logger = logging.getLogger(__name__) +logger = get_custom_logger(__name__) # See https://docs.pytest.org/en/latest/example/simple.html#making-test-result-information-available-in-fixtures @@ -43,6 +43,12 @@ def test_id(request): @pytest.fixture(scope="function", autouse=True) def test_setup(request, test_id): logger.debug("Running test: %s with id: %s", request.node.name, request.cls.test_id) + yield + for file in glob.glob(os.path.join(env_vars.LOG_DIR, "*" + request.cls.test_id + "*")): + try: + os.remove(file) + except Exception: + logger.debug("Could not remove file: %s", file) @pytest.fixture(scope="function", autouse=True) @@ -50,7 +56,7 @@ def attach_logs_on_fail(request): yield if hasattr(request.node, "rep_call") and request.node.rep_call.failed: logger.debug("Test failed, attempting to attach logs to the allure reports") - for file in glob.glob(os.path.join(env_vars.LOG_DIR, request.cls.test_id + "*")): + for file in glob.glob(os.path.join(env_vars.LOG_DIR, "*" + request.cls.test_id + "*")): attach_allure_file(file) diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index 596037b7e1..5cbbeddb4d 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -1,10 +1,10 @@ -import logging +from src.libs.custom_logger import get_custom_logger from time import time from src.libs.common import to_base64 from src.steps.relay import StepsRelay from src.test_data import INVALID_CONTENT_TOPICS, INVALID_PAYLOADS, SAMPLE_INPUTS, 
SAMPLE_TIMESTAMPS -logger = logging.getLogger(__name__) +logger = get_custom_logger(__name__) class TestRelayPublish(StepsRelay): @@ -45,7 +45,24 @@ def test_publish_with_missing_payload(self): else: raise Exception("Not implemented") - def test_publish_with_various_content_topics(self): + def test_publish_with_payload_less_than_one_mb(self): + payload_length = 1024 * 1023 + logger.debug("Running test with payload length of %s bytes", payload_length) + message = {"payload": to_base64("a" * (payload_length)), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + self.check_published_message_reaches_peer(message, message_propagation_delay=2) + + def test_publish_with_payload_equal_or_more_than_one_mb(self): + payload_length = 1024 * 1023 + for payload_length in [1024 * 1024, 1024 * 1024 * 10]: + logger.debug("Running test with payload length of %s bytes", payload_length) + message = {"payload": to_base64("a" * (payload_length)), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + try: + self.check_published_message_reaches_peer(message, message_propagation_delay=2) + raise AssertionError("Duplicate message was retrieved twice") + except Exception as ex: + assert "Peer node couldn't find any messages" in str(ex) + + def test_publish_with_valid_content_topics(self): failed_content_topics = [] for content_topic in SAMPLE_INPUTS: logger.debug("Running test with content topic %s", content_topic["description"]) @@ -82,6 +99,18 @@ def test_publish_with_missing_content_topic(self): else: raise Exception("Not implemented") + def test_publish_on_unsubscribed_pubsub_topic(self): + try: + self.check_published_message_reaches_peer(self.test_message, pubsub_topic="/waku/2/rs/19/1") + raise AssertionError("Publish on unsubscribed pubsub_topic worked!!!") + except Exception as ex: + if self.node1.is_nwaku(): + assert "Bad Request" in str(ex) + elif self.node1.is_gowaku(): + assert "Internal Server Error" in str(ex) + else: + raise 
Exception("Not implemented") + def test_publish_with_valid_timestamps(self): failed_timestamps = [] for timestamp in SAMPLE_TIMESTAMPS: @@ -111,3 +140,35 @@ def test_publish_with_invalid_timestamps(self): def test_publish_with_no_timestamp(self): message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic} self.check_published_message_reaches_peer(message) + + def test_publish_with_valid_version(self): + self.test_message["version"] = 10 + self.check_published_message_reaches_peer(self.test_message) + + def test_publish_with_invalid_version(self): + self.test_message["version"] = 2.1 + try: + self.check_published_message_reaches_peer(self.test_message) + raise AssertionError("Publish with invalid version worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_publish_with_valid_meta(self): + self.test_message["meta"] = to_base64(self.test_payload) + self.check_published_message_reaches_peer(self.test_message) + + def test_publish_with_invalid_meta(self): + self.test_message["meta"] = self.test_payload + try: + self.check_published_message_reaches_peer(self.test_message) + raise AssertionError("Publish with invalid meta worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_publish_and_retrieve_duplicate_message(self): + self.check_published_message_reaches_peer(self.test_message) + try: + self.check_published_message_reaches_peer(self.test_message) + raise AssertionError("Duplicate message was retrieved twice") + except Exception as ex: + assert "Peer node couldn't find any messages" in str(ex) From aab1a6b63f2133f634f2358ef6ea3f7d5da7e6d6 Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Fri, 10 Nov 2023 16:48:34 +0200 Subject: [PATCH 08/18] small tweaks --- src/env_vars.py | 3 ++- src/node/waku_node.py | 4 ++-- tests/conftest.py | 17 ++++++++++------- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/env_vars.py b/src/env_vars.py index 396697e5af..0824fc6959 
100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -16,10 +16,11 @@ def get_env_var(var_name, default=None): # Configuration constants. Need to be upercase to appear in reports NODE_1 = get_env_var("NODE_1", "wakuorg/nwaku:latest") NODE_2 = get_env_var("NODE_2", "wakuorg/go-waku:latest") -LOG_DIR = get_env_var("LOG_DIR", "./log") +DOCKER_LOG_DIR = get_env_var("DOCKER_LOG_DIR", "./log/docker") NETWORK_NAME = get_env_var("NETWORK_NAME", "waku") SUBNET = get_env_var("SUBNET", "172.18.0.0/16") IP_RANGE = get_env_var("IP_RANGE", "172.18.0.0/24") GATEWAY = get_env_var("GATEWAY", "172.18.0.1") DEFAULT_PUBSUBTOPIC = get_env_var("DEFAULT_PUBSUBTOPIC", "/waku/2/default-waku/proto") PROTOCOL = get_env_var("PROTOCOL", "REST") +RUNNING_IN_CI = get_env_var("CI") diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 0c54ed3684..744fd55036 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -5,7 +5,7 @@ from src.node.api_clients.rpc import RPC from src.node.api_clients.rest import REST from src.node.docker_mananger import DockerManager -from src.env_vars import LOG_DIR, DEFAULT_PUBSUBTOPIC, PROTOCOL +from src.env_vars import DOCKER_LOG_DIR, DEFAULT_PUBSUBTOPIC, PROTOCOL from src.data_storage import DS logger = get_custom_logger(__name__) @@ -14,7 +14,7 @@ class WakuNode: def __init__(self, docker_image, docker_log_prefix=""): self._image_name = docker_image - self._log_path = os.path.join(LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log") + self._log_path = os.path.join(DOCKER_LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log") self._docker_manager = DockerManager(self._image_name) self._container = None logger.debug("WakuNode instance initialized with log path %s", self._log_path) diff --git a/tests/conftest.py b/tests/conftest.py index 9d60e7a08a..6991416554 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,6 +4,7 @@ import os import pytest from datetime import datetime +from time import time 
from uuid import uuid4 from src.libs.common import attach_allure_file import src.env_vars as env_vars @@ -44,19 +45,21 @@ def test_id(request): def test_setup(request, test_id): logger.debug("Running test: %s with id: %s", request.node.name, request.cls.test_id) yield - for file in glob.glob(os.path.join(env_vars.LOG_DIR, "*" + request.cls.test_id + "*")): - try: - os.remove(file) - except Exception: - logger.debug("Could not remove file: %s", file) + for file in glob.glob(os.path.join(env_vars.DOCKER_LOG_DIR, "*")): + if os.path.getmtime(file) < time() - 3600: + logger.debug(f"Deleting old log file: {file}") + try: + os.remove(file) + except: + logger.error("Could not delete file") @pytest.fixture(scope="function", autouse=True) def attach_logs_on_fail(request): yield - if hasattr(request.node, "rep_call") and request.node.rep_call.failed: + if env_vars.RUNNING_IN_CI and hasattr(request.node, "rep_call") and request.node.rep_call.failed: logger.debug("Test failed, attempting to attach logs to the allure reports") - for file in glob.glob(os.path.join(env_vars.LOG_DIR, "*" + request.cls.test_id + "*")): + for file in glob.glob(os.path.join(env_vars.DOCKER_LOG_DIR, "*" + request.cls.test_id + "*")): attach_allure_file(file) From aee2128b7e7a6a9983f4f597e99a74611b56a497 Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Mon, 13 Nov 2023 19:09:48 +0200 Subject: [PATCH 09/18] new tests --- src/node/waku_node.py | 2 +- src/steps/relay.py | 7 +++++- src/test_data.py | 4 ++-- tests/relay/test_publish.py | 45 ++++++++++++++++++++----------------- 4 files changed, 34 insertions(+), 24 deletions(-) diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 744fd55036..d3fa437a18 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -89,7 +89,7 @@ def stop(self): @retry(stop=stop_after_delay(10), wait=wait_fixed(0.1), reraise=True) def ensure_ready(self): self.info() - logger.debug("RPC service is ready.") + logger.info("%s service is ready !!", PROTOCOL) def 
info(self): return self._api.info() diff --git a/src/steps/relay.py b/src/steps/relay.py index 4902dc43df..f67f602ba0 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -33,6 +33,7 @@ def wait_for_network_to_warm_up(self): message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} try: self.check_published_message_reaches_peer(message) + logger.info("WARM UP successful !!") except Exception as ex: raise Exception(f"WARM UP FAILED WITH: {ex}") @@ -51,7 +52,7 @@ def check_published_message_reaches_peer(self, message, pubsub_topic=None, messa def assert_received_message(self, sent_message, received_message): def assert_fail_message(field_name): - return f"Incorrect {field_name}. Published {sent_message[field_name]} Received {getattr(received_message, field_name)}" + return f"Incorrect field: {field_name}. Published: {sent_message[field_name]} Received: {getattr(received_message, field_name)}" assert ( received_message.payload == sent_message["payload"] @@ -68,3 +69,7 @@ def assert_fail_message(field_name): assert str(received_message.version) == str(sent_message["version"]), assert_fail_message("version") if "meta" in sent_message and sent_message["meta"]: assert str(received_message.meta) == str(sent_message["meta"]), assert_fail_message("meta") + if "ephemeral" in sent_message and sent_message["ephemeral"]: + assert str(received_message.ephemeral) == str(sent_message["ephemeral"]), assert_fail_message("ephemeral") + if "rateLimitProof" in sent_message and sent_message["rateLimitProof"]: + assert str(received_message.rateLimitProof) == str(sent_message["rateLimitProof"]), assert_fail_message("rateLimitProof") diff --git a/src/test_data.py b/src/test_data.py index 4b5f018c0c..9cda6ef745 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -76,9 +76,9 @@ {"description": "Positive number", "value": 1, "valid_for": ["nwaku", "gowaku"]}, {"description": "Negative number", "value": -1, 
"valid_for": ["nwaku", "gowaku"]}, {"description": "DST change", "value": int(datetime(2020, 3, 8, 2, 0, 0).timestamp() * 1e9), "valid_for": ["nwaku", "gowaku"]}, # DST starts - {"description": "Timestamp as string number", "value": str(int(time() * 1e9)), "valid_for": ["gowaku"]}, + {"description": "Timestamp as string number", "value": str(int(time() * 1e9)), "valid_for": []}, {"description": "Invalid large number", "value": 2**63, "valid_for": []}, - {"description": "Float number", "value": float(time() * 1e9), "valid_for": ["gowaku"]}, + {"description": "Float number", "value": float(time() * 1e9), "valid_for": []}, {"description": "Array instead of timestamp", "value": [int(time() * 1e9)], "valid_for": []}, {"description": "Object instead of timestamp", "value": {"time": int(time() * 1e9)}, "valid_for": []}, {"description": "ISO 8601 timestamp", "value": "2023-12-26T10:58:51", "valid_for": []}, diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index 5cbbeddb4d..11c5555640 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -29,7 +29,7 @@ def test_publish_with_invalid_payloads(self): self.node1.send_message(message, self.test_pubsub_topic) success_payloads.append(payload) except Exception as ex: - assert "Bad Request" in str(ex) + assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) assert not success_payloads, f"Invalid Payloads that didn't failed: {success_payloads}" def test_publish_with_missing_payload(self): @@ -38,12 +38,7 @@ def test_publish_with_missing_payload(self): self.node1.send_message(message, self.test_pubsub_topic) raise AssertionError("Publish with missing payload worked!!!") except Exception as ex: - if self.node1.is_nwaku(): - assert "Bad Request" in str(ex) - elif self.node1.is_gowaku(): - assert "Internal Server Error" in str(ex) - else: - raise Exception("Not implemented") + assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) def 
test_publish_with_payload_less_than_one_mb(self): payload_length = 1024 * 1023 @@ -83,7 +78,7 @@ def test_publish_with_invalid_content_topics(self): self.node1.send_message(message, self.test_pubsub_topic) success_content_topics.append(content_topic) except Exception as ex: - assert "Bad Request" in str(ex) + assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) assert not success_content_topics, f"Invalid Content topics that didn't failed: {success_content_topics}" def test_publish_with_missing_content_topic(self): @@ -92,24 +87,14 @@ def test_publish_with_missing_content_topic(self): self.node1.send_message(message, self.test_pubsub_topic) raise AssertionError("Publish with missing content_topic worked!!!") except Exception as ex: - if self.node1.is_nwaku(): - assert "Bad Request" in str(ex) - elif self.node1.is_gowaku(): - assert "Internal Server Error" in str(ex) - else: - raise Exception("Not implemented") + assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) def test_publish_on_unsubscribed_pubsub_topic(self): try: self.check_published_message_reaches_peer(self.test_message, pubsub_topic="/waku/2/rs/19/1") raise AssertionError("Publish on unsubscribed pubsub_topic worked!!!") except Exception as ex: - if self.node1.is_nwaku(): - assert "Bad Request" in str(ex) - elif self.node1.is_gowaku(): - assert "Internal Server Error" in str(ex) - else: - raise Exception("Not implemented") + assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) def test_publish_with_valid_timestamps(self): failed_timestamps = [] @@ -165,6 +150,26 @@ def test_publish_with_invalid_meta(self): except Exception as ex: assert "Bad Request" in str(ex) + def test_publish_with_rate_limit_proof(self): + self.test_message["rateLimitProof"] = { + "proof": to_base64("proofData"), + "epoch": to_base64("epochData"), + "nullifier": to_base64("nullifierData"), + } + self.check_published_message_reaches_peer(self.test_message) + + def 
test_publish_with_ephemeral(self): + failed_ephemeral = [] + for ephemeral in [True, False]: + logger.debug("Running test with Ephemeral %s", ephemeral) + self.test_message["ephemeral"] = ephemeral + try: + self.check_published_message_reaches_peer(self.test_message) + except Exception as e: + logger.error("Massage with Ephemeral %s failed: %s", ephemeral, str(e)) + failed_ephemeral.append(ephemeral) + assert not failed_ephemeral, f"Ephemeral that failed: {failed_ephemeral}" + def test_publish_and_retrieve_duplicate_message(self): self.check_published_message_reaches_peer(self.test_message) try: From 1c1b147bc48eacab9875a6b751fec6d2b9a2f76b Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Tue, 14 Nov 2023 17:29:02 +0200 Subject: [PATCH 10/18] new tests --- src/env_vars.py | 4 +-- src/node/waku_node.py | 15 ++++++++++++ src/steps/relay.py | 14 ++++++++--- tests/relay/test_publish.py | 49 ++++++++++++++++++++++++++++++------- 4 files changed, 67 insertions(+), 15 deletions(-) diff --git a/src/env_vars.py b/src/env_vars.py index 0824fc6959..abf923a983 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -14,8 +14,8 @@ def get_env_var(var_name, default=None): # Configuration constants. 
Need to be upercase to appear in reports -NODE_1 = get_env_var("NODE_1", "wakuorg/nwaku:latest") -NODE_2 = get_env_var("NODE_2", "wakuorg/go-waku:latest") +NODE_1 = get_env_var("NODE_1", "wakuorg/go-waku:latest") +NODE_2 = get_env_var("NODE_2", "wakuorg/nwaku:latest") DOCKER_LOG_DIR = get_env_var("DOCKER_LOG_DIR", "./log/docker") NETWORK_NAME = get_env_var("NETWORK_NAME", "waku") SUBNET = get_env_var("SUBNET", "172.18.0.0/16") diff --git a/src/node/waku_node.py b/src/node/waku_node.py index d3fa437a18..1867d01d76 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -86,6 +86,21 @@ def stop(self): self._container.stop() logger.debug("Container stopped.") + def restart(self): + if self._container: + logger.debug("Restarting container with id %s", self._container.short_id) + self._container.restart() + + def pause(self): + if self._container: + logger.debug("Pausing container with id %s", self._container.short_id) + self._container.pause() + + def unpause(self): + if self._container: + logger.debug("Unpause container with id %s", self._container.short_id) + self._container.unpause() + @retry(stop=stop_after_delay(10), wait=wait_fixed(0.1), reraise=True) def ensure_ready(self): self.info() diff --git a/src/steps/relay.py b/src/steps/relay.py index f67f602ba0..3b96db47cc 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -28,11 +28,9 @@ def setup_nodes(self, request): self.node2.set_subscriptions([self.test_pubsub_topic]) @pytest.fixture(scope="function", autouse=True) - @retry(stop=stop_after_delay(120), wait=wait_fixed(1), reraise=True) - def wait_for_network_to_warm_up(self): - message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + def network_warm_up(self, setup_nodes): try: - self.check_published_message_reaches_peer(message) + self.wait_for_published_message_to_reach_peer(120) logger.info("WARM UP successful !!") except Exception as ex: raise Exception(f"WARM UP FAILED 
WITH: {ex}") @@ -73,3 +71,11 @@ def assert_fail_message(field_name): assert str(received_message.ephemeral) == str(sent_message["ephemeral"]), assert_fail_message("ephemeral") if "rateLimitProof" in sent_message and sent_message["rateLimitProof"]: assert str(received_message.rateLimitProof) == str(sent_message["rateLimitProof"]), assert_fail_message("rateLimitProof") + + def wait_for_published_message_to_reach_peer(self, timeout_duration, time_between_retries=1): + @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(time_between_retries), reraise=True) + def check_peer_connection(): + message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + self.check_published_message_reaches_peer(message) + + check_peer_connection() diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index 11c5555640..c286a73eda 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -1,8 +1,9 @@ from src.libs.custom_logger import get_custom_logger -from time import time +from time import sleep, time from src.libs.common import to_base64 from src.steps.relay import StepsRelay from src.test_data import INVALID_CONTENT_TOPICS, INVALID_PAYLOADS, SAMPLE_INPUTS, SAMPLE_TIMESTAMPS +import pytest logger = get_custom_logger(__name__) @@ -150,14 +151,6 @@ def test_publish_with_invalid_meta(self): except Exception as ex: assert "Bad Request" in str(ex) - def test_publish_with_rate_limit_proof(self): - self.test_message["rateLimitProof"] = { - "proof": to_base64("proofData"), - "epoch": to_base64("epochData"), - "nullifier": to_base64("nullifierData"), - } - self.check_published_message_reaches_peer(self.test_message) - def test_publish_with_ephemeral(self): failed_ephemeral = [] for ephemeral in [True, False]: @@ -170,6 +163,18 @@ def test_publish_with_ephemeral(self): failed_ephemeral.append(ephemeral) assert not failed_ephemeral, f"Ephemeral that failed: {failed_ephemeral}" + def 
test_publish_with_rate_limit_proof(self): + self.test_message["rateLimitProof"] = { + "proof": to_base64("proofData"), + "epoch": to_base64("epochData"), + "nullifier": to_base64("nullifierData"), + } + self.check_published_message_reaches_peer(self.test_message) + + def test_publish_with_extra_field(self): + self.test_message["extraField"] = "extraValue" + self.check_published_message_reaches_peer(self.test_message) + def test_publish_and_retrieve_duplicate_message(self): self.check_published_message_reaches_peer(self.test_message) try: @@ -177,3 +182,29 @@ def test_publish_and_retrieve_duplicate_message(self): raise AssertionError("Duplicate message was retrieved twice") except Exception as ex: assert "Peer node couldn't find any messages" in str(ex) + + def test_publish_after_node_pauses(self): + self.check_published_message_reaches_peer(self.test_message) + self.node1.pause() + self.node1.unpause() + self.test_message["payload"] = to_base64("new payload 1") + self.check_published_message_reaches_peer(self.test_message) + self.node2.pause() + self.node2.unpause() + self.test_message["payload"] = to_base64("new payload 2") + self.check_published_message_reaches_peer(self.test_message) + + @pytest.mark.skip("enrUri resets after node restart and node2 looses connection") + def test_publish_after_node1_restarts(self): + self.check_published_message_reaches_peer(self.test_message) + self.node1.restart() + self.node1.set_subscriptions([self.test_pubsub_topic]) + self.node2.set_subscriptions([self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_peer(20) + + def test_publish_after_node2_restarts(self): + self.check_published_message_reaches_peer(self.test_message) + self.node2.restart() + self.node1.set_subscriptions([self.test_pubsub_topic]) + self.node2.set_subscriptions([self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_peer(20) From feea4be91e32dead9abacc37f763ae4478f80a9d Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Tue, 14 Nov 
2023 17:47:46 +0200 Subject: [PATCH 11/18] test remove defaults from CI --- .github/workflows/test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 78c72a80ad..16d8b78adb 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -27,8 +27,8 @@ on: env: FORCE_COLOR: "1" - NODE_1: ${{ inputs.node1 || 'wakuorg/nwaku:latest' }} - NODE_2: ${{ inputs.node2 || 'wakuorg/go-waku:latest' }} + NODE_1: ${{ inputs.node1 }} + NODE_2: ${{ inputs.node2 }} PROTOCOL: ${{ inputs.protocol || 'REST' }} jobs: From 9ab961af602a1569e7e4318f621936cf405cb779 Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Tue, 14 Nov 2023 17:56:24 +0200 Subject: [PATCH 12/18] handle empty strings for env vars in CI --- src/env_vars.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/env_vars.py b/src/env_vars.py index abf923a983..c1bf5a6490 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -6,10 +6,10 @@ def get_env_var(var_name, default=None): env_var = os.getenv(var_name, default) - if env_var is not None: - print(f"{var_name}: {env_var}") - else: + if env_var in [None, ""]: print(f"{var_name} is not set; using default value: {default}") + env_var = default + print(f"{var_name}: {env_var}") return env_var From 37c0cbb94bb9fcd5b78a409dc8d3d0082210465a Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Tue, 14 Nov 2023 18:05:20 +0200 Subject: [PATCH 13/18] add nodekey to main node --- src/env_vars.py | 1 + src/steps/relay.py | 4 ++-- tests/relay/test_publish.py | 4 +--- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/env_vars.py b/src/env_vars.py index c1bf5a6490..e1e99159c4 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -24,3 +24,4 @@ def get_env_var(var_name, default=None): DEFAULT_PUBSUBTOPIC = get_env_var("DEFAULT_PUBSUBTOPIC", "/waku/2/default-waku/proto") PROTOCOL = get_env_var("PROTOCOL", "REST") RUNNING_IN_CI = get_env_var("CI") +NODEKEY = 
get_env_var("NODEKEY", "30348dd51465150e04a5d9d932c72864c8967f806cce60b5d26afeca1e77eb68") diff --git a/src/steps/relay.py b/src/steps/relay.py index 3b96db47cc..a21cec5e06 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -5,7 +5,7 @@ import allure from src.libs.common import to_base64, delay from src.data_classes import message_rpc_response_schema -from src.env_vars import NODE_1, NODE_2 +from src.env_vars import NODE_1, NODE_2, NODEKEY from src.node.waku_node import WakuNode from tenacity import retry, stop_after_delay, wait_fixed @@ -16,7 +16,7 @@ class StepsRelay: @pytest.fixture(scope="function", autouse=True) def setup_nodes(self, request): self.node1 = WakuNode(NODE_1, "node1_" + request.cls.test_id) - self.node1.start(relay="true", discv5_discovery="true", peer_exchange="true") + self.node1.start(relay="true", discv5_discovery="true", peer_exchange="true", nodekey=NODEKEY) enr_uri = self.node1.info()["enrUri"] self.node2 = WakuNode(NODE_2, "node2_" + request.cls.test_id) self.node2.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=enr_uri, peer_exchange="true") diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index c286a73eda..3e0e0f1ead 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -1,9 +1,8 @@ from src.libs.custom_logger import get_custom_logger -from time import sleep, time +from time import time from src.libs.common import to_base64 from src.steps.relay import StepsRelay from src.test_data import INVALID_CONTENT_TOPICS, INVALID_PAYLOADS, SAMPLE_INPUTS, SAMPLE_TIMESTAMPS -import pytest logger = get_custom_logger(__name__) @@ -194,7 +193,6 @@ def test_publish_after_node_pauses(self): self.test_message["payload"] = to_base64("new payload 2") self.check_published_message_reaches_peer(self.test_message) - @pytest.mark.skip("enrUri resets after node restart and node2 looses connection") def test_publish_after_node1_restarts(self): 
self.check_published_message_reaches_peer(self.test_message) self.node1.restart() From 5b803ba0c3400e0f1a0c3b8e02eebbef72a5cc1e Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Wed, 15 Nov 2023 16:58:24 +0200 Subject: [PATCH 14/18] add more tests --- src/node/api_clients/base_client.py | 6 +++--- src/node/waku_node.py | 1 + src/steps/relay.py | 8 ++------ src/test_data.py | 14 +++++++++++++ tests/relay/test_publish.py | 31 +++++++++++++++++++++++++++-- 5 files changed, 49 insertions(+), 11 deletions(-) diff --git a/src/node/api_clients/base_client.py b/src/node/api_clients/base_client.py index 3e994749c7..7992cb03e7 100644 --- a/src/node/api_clients/base_client.py +++ b/src/node/api_clients/base_client.py @@ -18,13 +18,13 @@ def make_request(self, method, url, headers=None, data=None): try: response.raise_for_status() except requests.HTTPError as http_err: - logger.error("HTTP error occurred: %s", http_err) + logger.error("HTTP error occurred: %s. Response content: %s", http_err, response.content) raise except Exception as err: - logger.error("An error occurred: %s", err) + logger.error("An error occurred: %s. Response content: %s", err, response.content) raise else: - logger.info("Response status code: %s", response.status_code) + logger.info("Response status code: %s. 
Response content: %s", response.status_code, response.content) return response @abstractmethod diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 1867d01d76..c97c02c97d 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -44,6 +44,7 @@ def start(self, **kwargs): "rest-admin": "true", "websocket-support": "true", "log-level": "TRACE", + "rest-relay-cache-capacity": "100", "websocket-port": str(self._ports[3]), "rpc-port": self._rpc_port, "rest-port": self._rest_port, diff --git a/src/steps/relay.py b/src/steps/relay.py index a21cec5e06..37050e9ac3 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -52,12 +52,8 @@ def assert_received_message(self, sent_message, received_message): def assert_fail_message(field_name): return f"Incorrect field: {field_name}. Published: {sent_message[field_name]} Received: {getattr(received_message, field_name)}" - assert ( - received_message.payload == sent_message["payload"] - ), f'Incorrect payload. Published {sent_message["payload"]} Received {received_message.payload}' - assert ( - received_message.contentTopic == sent_message["contentTopic"] - ), f'Incorrect contentTopic. 
Published {sent_message["contentTopic"]} Received {received_message.contentTopic}' + assert received_message.payload == sent_message["payload"], assert_fail_message("payload") + assert received_message.contentTopic == sent_message["contentTopic"], assert_fail_message("contentTopic") if "timestamp" in sent_message and sent_message["timestamp"]: if isinstance(sent_message["timestamp"], float): assert math.isclose(float(received_message.timestamp), sent_message["timestamp"], rel_tol=1e-9), assert_fail_message("timestamp") diff --git a/src/test_data.py b/src/test_data.py index 9cda6ef745..c4c72846da 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -1,6 +1,8 @@ from time import time from datetime import datetime, timedelta +from src.env_vars import DEFAULT_PUBSUBTOPIC + NOW = datetime.now() SAMPLE_INPUTS = [ @@ -63,6 +65,18 @@ {"description": "A bool", "value": True}, ] +VALID_PUBSUB_TOPICS = [ + DEFAULT_PUBSUBTOPIC, + "/waku/2/rs/18/1", + "/test/2/rs/18/1", + "/waku/3/rs/18/1", + "/waku/2/test/18/1", + "/waku/2/rs/66/1", + "/waku/2/rs/18/50", + "/waku/18/50", + "test", +] + SAMPLE_TIMESTAMPS = [ {"description": "Now", "value": int(time() * 1e9), "valid_for": ["nwaku", "gowaku"]}, diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index 3e0e0f1ead..c40012a4e5 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -1,8 +1,8 @@ from src.libs.custom_logger import get_custom_logger from time import time -from src.libs.common import to_base64 +from src.libs.common import delay, to_base64 from src.steps.relay import StepsRelay -from src.test_data import INVALID_CONTENT_TOPICS, INVALID_PAYLOADS, SAMPLE_INPUTS, SAMPLE_TIMESTAMPS +from src.test_data import INVALID_CONTENT_TOPICS, INVALID_PAYLOADS, SAMPLE_INPUTS, SAMPLE_TIMESTAMPS, VALID_PUBSUB_TOPICS logger = get_custom_logger(__name__) @@ -89,6 +89,20 @@ def test_publish_with_missing_content_topic(self): except Exception as ex: assert "Bad Request" in str(ex) or "Internal Server 
Error" in str(ex) + def test_publish_on_multiple_pubsub_topics(self): + self.node1.set_subscriptions(VALID_PUBSUB_TOPICS) + self.node2.set_subscriptions(VALID_PUBSUB_TOPICS) + failed_pubsub_topics = [] + for pubsub_topic in VALID_PUBSUB_TOPICS: + logger.debug("Running test with pubsub topic %s", pubsub_topic) + first_message = {"payload": to_base64("M1"), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + try: + self.check_published_message_reaches_peer(first_message, pubsub_topic=pubsub_topic) + except Exception as e: + logger.error("PubusubTopic %s failed: %s", pubsub_topic, str(e)) + failed_pubsub_topics.append(pubsub_topic) + assert not failed_pubsub_topics, f"PubusubTopic failed: {failed_pubsub_topics}" + def test_publish_on_unsubscribed_pubsub_topic(self): try: self.check_published_message_reaches_peer(self.test_message, pubsub_topic="/waku/2/rs/19/1") @@ -206,3 +220,16 @@ def test_publish_after_node2_restarts(self): self.node1.set_subscriptions([self.test_pubsub_topic]) self.node2.set_subscriptions([self.test_pubsub_topic]) self.wait_for_published_message_to_reach_peer(20) + + def test_publish_and_retrieve_100_messages(self): + num_messages = 100 # if increase this number make sure to also increase rest-relay-cache-capacity flag + for index in range(num_messages): + message = {"payload": to_base64(f"M_{index}"), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + self.node1.send_message(message, self.test_pubsub_topic) + delay(1) + messages = self.node2.get_messages(self.test_pubsub_topic) + assert len(messages) == num_messages + for index, message in enumerate(messages): + assert message["payload"] == to_base64( + f"M_{index}" + ), f'Incorrect payload at index: {index}. 
Published {to_base64(f"M_{index}")} Received {message["payload"]}' From ca94c2b44e766ea5a0757c6eaf492ce477baf7d4 Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Wed, 15 Nov 2023 19:09:13 +0200 Subject: [PATCH 15/18] finishing touches --- src/env_vars.py | 2 +- src/node/waku_node.py | 10 ++--- src/steps/relay.py | 22 +++++++---- src/test_data.py | 4 +- tests/conftest.py | 2 +- tests/relay/test_publish.py | 78 ++++++++++++++++++------------------- 6 files changed, 60 insertions(+), 58 deletions(-) diff --git a/src/env_vars.py b/src/env_vars.py index e1e99159c4..92803cd1b0 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -21,7 +21,7 @@ def get_env_var(var_name, default=None): SUBNET = get_env_var("SUBNET", "172.18.0.0/16") IP_RANGE = get_env_var("IP_RANGE", "172.18.0.0/24") GATEWAY = get_env_var("GATEWAY", "172.18.0.1") -DEFAULT_PUBSUBTOPIC = get_env_var("DEFAULT_PUBSUBTOPIC", "/waku/2/default-waku/proto") +DEFAULT_PUBSUB_TOPIC = get_env_var("DEFAULT_PUBSUB_TOPIC", "/waku/2/default-waku/proto") PROTOCOL = get_env_var("PROTOCOL", "REST") RUNNING_IN_CI = get_env_var("CI") NODEKEY = get_env_var("NODEKEY", "30348dd51465150e04a5d9d932c72864c8967f806cce60b5d26afeca1e77eb68") diff --git a/src/node/waku_node.py b/src/node/waku_node.py index c97c02c97d..9fa4ba2849 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -5,7 +5,7 @@ from src.node.api_clients.rpc import RPC from src.node.api_clients.rest import REST from src.node.docker_mananger import DockerManager -from src.env_vars import DOCKER_LOG_DIR, DEFAULT_PUBSUBTOPIC, PROTOCOL +from src.env_vars import DOCKER_LOG_DIR, DEFAULT_PUBSUB_TOPIC, PROTOCOL from src.data_storage import DS logger = get_custom_logger(__name__) @@ -53,7 +53,7 @@ def start(self, **kwargs): "rpc-address": "0.0.0.0", "rest-address": "0.0.0.0", "nat": f"extip:{self._ext_ip}", - "pubsub-topic": DEFAULT_PUBSUBTOPIC, + "pubsub-topic": DEFAULT_PUBSUB_TOPIC, } if "go-waku" in self._docker_manager.image: @@ -110,13 +110,13 @@ def 
ensure_ready(self): def info(self): return self._api.info() - def set_subscriptions(self, pubsub_topics=[DEFAULT_PUBSUBTOPIC]): + def set_subscriptions(self, pubsub_topics=[DEFAULT_PUBSUB_TOPIC]): return self._api.set_subscriptions(pubsub_topics) - def send_message(self, message, pubsub_topic=DEFAULT_PUBSUBTOPIC): + def send_message(self, message, pubsub_topic=DEFAULT_PUBSUB_TOPIC): return self._api.send_message(message, pubsub_topic) - def get_messages(self, pubsub_topic=DEFAULT_PUBSUBTOPIC): + def get_messages(self, pubsub_topic=DEFAULT_PUBSUB_TOPIC): return self._api.get_messages(pubsub_topic) @property diff --git a/src/steps/relay.py b/src/steps/relay.py index 37050e9ac3..84174ce2e2 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -23,7 +23,6 @@ def setup_nodes(self, request): self.test_pubsub_topic = "/waku/2/rs/18/1" self.test_content_topic = "/test/1/waku-relay/proto" self.test_payload = "Relay works!!" - self.test_message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} self.node1.set_subscriptions([self.test_pubsub_topic]) self.node2.set_subscriptions([self.test_pubsub_topic]) @@ -40,10 +39,8 @@ def check_published_message_reaches_peer(self, message, pubsub_topic=None, messa if not pubsub_topic: pubsub_topic = self.test_pubsub_topic self.node1.send_message(message, pubsub_topic) - delay(message_propagation_delay) get_messages_response = self.node2.get_messages(pubsub_topic) - logger.debug("Got reponse from remote peer %s", get_messages_response) assert get_messages_response, "Peer node couldn't find any messages" received_message = message_rpc_response_schema.load(get_messages_response[0]) self.assert_received_message(message, received_message) @@ -54,18 +51,18 @@ def assert_fail_message(field_name): assert received_message.payload == sent_message["payload"], assert_fail_message("payload") assert received_message.contentTopic == sent_message["contentTopic"], 
assert_fail_message("contentTopic") - if "timestamp" in sent_message and sent_message["timestamp"]: + if "timestamp" in sent_message and sent_message["timestamp"] is not None: if isinstance(sent_message["timestamp"], float): assert math.isclose(float(received_message.timestamp), sent_message["timestamp"], rel_tol=1e-9), assert_fail_message("timestamp") else: assert str(received_message.timestamp) == str(sent_message["timestamp"]), assert_fail_message("timestamp") - if "version" in sent_message and sent_message["version"]: + if "version" in sent_message: assert str(received_message.version) == str(sent_message["version"]), assert_fail_message("version") - if "meta" in sent_message and sent_message["meta"]: + if "meta" in sent_message: assert str(received_message.meta) == str(sent_message["meta"]), assert_fail_message("meta") - if "ephemeral" in sent_message and sent_message["ephemeral"]: + if "ephemeral" in sent_message: assert str(received_message.ephemeral) == str(sent_message["ephemeral"]), assert_fail_message("ephemeral") - if "rateLimitProof" in sent_message and sent_message["rateLimitProof"]: + if "rateLimitProof" in sent_message: assert str(received_message.rateLimitProof) == str(sent_message["rateLimitProof"]), assert_fail_message("rateLimitProof") def wait_for_published_message_to_reach_peer(self, timeout_duration, time_between_retries=1): @@ -75,3 +72,12 @@ def check_peer_connection(): self.check_published_message_reaches_peer(message) check_peer_connection() + + def ensure_subscriptions_on_nodes(self, node_list, pubsub_topic_list): + for node in node_list: + node.set_subscriptions(pubsub_topic_list) + + def create_message(self, **kwargs): + message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + message.update(kwargs) + return message diff --git a/src/test_data.py b/src/test_data.py index c4c72846da..b6e746f4a4 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -1,7 +1,7 @@ from 
time import time from datetime import datetime, timedelta -from src.env_vars import DEFAULT_PUBSUBTOPIC +from src.env_vars import DEFAULT_PUBSUB_TOPIC NOW = datetime.now() @@ -66,7 +66,7 @@ ] VALID_PUBSUB_TOPICS = [ - DEFAULT_PUBSUBTOPIC, + DEFAULT_PUBSUB_TOPIC, "/waku/2/rs/18/1", "/test/2/rs/18/1", "/waku/3/rs/18/1", diff --git a/tests/conftest.py b/tests/conftest.py index 6991416554..6eedd099ed 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -64,7 +64,7 @@ def attach_logs_on_fail(request): @pytest.fixture(scope="function", autouse=True) -def close_open_nodes(): +def close_open_nodes(attach_logs_on_fail): DS.waku_nodes = [] yield crashed_containers = [] diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index c40012a4e5..ca5eec7040 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -12,7 +12,7 @@ def test_publish_with_valid_payloads(self): failed_payloads = [] for payload in SAMPLE_INPUTS: logger.debug("Running test with payload %s", payload["description"]) - message = {"payload": to_base64(payload["value"]), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + message = self.create_message(payload=to_base64(payload["value"])) try: self.check_published_message_reaches_peer(message) except Exception as e: @@ -24,7 +24,7 @@ def test_publish_with_invalid_payloads(self): success_payloads = [] for payload in INVALID_PAYLOADS: logger.debug("Running test with payload %s", payload["description"]) - message = {"payload": payload["value"], "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + message = self.create_message(payload=payload["value"]) try: self.node1.send_message(message, self.test_pubsub_topic) success_payloads.append(payload) @@ -43,14 +43,14 @@ def test_publish_with_missing_payload(self): def test_publish_with_payload_less_than_one_mb(self): payload_length = 1024 * 1023 logger.debug("Running test with payload length of %s bytes", payload_length) - message = 
{"payload": to_base64("a" * (payload_length)), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + message = self.create_message(payload=to_base64("a" * (payload_length))) self.check_published_message_reaches_peer(message, message_propagation_delay=2) def test_publish_with_payload_equal_or_more_than_one_mb(self): payload_length = 1024 * 1023 for payload_length in [1024 * 1024, 1024 * 1024 * 10]: logger.debug("Running test with payload length of %s bytes", payload_length) - message = {"payload": to_base64("a" * (payload_length)), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + message = self.create_message(payload=to_base64("a" * (payload_length))) try: self.check_published_message_reaches_peer(message, message_propagation_delay=2) raise AssertionError("Duplicate message was retrieved twice") @@ -61,7 +61,7 @@ def test_publish_with_valid_content_topics(self): failed_content_topics = [] for content_topic in SAMPLE_INPUTS: logger.debug("Running test with content topic %s", content_topic["description"]) - message = {"payload": to_base64(self.test_payload), "contentTopic": content_topic["value"], "timestamp": int(time() * 1e9)} + message = self.create_message(contentTopic=content_topic["value"]) try: self.check_published_message_reaches_peer(message) except Exception as e: @@ -73,7 +73,7 @@ def test_publish_with_invalid_content_topics(self): success_content_topics = [] for content_topic in INVALID_CONTENT_TOPICS: logger.debug("Running test with contetn topic %s", content_topic["description"]) - message = {"payload": to_base64(self.test_payload), "contentTopic": content_topic["value"], "timestamp": int(time() * 1e9)} + message = self.create_message(contentTopic=content_topic["value"]) try: self.node1.send_message(message, self.test_pubsub_topic) success_content_topics.append(content_topic) @@ -90,22 +90,27 @@ def test_publish_with_missing_content_topic(self): assert "Bad Request" in str(ex) or "Internal Server Error" in 
str(ex) def test_publish_on_multiple_pubsub_topics(self): - self.node1.set_subscriptions(VALID_PUBSUB_TOPICS) - self.node2.set_subscriptions(VALID_PUBSUB_TOPICS) + self.ensure_subscriptions_on_nodes([self.node1, self.node2], VALID_PUBSUB_TOPICS) failed_pubsub_topics = [] for pubsub_topic in VALID_PUBSUB_TOPICS: logger.debug("Running test with pubsub topic %s", pubsub_topic) - first_message = {"payload": to_base64("M1"), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} try: - self.check_published_message_reaches_peer(first_message, pubsub_topic=pubsub_topic) + self.check_published_message_reaches_peer(self.create_message(), pubsub_topic=pubsub_topic) except Exception as e: logger.error("PubusubTopic %s failed: %s", pubsub_topic, str(e)) failed_pubsub_topics.append(pubsub_topic) assert not failed_pubsub_topics, f"PubusubTopic failed: {failed_pubsub_topics}" + def test_message_published_on_different_pubsub_topic_is_not_retrieved(self): + self.ensure_subscriptions_on_nodes([self.node1, self.node2], VALID_PUBSUB_TOPICS) + self.node1.send_message(self.create_message(), VALID_PUBSUB_TOPICS[0]) + delay(0.1) + messages = self.node2.get_messages(VALID_PUBSUB_TOPICS[1]) + assert not messages, "Message was retrieved on wrong pubsub_topic" + def test_publish_on_unsubscribed_pubsub_topic(self): try: - self.check_published_message_reaches_peer(self.test_message, pubsub_topic="/waku/2/rs/19/1") + self.check_published_message_reaches_peer(self.create_message(), pubsub_topic="/waku/2/rs/19/1") raise AssertionError("Publish on unsubscribed pubsub_topic worked!!!") except Exception as ex: assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) @@ -115,7 +120,7 @@ def test_publish_with_valid_timestamps(self): for timestamp in SAMPLE_TIMESTAMPS: if self.node1.type() in timestamp["valid_for"]: logger.debug("Running test with timestamp %s", timestamp["description"]) - message = {"payload": to_base64(self.test_payload), "contentTopic": 
self.test_content_topic, "timestamp": timestamp["value"]} + message = self.create_message(timestamp=timestamp["value"]) try: self.check_published_message_reaches_peer(message) except Exception as ex: @@ -128,7 +133,7 @@ def test_publish_with_invalid_timestamps(self): for timestamp in SAMPLE_TIMESTAMPS: if self.node1.type() not in timestamp["valid_for"]: logger.debug("Running test with timestamp %s", timestamp["description"]) - message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": timestamp["value"]} + message = self.create_message(timestamp=timestamp["value"]) try: self.check_published_message_reaches_peer(message) success_timestamps.append(timestamp) @@ -141,25 +146,21 @@ def test_publish_with_no_timestamp(self): self.check_published_message_reaches_peer(message) def test_publish_with_valid_version(self): - self.test_message["version"] = 10 - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(version=10)) def test_publish_with_invalid_version(self): - self.test_message["version"] = 2.1 try: - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(version=2.1)) raise AssertionError("Publish with invalid version worked!!!") except Exception as ex: assert "Bad Request" in str(ex) def test_publish_with_valid_meta(self): - self.test_message["meta"] = to_base64(self.test_payload) - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(meta=to_base64(self.test_payload))) def test_publish_with_invalid_meta(self): - self.test_message["meta"] = self.test_payload try: - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(meta=self.test_payload)) raise AssertionError("Publish with invalid meta worked!!!") except Exception as ex: assert "Bad 
Request" in str(ex) @@ -168,63 +169,58 @@ def test_publish_with_ephemeral(self): failed_ephemeral = [] for ephemeral in [True, False]: logger.debug("Running test with Ephemeral %s", ephemeral) - self.test_message["ephemeral"] = ephemeral try: - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(ephemeral=ephemeral)) except Exception as e: logger.error("Massage with Ephemeral %s failed: %s", ephemeral, str(e)) failed_ephemeral.append(ephemeral) assert not failed_ephemeral, f"Ephemeral that failed: {failed_ephemeral}" def test_publish_with_rate_limit_proof(self): - self.test_message["rateLimitProof"] = { + rate_limit_proof = { "proof": to_base64("proofData"), "epoch": to_base64("epochData"), "nullifier": to_base64("nullifierData"), } - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(rateLimitProof=rate_limit_proof)) def test_publish_with_extra_field(self): - self.test_message["extraField"] = "extraValue" - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(extraField="extraValue")) def test_publish_and_retrieve_duplicate_message(self): - self.check_published_message_reaches_peer(self.test_message) + message = self.create_message() + self.check_published_message_reaches_peer(message) try: - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(message) raise AssertionError("Duplicate message was retrieved twice") except Exception as ex: assert "Peer node couldn't find any messages" in str(ex) def test_publish_after_node_pauses(self): - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message()) self.node1.pause() self.node1.unpause() - self.test_message["payload"] = to_base64("new payload 1") - 
self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M1"))) self.node2.pause() self.node2.unpause() - self.test_message["payload"] = to_base64("new payload 2") - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M2"))) def test_publish_after_node1_restarts(self): - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message()) self.node1.restart() - self.node1.set_subscriptions([self.test_pubsub_topic]) - self.node2.set_subscriptions([self.test_pubsub_topic]) + self.ensure_subscriptions_on_nodes([self.node1, self.node2], [self.test_pubsub_topic]) self.wait_for_published_message_to_reach_peer(20) def test_publish_after_node2_restarts(self): - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message()) self.node2.restart() - self.node1.set_subscriptions([self.test_pubsub_topic]) - self.node2.set_subscriptions([self.test_pubsub_topic]) + self.ensure_subscriptions_on_nodes([self.node1, self.node2], [self.test_pubsub_topic]) self.wait_for_published_message_to_reach_peer(20) def test_publish_and_retrieve_100_messages(self): num_messages = 100 # if increase this number make sure to also increase rest-relay-cache-capacity flag for index in range(num_messages): - message = {"payload": to_base64(f"M_{index}"), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + message = self.create_message(payload=to_base64(f"M_{index}")) self.node1.send_message(message, self.test_pubsub_topic) delay(1) messages = self.node2.get_messages(self.test_pubsub_topic) From 2e58b2db975455bd34a33310afe6fe197da38d01 Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Thu, 16 Nov 2023 16:43:44 +0200 Subject: [PATCH 16/18] fixes based on Alex suggestions --- src/env_vars.py 
| 1 + src/libs/common.py | 4 ++-- src/node/api_clients/base_client.py | 11 +++++----- src/node/docker_mananger.py | 20 ++++++++--------- src/node/waku_node.py | 24 ++++++++++---------- src/steps/relay.py | 10 ++++----- tests/conftest.py | 4 ++-- tests/relay/test_publish.py | 34 ++++++++++++++--------------- 8 files changed, 55 insertions(+), 53 deletions(-) diff --git a/src/env_vars.py b/src/env_vars.py index 92803cd1b0..3cf33609a0 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -25,3 +25,4 @@ def get_env_var(var_name, default=None): PROTOCOL = get_env_var("PROTOCOL", "REST") RUNNING_IN_CI = get_env_var("CI") NODEKEY = get_env_var("NODEKEY", "30348dd51465150e04a5d9d932c72864c8967f806cce60b5d26afeca1e77eb68") +API_REQUEST_TIMEOUT = get_env_var("API_REQUEST_TIMEOUT", 10) diff --git a/src/libs/common.py b/src/libs/common.py index 67b37adf47..58db57b8d5 100644 --- a/src/libs/common.py +++ b/src/libs/common.py @@ -25,10 +25,10 @@ def to_base64(input_data): def attach_allure_file(file): - logger.debug("Attaching file %s", file) + logger.debug(f"Attaching file {file}") allure.attach.file(file, name=os.path.basename(file), attachment_type=allure.attachment_type.TEXT) def delay(num_seconds): - logger.debug("Sleeping for %s seconds", num_seconds) + logger.debug(f"Sleeping for {num_seconds} seconds") sleep(num_seconds) diff --git a/src/node/api_clients/base_client.py b/src/node/api_clients/base_client.py index 7992cb03e7..30725db3d5 100644 --- a/src/node/api_clients/base_client.py +++ b/src/node/api_clients/base_client.py @@ -1,6 +1,7 @@ import requests from tenacity import retry, stop_after_delay, wait_fixed from abc import ABC, abstractmethod +from src.env_vars import API_REQUEST_TIMEOUT from src.libs.custom_logger import get_custom_logger logger = get_custom_logger(__name__) @@ -13,18 +14,18 @@ class BaseClient(ABC): # ensures that such intermittent issues don't cause the tests to fail outright. 
@retry(stop=stop_after_delay(0.5), wait=wait_fixed(0.1), reraise=True) def make_request(self, method, url, headers=None, data=None): - logger.debug("%s call: %s with payload: %s", method.upper(), url, data) - response = requests.request(method.upper(), url, headers=headers, data=data) + logger.debug(f"{method.upper()} call: {url} with payload: {data}") + response = requests.request(method.upper(), url, headers=headers, data=data, timeout=API_REQUEST_TIMEOUT) try: response.raise_for_status() except requests.HTTPError as http_err: - logger.error("HTTP error occurred: %s. Response content: %s", http_err, response.content) + logger.error(f"HTTP error occurred: {http_err}. Response content: {response.content}") raise except Exception as err: - logger.error("An error occurred: %s. Response content: %s", err, response.content) + logger.error(f"An error occurred: {err}. Response content: {response.content}") raise else: - logger.info("Response status code: %s. Response content: %s", response.status_code, response.content) + logger.error(f"Response status code: {response.status_code}. 
Response content: {response.content}") return response @abstractmethod diff --git a/src/node/docker_mananger.py b/src/node/docker_mananger.py index 00bd0cafb7..152e04dfa8 100644 --- a/src/node/docker_mananger.py +++ b/src/node/docker_mananger.py @@ -14,13 +14,13 @@ class DockerManager: def __init__(self, image): self._image = image self._client = docker.from_env() - logger.debug("Docker client initialized with image %s", self._image) + logger.debug(f"Docker client initialized with image {self._image}") def create_network(self, network_name=NETWORK_NAME): - logger.debug("Attempting to create or retrieve network %s", network_name) + logger.debug(f"Attempting to create or retrieve network {network_name}") networks = self._client.networks.list(names=[network_name]) if networks: - logger.debug("Network %s already exists", network_name) + logger.debug(f"Network {network_name} already exists") return networks[0] network = self._client.networks.create( @@ -28,7 +28,7 @@ def create_network(self, network_name=NETWORK_NAME): driver="bridge", ipam=IPAMConfig(driver="default", pool_configs=[IPAMPool(subnet=SUBNET, iprange=IP_RANGE, gateway=GATEWAY)]), ) - logger.debug("Network %s created", network_name) + logger.debug(f"Network {network_name} created") return network def start_container(self, image_name, ports, args, log_path, container_ip): @@ -39,14 +39,14 @@ def start_container(self, image_name, ports, args, log_path, container_ip): else: cli_args.append(f"--{key}={value}") # Add a single command port_bindings = {f"{port}/tcp": ("", port) for port in ports} - logger.debug("Starting container with image %s", image_name) - logger.debug("Using args %s", cli_args) + logger.debug(f"Starting container with image {image_name}") + logger.debug(f"Using args {cli_args}") container = self._client.containers.run(image_name, command=cli_args, ports=port_bindings, detach=True, remove=True, auto_remove=True) network = self._client.networks.get(NETWORK_NAME) network.connect(container, 
ipv4_address=container_ip) - logger.debug("Container started with ID %s. Setting up logs at %s", container.short_id, log_path) + logger.debug(f"Container started with ID {container.short_id}. Setting up logs at {log_path}") log_thread = threading.Thread(target=self._log_container_output, args=(container, log_path)) log_thread.daemon = True log_thread.start() @@ -63,14 +63,14 @@ def generate_ports(self, base_port=None, count=5): if base_port is None: base_port = random.randint(1024, 65535 - count) ports = [base_port + i for i in range(count)] - logger.debug("Generated ports %s", ports) + logger.debug(f"Generated ports {ports}") return ports @staticmethod def generate_random_ext_ip(): base_ip_fragments = ["172", "18"] ext_ip = ".".join(base_ip_fragments + [str(random.randint(0, 255)) for _ in range(2)]) - logger.debug("Generated random external IP %s", ext_ip) + logger.debug(f"Generated random external IP {ext_ip}") return ext_ip def is_container_running(self, container): @@ -78,7 +78,7 @@ def is_container_running(self, container): refreshed_container = self._client.containers.get(container.id) return refreshed_container.status == "running" except NotFound: - logger.error("Container with ID %s not found", container.id) + logger.error(f"Container with ID {container.id} not found") return False @property diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 9fa4ba2849..f701b14a7e 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -17,7 +17,7 @@ def __init__(self, docker_image, docker_log_prefix=""): self._log_path = os.path.join(DOCKER_LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log") self._docker_manager = DockerManager(self._image_name) self._container = None - logger.debug("WakuNode instance initialized with log path %s", self._log_path) + logger.debug(f"WakuNode instance initialized with log path {self._log_path}") @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def start(self, **kwargs): @@ 
-70,47 +70,49 @@ def start(self, **kwargs): self._container = self._docker_manager.start_container(self._docker_manager.image, self._ports, default_args, self._log_path, self._ext_ip) logger.debug( - "Started container from image %s. RPC: %s REST: %s WebSocket: %s", self._image_name, self._rpc_port, self._rest_port, self._websocket_port + f"Started container from image {self._image_name}. RPC: {self._rpc_port} REST: {self._rest_port} WebSocket: {self._websocket_port}" ) DS.waku_nodes.append(self) delay(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly try: self.ensure_ready() - except Exception as e: - logger.error("%s service did not become ready in time: %s", PROTOCOL, e) + except Exception as ex: + logger.error(f"{PROTOCOL} service did not become ready in time: {ex}") raise @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def stop(self): if self._container: - logger.debug("Stopping container with id %s", self._container.short_id) + logger.debug(f"Stopping container with id {self._container.short_id}") self._container.stop() logger.debug("Container stopped.") def restart(self): if self._container: - logger.debug("Restarting container with id %s", self._container.short_id) + logger.debug(f"Restarting container with id {self._container.short_id}") self._container.restart() def pause(self): if self._container: - logger.debug("Pausing container with id %s", self._container.short_id) + logger.debug(f"Pausing container with id {self._container.short_id}") self._container.pause() def unpause(self): if self._container: - logger.debug("Unpause container with id %s", self._container.short_id) + logger.debug(f"Unpause container with id {self._container.short_id}") self._container.unpause() @retry(stop=stop_after_delay(10), wait=wait_fixed(0.1), reraise=True) def ensure_ready(self): self.info() - logger.info("%s service is ready !!", PROTOCOL) + logger.info(f"{PROTOCOL} service is ready !!") def info(self): 
return self._api.info() - def set_subscriptions(self, pubsub_topics=[DEFAULT_PUBSUB_TOPIC]): + def set_subscriptions(self, pubsub_topics=None): + if not pubsub_topics: + pubsub_topics = [DEFAULT_PUBSUB_TOPIC] return self._api.set_subscriptions(pubsub_topics) def send_message(self, message, pubsub_topic=DEFAULT_PUBSUB_TOPIC): @@ -129,7 +131,7 @@ def type(self): elif self.is_gowaku(): return "gowaku" else: - raise Exception("Unknown node type!!!") + raise ValueError("Unknown node type!!!") def is_nwaku(self): return "nwaku" in self.image diff --git a/src/steps/relay.py b/src/steps/relay.py index 84174ce2e2..720232d243 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -32,15 +32,13 @@ def network_warm_up(self, setup_nodes): self.wait_for_published_message_to_reach_peer(120) logger.info("WARM UP successful !!") except Exception as ex: - raise Exception(f"WARM UP FAILED WITH: {ex}") + raise TimeoutError(f"WARM UP FAILED WITH: {ex}") @allure.step def check_published_message_reaches_peer(self, message, pubsub_topic=None, message_propagation_delay=0.1): - if not pubsub_topic: - pubsub_topic = self.test_pubsub_topic - self.node1.send_message(message, pubsub_topic) + self.node1.send_message(message, pubsub_topic or self.test_pubsub_topic) delay(message_propagation_delay) - get_messages_response = self.node2.get_messages(pubsub_topic) + get_messages_response = self.node2.get_messages(pubsub_topic or self.test_pubsub_topic) assert get_messages_response, "Peer node couldn't find any messages" received_message = message_rpc_response_schema.load(get_messages_response[0]) self.assert_received_message(message, received_message) @@ -51,7 +49,7 @@ def assert_fail_message(field_name): assert received_message.payload == sent_message["payload"], assert_fail_message("payload") assert received_message.contentTopic == sent_message["contentTopic"], assert_fail_message("contentTopic") - if "timestamp" in sent_message and sent_message["timestamp"] is not None: + if 
sent_message.get("timestamp") is not None: if isinstance(sent_message["timestamp"], float): assert math.isclose(float(received_message.timestamp), sent_message["timestamp"], rel_tol=1e-9), assert_fail_message("timestamp") else: diff --git a/tests/conftest.py b/tests/conftest.py index 6eedd099ed..b412a29713 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -43,7 +43,7 @@ def test_id(request): @pytest.fixture(scope="function", autouse=True) def test_setup(request, test_id): - logger.debug("Running test: %s with id: %s", request.node.name, request.cls.test_id) + logger.debug(f"Running test: {request.node.name} with id: {request.cls.test_id}") yield for file in glob.glob(os.path.join(env_vars.DOCKER_LOG_DIR, "*")): if os.path.getmtime(file) < time() - 3600: @@ -74,5 +74,5 @@ def close_open_nodes(attach_logs_on_fail): except Exception as ex: if "No such container" in str(ex): crashed_containers.append(node.image) - logger.error("Failed to stop container because of error %s", ex) + logger.error(f"Failed to stop container because of error {ex}") assert not crashed_containers, f"Containers {crashed_containers} crashed during the test!!!" 
diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index ca5eec7040..eaed41c56f 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -11,19 +11,19 @@ class TestRelayPublish(StepsRelay): def test_publish_with_valid_payloads(self): failed_payloads = [] for payload in SAMPLE_INPUTS: - logger.debug("Running test with payload %s", payload["description"]) + logger.debug(f'Running test with payload {payload["description"]}') message = self.create_message(payload=to_base64(payload["value"])) try: self.check_published_message_reaches_peer(message) except Exception as e: - logger.error("Payload %s failed: %s", payload["description"], str(e)) + logger.error(f'Payload {payload["description"]} failed: {str(e)}') failed_payloads.append(payload["description"]) assert not failed_payloads, f"Payloads failed: {failed_payloads}" def test_publish_with_invalid_payloads(self): success_payloads = [] for payload in INVALID_PAYLOADS: - logger.debug("Running test with payload %s", payload["description"]) + logger.debug(f'Running test with payload {payload["description"]}') message = self.create_message(payload=payload["value"]) try: self.node1.send_message(message, self.test_pubsub_topic) @@ -42,14 +42,13 @@ def test_publish_with_missing_payload(self): def test_publish_with_payload_less_than_one_mb(self): payload_length = 1024 * 1023 - logger.debug("Running test with payload length of %s bytes", payload_length) + logger.debug(f"Running test with payload length of {payload_length} bytes") message = self.create_message(payload=to_base64("a" * (payload_length))) self.check_published_message_reaches_peer(message, message_propagation_delay=2) def test_publish_with_payload_equal_or_more_than_one_mb(self): - payload_length = 1024 * 1023 for payload_length in [1024 * 1024, 1024 * 1024 * 10]: - logger.debug("Running test with payload length of %s bytes", payload_length) + logger.debug(f"Running test with payload length of {payload_length} bytes") message 
= self.create_message(payload=to_base64("a" * (payload_length))) try: self.check_published_message_reaches_peer(message, message_propagation_delay=2) @@ -60,19 +59,19 @@ def test_publish_with_payload_equal_or_more_than_one_mb(self): def test_publish_with_valid_content_topics(self): failed_content_topics = [] for content_topic in SAMPLE_INPUTS: - logger.debug("Running test with content topic %s", content_topic["description"]) + logger.debug(f'Running test with content topic {content_topic["description"]}') message = self.create_message(contentTopic=content_topic["value"]) try: self.check_published_message_reaches_peer(message) except Exception as e: - logger.error("ContentTopic %s failed: %s", content_topic["description"], str(e)) + logger.error(f'ContentTopic {content_topic["description"]} failed: {str(e)}') failed_content_topics.append(content_topic) assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}" def test_publish_with_invalid_content_topics(self): success_content_topics = [] for content_topic in INVALID_CONTENT_TOPICS: - logger.debug("Running test with contetn topic %s", content_topic["description"]) + logger.debug(f'Running test with content topic {content_topic["description"]}') message = self.create_message(contentTopic=content_topic["value"]) try: self.node1.send_message(message, self.test_pubsub_topic) @@ -93,11 +92,11 @@ def test_publish_on_multiple_pubsub_topics(self): self.ensure_subscriptions_on_nodes([self.node1, self.node2], VALID_PUBSUB_TOPICS) failed_pubsub_topics = [] for pubsub_topic in VALID_PUBSUB_TOPICS: - logger.debug("Running test with pubsub topic %s", pubsub_topic) + logger.debug(f"Running test with pubsub topic {pubsub_topic}") try: self.check_published_message_reaches_peer(self.create_message(), pubsub_topic=pubsub_topic) except Exception as e: - logger.error("PubusubTopic %s failed: %s", pubsub_topic, str(e)) + logger.error(f"PubusubTopic {pubsub_topic} failed: {str(e)}") 
failed_pubsub_topics.append(pubsub_topic) assert not failed_pubsub_topics, f"PubusubTopic failed: {failed_pubsub_topics}" @@ -119,12 +118,12 @@ def test_publish_with_valid_timestamps(self): failed_timestamps = [] for timestamp in SAMPLE_TIMESTAMPS: if self.node1.type() in timestamp["valid_for"]: - logger.debug("Running test with timestamp %s", timestamp["description"]) + logger.debug(f'Running test with timestamp {timestamp["description"]}') message = self.create_message(timestamp=timestamp["value"]) try: self.check_published_message_reaches_peer(message) except Exception as ex: - logger.error("Timestamp %s failed: %s", timestamp["description"], str(ex)) + logger.error(f'Timestamp {timestamp["description"]} failed: {str(ex)}') failed_timestamps.append(timestamp) assert not failed_timestamps, f"Timestamps failed: {failed_timestamps}" @@ -132,7 +131,7 @@ def test_publish_with_invalid_timestamps(self): success_timestamps = [] for timestamp in SAMPLE_TIMESTAMPS: if self.node1.type() not in timestamp["valid_for"]: - logger.debug("Running test with timestamp %s", timestamp["description"]) + logger.debug(f'Running test with timestamp {timestamp["description"]}') message = self.create_message(timestamp=timestamp["value"]) try: self.check_published_message_reaches_peer(message) @@ -168,11 +167,11 @@ def test_publish_with_invalid_meta(self): def test_publish_with_ephemeral(self): failed_ephemeral = [] for ephemeral in [True, False]: - logger.debug("Running test with Ephemeral %s", ephemeral) + logger.debug(f"Running test with Ephemeral {ephemeral}") try: self.check_published_message_reaches_peer(self.create_message(ephemeral=ephemeral)) except Exception as e: - logger.error("Massage with Ephemeral %s failed: %s", ephemeral, str(e)) + logger.error(f"Message with Ephemeral {ephemeral} failed: {str(e)}") failed_ephemeral.append(ephemeral) assert not failed_ephemeral, f"Ephemeral that failed: {failed_ephemeral}" @@ -196,12 +195,13 @@ def 
test_publish_and_retrieve_duplicate_message(self): except Exception as ex: assert "Peer node couldn't find any messages" in str(ex) - def test_publish_after_node_pauses(self): + def test_publish_after_node_pauses_and_pauses(self): self.check_published_message_reaches_peer(self.create_message()) self.node1.pause() self.node1.unpause() self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M1"))) self.node2.pause() + self.check_published_message_reaches_peer(self.create_message()) self.node2.unpause() self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M2"))) From 57822dc86b3ecff2db69b53f5f95340641b98f54 Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Thu, 16 Nov 2023 16:49:05 +0200 Subject: [PATCH 17/18] revert unwanted change --- tests/relay/test_publish.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index eaed41c56f..34e8aaf18c 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -201,7 +201,6 @@ def test_publish_after_node_pauses_and_pauses(self): self.node1.unpause() self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M1"))) self.node2.pause() - self.check_published_message_reaches_peer(self.create_message()) self.node2.unpause() self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M2"))) From e836666b25a32e8a67d4d0702117b395a05d3bcf Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Thu, 16 Nov 2023 17:27:43 +0200 Subject: [PATCH 18/18] add new pause test --- src/node/api_clients/base_client.py | 2 +- tests/relay/test_publish.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/node/api_clients/base_client.py b/src/node/api_clients/base_client.py index 30725db3d5..be1c291fc7 100644 --- a/src/node/api_clients/base_client.py +++ b/src/node/api_clients/base_client.py @@ -25,7 +25,7 @@ def make_request(self, method, url, headers=None, 
data=None): logger.error(f"An error occurred: {err}. Response content: {response.content}") raise else: - logger.error(f"Response status code: {response.status_code}. Response content: {response.content}") + logger.info(f"Response status code: {response.status_code}. Response content: {response.content}") return response @abstractmethod diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index 34e8aaf18c..3d49d19933 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -3,6 +3,7 @@ from src.libs.common import delay, to_base64 from src.steps.relay import StepsRelay from src.test_data import INVALID_CONTENT_TOPICS, INVALID_PAYLOADS, SAMPLE_INPUTS, SAMPLE_TIMESTAMPS, VALID_PUBSUB_TOPICS +from src.data_classes import message_rpc_response_schema logger = get_custom_logger(__name__) @@ -195,6 +196,16 @@ def test_publish_and_retrieve_duplicate_message(self): except Exception as ex: assert "Peer node couldn't find any messages" in str(ex) + def test_publish_while_peer_is_paused(self): + message = self.create_message() + self.node2.pause() + self.node1.send_message(message, self.test_pubsub_topic) + self.node2.unpause() + get_messages_response = self.node2.get_messages(self.test_pubsub_topic) + assert get_messages_response, "Peer node couldn't find any messages" + received_message = message_rpc_response_schema.load(get_messages_response[0]) + self.assert_received_message(message, received_message) + def test_publish_after_node_pauses_and_pauses(self): self.check_published_message_reaches_peer(self.create_message()) self.node1.pause()