diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index cb93d03157..d6a9c08e7d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -16,11 +16,20 @@ on: required: false type: string default: "wakuorg/go-waku:latest" + protocol: + description: "Protocol used to comunicate inside the network" + required: true + type: choice + default: "REST" + options: + - "REST" + - "RPC" env: FORCE_COLOR: "1" NODE_1: ${{ inputs.node1 || 'wakuorg/nwaku:deploy-wakuv2-test' }} NODE_2: ${{ inputs.node2 || 'wakuorg/go-waku:latest' }} + PROTOCOL: ${{ inputs.protocol || 'REST' }} jobs: diff --git a/README.md b/README.md index 016958b59c..eaf4d05ae9 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # waku-interop-tests -Waku interop testing between various implementation of the [Waku v2 protocol](https://rfc.vac.dev/spec/10/). +Waku e2e and interop framework used to test various implementation of the [Waku v2 protocol](https://rfc.vac.dev/spec/10/). -## Setup +## Setup and contribute ```shell git clone git@github.com:waku-org/waku-interop-tests.git @@ -10,9 +10,16 @@ cd waku-interop-tests python -m venv .venv source .venv/bin/activate pip install -r requirements.txt +pre-commit install +(optional) Overwrite default vars from src/env_vars.py via cli env vars or by adding a .env file pytest ``` +## CI + +- Test runs via github actions +- [Allure Test Reports](https://waku-org.github.io/waku-interop-tests/3/) are published via github pages + ## License Licensed and distributed under either of diff --git a/pytest.ini b/pytest.ini index 3412b31451..313afa0aa3 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,5 +1,5 @@ [pytest] -addopts = --instafail --tb=short --color=auto +addopts = -s --instafail --tb=short --color=auto log_level = DEBUG log_cli = True log_file = log/test.log diff --git a/requirements.txt b/requirements.txt index 5800bd801b..38e3913adb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,13 @@ allure-pytest black docker +marshmallow-dataclass pre-commit pyright pytest pytest-instafail pytest-xdist pytest-rerunfailures +python-dotenv requests tenacity diff --git a/src/data_classes.py b/src/data_classes.py index 0dcff31387..4266a55ba9 100644 --- a/src/data_classes.py +++ b/src/data_classes.py @@ -1,24 +1,24 @@ -from dataclasses import dataclass +from dataclasses import dataclass, field +from marshmallow_dataclass import class_schema from typing import Optional -@dataclass -class KeyPair: - privateKey: str - publicKey: str - - @dataclass class MessageRpcQuery: - payload: str # Hex encoded data string without `0x` prefix. - contentTopic: Optional[str] = None - timestamp: Optional[int] = None # Unix epoch time in nanoseconds as a 64-bit integer value. + payload: str + contentTopic: str + timestamp: Optional[int] = None @dataclass class MessageRpcResponse: payload: str - contentTopic: Optional[str] = None - version: Optional[int] = None - timestamp: Optional[int] = None # Unix epoch time in nanoseconds as a 64-bit integer value. - ephemeral: Optional[bool] = None + contentTopic: str + version: Optional[int] + timestamp: int + ephemeral: Optional[bool] + rateLimitProof: Optional[dict] = field(default_factory=dict) + rate_limit_proof: Optional[dict] = field(default_factory=dict) + + +message_rpc_response_schema = class_schema(MessageRpcResponse)() diff --git a/src/env_vars.py b/src/env_vars.py index e226705227..396697e5af 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -1,17 +1,20 @@ import os -import logging +from dotenv import load_dotenv -logger = logging.getLogger(__name__) +load_dotenv() # This will load environment variables from a .env file if it exists -def get_env_var(var_name, default): +def get_env_var(var_name, default=None): env_var = os.getenv(var_name, default) - logger.debug(f"{var_name}: {env_var}") + if env_var is not None: + print(f"{var_name}: {env_var}") + else: + print(f"{var_name} is not set; using default value: {default}") return env_var # Configuration constants. Need to be upercase to appear in reports -NODE_1 = get_env_var("NODE_1", "wakuorg/nwaku:deploy-wakuv2-test") +NODE_1 = get_env_var("NODE_1", "wakuorg/nwaku:latest") NODE_2 = get_env_var("NODE_2", "wakuorg/go-waku:latest") LOG_DIR = get_env_var("LOG_DIR", "./log") NETWORK_NAME = get_env_var("NETWORK_NAME", "waku") @@ -19,3 +22,4 @@ def get_env_var(var_name, default): IP_RANGE = get_env_var("IP_RANGE", "172.18.0.0/24") GATEWAY = get_env_var("GATEWAY", "172.18.0.1") DEFAULT_PUBSUBTOPIC = get_env_var("DEFAULT_PUBSUBTOPIC", "/waku/2/default-waku/proto") +PROTOCOL = get_env_var("PROTOCOL", "REST") diff --git a/src/node/api_clients/__init__.py b/src/node/api_clients/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/node/api_clients/base_client.py b/src/node/api_clients/base_client.py new file mode 100644 index 0000000000..6be538f815 --- /dev/null +++ b/src/node/api_clients/base_client.py @@ -0,0 +1,44 @@ +import logging +import requests +from tenacity import retry, stop_after_delay, wait_fixed +from abc import ABC, abstractmethod + +logger = logging.getLogger(__name__) + + +class BaseClient(ABC): + # The retry decorator is applied to handle transient errors gracefully. This is particularly + # useful when running tests in parallel, where occasional network-related errors such as + # connection drops, timeouts, or temporary unavailability of a service can occur. Retrying + # ensures that such intermittent issues don't cause the tests to fail outright. + @retry(stop=stop_after_delay(2), wait=wait_fixed(0.1), reraise=True) + def make_request(self, method, url, headers=None, data=None): + logger.debug("%s call: %s with payload: %s", method.upper(), url, data) + response = requests.request(method.upper(), url, headers=headers, data=data) + try: + response.raise_for_status() + except requests.HTTPError as http_err: + logger.error("HTTP error occurred: %s", http_err) + raise + except Exception as err: + logger.error("An error occurred: %s", err) + raise + else: + logger.info("Response status code: %s", response.status_code) + return response + + @abstractmethod + def info(self): + pass + + @abstractmethod + def set_subscriptions(self, pubsub_topics): + pass + + @abstractmethod + def send_message(self, message, pubsub_topic): + pass + + @abstractmethod + def get_messages(self, pubsub_topic): + pass diff --git a/src/node/api_clients/rest.py b/src/node/api_clients/rest.py new file mode 100644 index 0000000000..47ecbf79c9 --- /dev/null +++ b/src/node/api_clients/rest.py @@ -0,0 +1,31 @@ +import logging +import json +from dataclasses import asdict +from urllib.parse import quote +from src.node.api_clients.base_client import BaseClient + +logger = logging.getLogger(__name__) + + +class REST(BaseClient): + def __init__(self, rest_port): + self._rest_port = rest_port + + def rest_call(self, method, endpoint, payload=None): + url = f"http://127.0.0.1:{self._rest_port}/{endpoint}" + headers = {"Content-Type": "application/json"} + return self.make_request(method, url, headers=headers, data=payload) + + def info(self): + info_response = self.rest_call("get", "debug/v1/info") + return info_response.json() + + def set_subscriptions(self, pubsub_topics): + return self.rest_call("post", "relay/v1/subscriptions", json.dumps(pubsub_topics)) + + def send_message(self, message, pubsub_topic): + return self.rest_call("post", f"relay/v1/messages/{quote(pubsub_topic, safe='')}", json.dumps(asdict(message))) + + def get_messages(self, pubsub_topic): + get_messages_response = self.rest_call("get", f"relay/v1/messages/{quote(pubsub_topic, safe='')}") + return get_messages_response.json() diff --git a/src/node/api_clients/rpc.py b/src/node/api_clients/rpc.py new file mode 100644 index 0000000000..2ed32c5e94 --- /dev/null +++ b/src/node/api_clients/rpc.py @@ -0,0 +1,35 @@ +import logging +import json +from dataclasses import asdict +from src.node.api_clients.base_client import BaseClient + +logger = logging.getLogger(__name__) + + +class RPC(BaseClient): + def __init__(self, rpc_port, image_name): + self._image_name = image_name + self._rpc_port = rpc_port + + def rpc_call(self, endpoint, params=[]): + url = f"http://127.0.0.1:{self._rpc_port}" + headers = {"Content-Type": "application/json"} + payload = {"jsonrpc": "2.0", "method": endpoint, "params": params, "id": 1} + return self.make_request("post", url, headers=headers, data=json.dumps(payload)) + + def info(self): + info_response = self.rpc_call("get_waku_v2_debug_v1_info", []) + return info_response.json()["result"] + + def set_subscriptions(self, pubsub_topics): + if "nwaku" in self._image_name: + return self.rpc_call("post_waku_v2_relay_v1_subscriptions", [pubsub_topics]) + else: + return self.rpc_call("post_waku_v2_relay_v1_subscription", [pubsub_topics]) + + def send_message(self, message, pubsub_topic): + return self.rpc_call("post_waku_v2_relay_v1_message", [pubsub_topic, asdict(message)]) + + def get_messages(self, pubsub_topic): + get_messages_response = self.rpc_call("get_waku_v2_relay_v1_messages", [pubsub_topic]) + return get_messages_response.json()["result"] diff --git a/src/node/docker_mananger.py b/src/node/docker_mananger.py index d0bc36477c..4e76bf300b 100644 --- a/src/node/docker_mananger.py +++ b/src/node/docker_mananger.py @@ -14,13 +14,13 @@ class DockerManager: def __init__(self, image): self._image = image self._client = docker.from_env() - logger.debug(f"Docker client initialized with image {self._image}") + logger.debug("Docker client initialized with image %s", self._image) def create_network(self, network_name=NETWORK_NAME): - logger.debug(f"Attempting to create or retrieve network '{network_name}'.") + logger.debug("Attempting to create or retrieve network %s", network_name) networks = self._client.networks.list(names=[network_name]) if networks: - logger.debug(f"Network '{network_name}' already exists.") + logger.debug("Network %s already exists", network_name) return networks[0] network = self._client.networks.create( @@ -28,20 +28,25 @@ def create_network(self, network_name=NETWORK_NAME): driver="bridge", ipam=IPAMConfig(driver="default", pool_configs=[IPAMPool(subnet=SUBNET, iprange=IP_RANGE, gateway=GATEWAY)]), ) - logger.debug(f"Network '{network_name}' created.") + logger.debug("Network %s created", network_name) return network def start_container(self, image_name, ports, args, log_path, container_ip): - command = [f"--{key}={value}" for key, value in args.items()] + cli_args = [] + for key, value in args.items(): + if isinstance(value, list): # Check if value is a list + cli_args.extend([f"--{key}={item}" for item in value]) # Add a command for each item in the list + else: + cli_args.append(f"--{key}={value}") # Add a single command port_bindings = {f"{port}/tcp": ("", port) for port in ports} - logger.debug(f"Starting container with image '{image_name}'.") - - container = self._client.containers.run(image_name, command=command, ports=port_bindings, detach=True, auto_remove=True) + logger.debug("Starting container with image %s", image_name) + logger.debug("Using args %s", cli_args) + container = self._client.containers.run(image_name, command=cli_args, ports=port_bindings, detach=True, remove=True, auto_remove=True) network = self._client.networks.get(NETWORK_NAME) network.connect(container, ipv4_address=container_ip) - logger.debug(f"Container started with ID {container.short_id}. Setting up logs at '{log_path}'.") + logger.debug("Container started with ID %s. Setting up logs at %s", container.short_id, log_path) log_thread = threading.Thread(target=self._log_container_output, args=(container, log_path)) log_thread.daemon = True log_thread.start() @@ -54,18 +59,18 @@ def _log_container_output(self, container, log_path): for chunk in container.logs(stream=True): log_file.write(chunk) - def generate_ports(self, base_port=None, count=4): + def generate_ports(self, base_port=None, count=5): if base_port is None: base_port = random.randint(1024, 65535 - count) ports = [base_port + i for i in range(count)] - logger.debug(f"Generated ports: {ports}") + logger.debug("Generated ports %s", ports) return ports @staticmethod def generate_random_ext_ip(): base_ip_fragments = ["172", "18"] ext_ip = ".".join(base_ip_fragments + [str(random.randint(0, 255)) for _ in range(2)]) - logger.debug(f"Generated random external IP: {ext_ip}") + logger.debug("Generated random external IP %s", ext_ip) return ext_ip def is_container_running(self, container): @@ -73,7 +78,7 @@ def is_container_running(self, container): refreshed_container = self._client.containers.get(container.id) return refreshed_container.status == "running" except NotFound: - logger.error(f"Container with ID {container.id} not found.") + logger.error("Container with ID %s not found", container.id) return False @property diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 7e5391bacd..13fb3dd837 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -1,15 +1,11 @@ import os import logging -from time import time -import requests -import json from tenacity import retry, stop_after_delay, wait_fixed -from dataclasses import asdict +from src.node.api_clients.rpc import RPC +from src.node.api_clients.rest import REST from src.node.docker_mananger import DockerManager -from src.env_vars import LOG_DIR, DEFAULT_PUBSUBTOPIC +from src.env_vars import LOG_DIR, DEFAULT_PUBSUBTOPIC, PROTOCOL from src.data_storage import DS -from src.libs.common import bytes_to_hex - logger = logging.getLogger(__name__) @@ -21,29 +17,40 @@ def __init__(self, docker_image, docker_log_prefix=""): self._docker_manager = DockerManager(self._image_name) self._container = None self._ext_ip = self._docker_manager.generate_random_ext_ip() - logger.debug(f"WakuNode instance initialized with log path: {self._log_path}") + self._ports = self._docker_manager.generate_ports() + self._rest_port = self._ports[0] + self._rpc_port = self._ports[1] + self._websocket_port = self._ports[2] + logger.debug("WakuNode instance initialized with log path %s", self._log_path) + if PROTOCOL == "RPC": + self._api = RPC(self._rpc_port, self._image_name) + elif PROTOCOL == "REST": + self._api = REST(self._rest_port) + else: + raise ValueError(f"Unknown protocol: {PROTOCOL}") @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def start(self, **kwargs): logger.debug("Starting Node...") self._docker_manager.create_network() - ports = self._docker_manager.generate_ports() - self._rpc_port = ports[0] - self._websocket_port = ports[2] default_args = { "listen-address": "0.0.0.0", "rpc": "true", "rpc-admin": "true", + "rest": "true", + "rest-admin": "true", "websocket-support": "true", "log-level": "TRACE", - "websocket-port": str(ports[2]), - "rpc-port": str(ports[0]), - "tcp-port": str(ports[1]), - "discv5-udp-port": str(ports[3]), + "websocket-port": str(self._ports[3]), + "rpc-port": self._rpc_port, + "rest-port": self._rest_port, + "tcp-port": str(self._ports[2]), + "discv5-udp-port": str(self._ports[4]), "rpc-address": "0.0.0.0", - "topic": DEFAULT_PUBSUBTOPIC, + "rest-address": "0.0.0.0", "nat": f"extip:{self._ext_ip}", + "pubsub-topic": DEFAULT_PUBSUBTOPIC, } if "go-waku" in self._docker_manager.image: @@ -58,14 +65,15 @@ def start(self, **kwargs): key = key.replace("_", "-") default_args[key] = value - logger.debug(f"Starting container with args: {default_args}") - self._container = self._docker_manager.start_container(self._docker_manager.image, ports, default_args, self._log_path, self._ext_ip) - logger.debug(f"Started container from image {self._image_name}. RPC port: {self._rpc_port} and WebSocket port: {self._websocket_port}") + self._container = self._docker_manager.start_container(self._docker_manager.image, self._ports, default_args, self._log_path, self._ext_ip) + logger.debug( + "Started container from image %s. RPC: %s REST: %s WebSocket: %s", self._image_name, self._rpc_port, self._rest_port, self._websocket_port + ) DS.waku_nodes.append(self) try: - self.ensure_rpc_ready() + self.ensure_ready() except Exception as e: - logger.error(f"RPC service did not become ready in time: {e}") + logger.error("%s service did not become ready in time: %s", PROTOCOL, e) raise @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) @@ -75,82 +83,19 @@ def stop(self): self._container.stop() logger.debug("Container stopped.") - @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) - def rpc_call(self, method, params=[]): - url = f"http://127.0.0.1:{self._rpc_port}" - headers = {"Content-Type": "application/json"} - payload = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1} - logger.debug("RPC call payload %s", payload) - response = requests.post(url, data=json.dumps(payload), headers=headers) - return response.json() - @retry(stop=stop_after_delay(5), wait=wait_fixed(0.05), reraise=True) - def ensure_rpc_ready(self): + def ensure_ready(self): self.info() logger.debug("RPC service is ready.") def info(self): - return self.rpc_call("get_waku_v2_debug_v1_info", []) + return self._api.info() def set_subscriptions(self, pubsub_topics=[DEFAULT_PUBSUBTOPIC]): - return self.rpc_call("post_waku_v2_relay_v1_subscriptions", [pubsub_topics]) + return self._api.set_subscriptions(pubsub_topics) def send_message(self, message, pubsub_topic=DEFAULT_PUBSUBTOPIC): - if message.timestamp is None: - message.timestamp = int(time() * 1e9) - return self.rpc_call("post_waku_v2_relay_v1_message", [pubsub_topic, asdict(message)]) + return self._api.send_message(message, pubsub_topic) def get_messages(self, pubsub_topic=DEFAULT_PUBSUBTOPIC): - return self.rpc_call("get_waku_v2_relay_v1_messages", [pubsub_topic]) - - def get_asymmetric_key_pair(self): - response = self.rpc_call("get_waku_v2_private_v1_asymmetric_keypair", []) - seckey = response.get("seckey") - pubkey = response.get("pubkey") - private_key = response.get("privateKey") - public_key = response.get("publicKey") - if seckey: - return {"privateKey": seckey, "publicKey": pubkey} - else: - return {"privateKey": private_key, "publicKey": public_key} - - def post_asymmetric_message(self, message, public_key, pubsub_topic=None): - if not message.payload: - raise Exception("Attempting to send an empty message") - return self.rpc_call( - "post_waku_v2_private_v1_asymmetric_message", - [pubsub_topic or DEFAULT_PUBSUBTOPIC, message, "0x" + bytes_to_hex(public_key)], - ) - - def get_asymmetric_messages(self, private_key, pubsub_topic=None): - return self.rpc_call( - "get_waku_v2_private_v1_asymmetric_messages", - [pubsub_topic or DEFAULT_PUBSUBTOPIC, "0x" + bytes_to_hex(private_key)], - ) - - def get_symmetric_key(self): - return bytes.fromhex(self.rpc_call("get_waku_v2_private_v1_symmetric_key", [])) - - def post_symmetric_message(self, message, sym_key, pubsub_topic=None): - if not message.payload: - raise Exception("Attempting to send an empty message") - return self.rpc_call( - "post_waku_v2_private_v1_symmetric_message", - [pubsub_topic or DEFAULT_PUBSUBTOPIC, message, "0x" + bytes_to_hex(sym_key)], - ) - - def get_symmetric_messages(self, sym_key, pubsub_topic=None): - return self.rpc_call( - "get_waku_v2_private_v1_symmetric_messages", - [pubsub_topic or DEFAULT_PUBSUBTOPIC, "0x" + bytes_to_hex(sym_key)], - ) - - def get_peer_id(self): - # not implemented - peer_id = "" - return peer_id - - def get_multiaddr_with_id(self): - peer_id = self.get_peer_id() - multiaddr_with_id = f"/ip4/127.0.0.1/tcp/{self._websocket_port}/ws/p2p/{peer_id}" - return multiaddr_with_id + return self._api.get_messages(pubsub_topic) diff --git a/src/steps/relay.py b/src/steps/relay.py index fbb0150cd1..6ba381c596 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -1,7 +1,8 @@ import logging -from time import sleep +from time import sleep, time import pytest import allure +from src.data_classes import message_rpc_response_schema from src.env_vars import NODE_1, NODE_2 from src.node.waku_node import WakuNode from tenacity import retry, stop_after_delay, wait_fixed @@ -14,21 +15,24 @@ class StepsRelay: def setup_nodes(self, request): self.node1 = WakuNode(NODE_1, request.cls.test_id) self.node1.start(relay="true", discv5_discovery="true", peer_exchange="true") - enr_uri = self.node1.info()["result"]["enrUri"] + enr_uri = self.node1.info()["enrUri"] self.node2 = WakuNode(NODE_2, request.cls.test_id) self.node2.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=enr_uri, peer_exchange="true") - self.node1.set_subscriptions() - self.node2.set_subscriptions() - self.default_content_topic = "/test/1/waku-relay" - self.default_payload = "Relay works!!" + self.test_pubsub_topic = "test" + self.test_content_topic = "/test/1/waku-relay" + self.test_payload = "Relay works!!" + self.node1.set_subscriptions([self.test_pubsub_topic]) + self.node2.set_subscriptions([self.test_pubsub_topic]) @allure.step - @retry(stop=stop_after_delay(2), wait=wait_fixed(0.2), reraise=True) + @retry(stop=stop_after_delay(20), wait=wait_fixed(0.5), reraise=True) def check_published_message_reaches_peer(self, message): - self.node1.send_message(message) + message.timestamp = int(time() * 1e9) + self.node1.send_message(message, self.test_pubsub_topic) sleep(0.1) - get_messages_response = self.node2.get_messages() + get_messages_response = self.node2.get_messages(self.test_pubsub_topic) logger.debug("Got reponse from remote peer %s", get_messages_response) - assert get_messages_response["result"][0]["payload"] == message.payload - assert get_messages_response["result"][0]["contentTopic"] == message.contentTopic - assert get_messages_response["result"][0]["timestamp"] == message.timestamp + received_message = message_rpc_response_schema.load(get_messages_response[0]) + assert received_message.payload == message.payload + assert received_message.contentTopic == message.contentTopic + assert received_message.timestamp == message.timestamp diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index 47d7343d5f..0e88560e82 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -1,5 +1,4 @@ import logging -from time import sleep from src.libs.common import to_base64 from src.data_classes import MessageRpcQuery @@ -14,11 +13,11 @@ def test_publish_with_various_payloads(self): failed_payloads = [] for payload in SAMPLE_INPUTS: logger.debug("Running test with payload %s", payload["description"]) - message = MessageRpcQuery(payload=to_base64(payload["value"]), contentTopic=self.default_content_topic) + message = MessageRpcQuery(payload=to_base64(payload["value"]), contentTopic=self.test_content_topic) try: self.check_published_message_reaches_peer(message) except Exception as e: - logger.error(f"Payload '{payload['description']}' failed: {str(e)}") + logger.error("Payload %s failed: %s", {payload["description"]}, {str(e)}) failed_payloads.append(payload) assert not failed_payloads, f"Payloads failed: {failed_payloads}" @@ -26,14 +25,10 @@ def test_publish_with_various_content_topics(self): failed_content_topics = [] for content_topic in SAMPLE_INPUTS: logger.debug("Running test with content topic %s", content_topic["description"]) - message = MessageRpcQuery(payload=to_base64(self.default_payload), contentTopic=content_topic["value"]) + message = MessageRpcQuery(payload=to_base64(self.test_payload), contentTopic=content_topic["value"]) try: self.check_published_message_reaches_peer(message) except Exception as e: - logger.error(f"ContentTopic '{content_topic['description']}' failed: {str(e)}") + logger.error("ContentTopic %s failed: %s", {content_topic["description"]}, {str(e)}) failed_content_topics.append(content_topic) assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}" - - def test_fail_for_report_puposes(self): - message = MessageRpcQuery(payload="", contentTopic="") - self.check_published_message_reaches_peer(message)