Skip to content

Commit

Permalink
fixes based on Alex suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
fbarbu15 committed Nov 16, 2023
1 parent ca94c2b commit 2e58b2d
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 53 deletions.
1 change: 1 addition & 0 deletions src/env_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ def get_env_var(var_name, default=None):
PROTOCOL = get_env_var("PROTOCOL", "REST")
RUNNING_IN_CI = get_env_var("CI")
NODEKEY = get_env_var("NODEKEY", "30348dd51465150e04a5d9d932c72864c8967f806cce60b5d26afeca1e77eb68")
# Environment variables always arrive as strings; cast explicitly so the value
# is still a number (usable as a `requests` timeout) when API_REQUEST_TIMEOUT
# is overridden from the environment, not only when the int default applies.
API_REQUEST_TIMEOUT = int(get_env_var("API_REQUEST_TIMEOUT", 10))
4 changes: 2 additions & 2 deletions src/libs/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ def to_base64(input_data):


def attach_allure_file(file):
    """Attach the given file to the current Allure report as a TEXT attachment.

    :param file: path of the file to attach; its basename becomes the
        attachment name in the report.
    """
    logger.debug(f"Attaching file {file}")
    allure.attach.file(file, name=os.path.basename(file), attachment_type=allure.attachment_type.TEXT)


def delay(num_seconds):
    """Sleep for ``num_seconds`` seconds, logging the pause for test traceability.

    :param num_seconds: how long to sleep (int or float seconds).
    """
    logger.debug(f"Sleeping for {num_seconds} seconds")
    sleep(num_seconds)
11 changes: 6 additions & 5 deletions src/node/api_clients/base_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import requests
from tenacity import retry, stop_after_delay, wait_fixed
from abc import ABC, abstractmethod
from src.env_vars import API_REQUEST_TIMEOUT
from src.libs.custom_logger import get_custom_logger

logger = get_custom_logger(__name__)
Expand All @@ -13,18 +14,18 @@ class BaseClient(ABC):
# ensures that such intermittent issues don't cause the tests to fail outright.
# Retries shortly after a transient failure so intermittent network issues
# don't cause the tests to fail outright.
@retry(stop=stop_after_delay(0.5), wait=wait_fixed(0.1), reraise=True)
def make_request(self, method, url, headers=None, data=None):
    """Perform an HTTP request and return the ``requests`` response object.

    :param method: HTTP verb (case-insensitive, e.g. "get", "POST").
    :param url: full request URL.
    :param headers: optional dict of request headers.
    :param data: optional request body.
    :raises requests.HTTPError: re-raised after logging when the response
        status indicates an error (via ``raise_for_status``).
    """
    logger.debug(f"{method.upper()} call: {url} with payload: {data}")
    response = requests.request(method.upper(), url, headers=headers, data=data, timeout=API_REQUEST_TIMEOUT)
    try:
        response.raise_for_status()
    except requests.HTTPError as http_err:
        logger.error(f"HTTP error occurred: {http_err}. Response content: {response.content}")
        raise
    except Exception as err:
        logger.error(f"An error occurred: {err}. Response content: {response.content}")
        raise
    else:
        # Success path: log at info level. (The commit changed this line to
        # logger.error, which mislabels every successful call as an error;
        # the pre-commit code used logger.info.)
        logger.info(f"Response status code: {response.status_code}. Response content: {response.content}")
    return response

@abstractmethod
Expand Down
20 changes: 10 additions & 10 deletions src/node/docker_mananger.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ class DockerManager:
def __init__(self, image):
    """Create a manager bound to ``image`` using the local Docker daemon.

    :param image: Docker image name this manager will run containers from.
    """
    self._image = image
    self._client = docker.from_env()
    logger.debug(f"Docker client initialized with image {self._image}")

def create_network(self, network_name=NETWORK_NAME):
    """Return the Docker network named ``network_name``, creating it if absent.

    A newly created network is a bridge network using the module-level
    SUBNET / IP_RANGE / GATEWAY IPAM configuration.

    :param network_name: name of the network to fetch or create.
    :return: the existing or newly created docker Network object.
    """
    logger.debug(f"Attempting to create or retrieve network {network_name}")
    networks = self._client.networks.list(names=[network_name])
    if networks:
        # Reuse the existing network rather than failing on re-creation.
        logger.debug(f"Network {network_name} already exists")
        return networks[0]

    network = self._client.networks.create(
        network_name,
        driver="bridge",
        ipam=IPAMConfig(driver="default", pool_configs=[IPAMPool(subnet=SUBNET, iprange=IP_RANGE, gateway=GATEWAY)]),
    )
    logger.debug(f"Network {network_name} created")
    return network

def start_container(self, image_name, ports, args, log_path, container_ip):
Expand All @@ -39,14 +39,14 @@ def start_container(self, image_name, ports, args, log_path, container_ip):
else:
cli_args.append(f"--{key}={value}") # Add a single command
port_bindings = {f"{port}/tcp": ("", port) for port in ports}
logger.debug("Starting container with image %s", image_name)
logger.debug("Using args %s", cli_args)
logger.debug(f"Starting container with image {image_name}")
logger.debug(f"Using args {cli_args}")
container = self._client.containers.run(image_name, command=cli_args, ports=port_bindings, detach=True, remove=True, auto_remove=True)

network = self._client.networks.get(NETWORK_NAME)
network.connect(container, ipv4_address=container_ip)

logger.debug("Container started with ID %s. Setting up logs at %s", container.short_id, log_path)
logger.debug(f"Container started with ID {container.short_id}. Setting up logs at {log_path}")
log_thread = threading.Thread(target=self._log_container_output, args=(container, log_path))
log_thread.daemon = True
log_thread.start()
Expand All @@ -63,22 +63,22 @@ def generate_ports(self, base_port=None, count=5):
if base_port is None:
base_port = random.randint(1024, 65535 - count)
ports = [base_port + i for i in range(count)]
logger.debug("Generated ports %s", ports)
logger.debug(f"Generated ports {ports}")
return ports

@staticmethod
def generate_random_ext_ip():
    """Return a random external IPv4 address in the 172.18.x.x range.

    The first two octets are fixed ("172", "18"); the last two are random.
    """
    base_ip_fragments = ["172", "18"]
    ext_ip = ".".join(base_ip_fragments + [str(random.randint(0, 255)) for _ in range(2)])
    logger.debug(f"Generated random external IP {ext_ip}")
    return ext_ip

def is_container_running(self, container):
    """Return True if ``container`` is currently running, else False.

    The container state is re-fetched from the daemon so the answer reflects
    the live status, not a cached object. A container that no longer exists
    (docker NotFound) is reported as not running.

    :param container: docker Container object to check.
    """
    try:
        refreshed_container = self._client.containers.get(container.id)
        return refreshed_container.status == "running"
    except NotFound:
        logger.error(f"Container with ID {container.id} not found")
        return False

@property
Expand Down
24 changes: 13 additions & 11 deletions src/node/waku_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def __init__(self, docker_image, docker_log_prefix=""):
self._log_path = os.path.join(DOCKER_LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log")
self._docker_manager = DockerManager(self._image_name)
self._container = None
logger.debug("WakuNode instance initialized with log path %s", self._log_path)
logger.debug(f"WakuNode instance initialized with log path {self._log_path}")

@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def start(self, **kwargs):
Expand Down Expand Up @@ -70,47 +70,49 @@ def start(self, **kwargs):

self._container = self._docker_manager.start_container(self._docker_manager.image, self._ports, default_args, self._log_path, self._ext_ip)
logger.debug(
"Started container from image %s. RPC: %s REST: %s WebSocket: %s", self._image_name, self._rpc_port, self._rest_port, self._websocket_port
f"Started container from image {self._image_name}. RPC: {self._rpc_port} REST: {self._rest_port} WebSocket: {self._websocket_port}"
)
DS.waku_nodes.append(self)
delay(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly
try:
self.ensure_ready()
except Exception as e:
logger.error("%s service did not become ready in time: %s", PROTOCOL, e)
except Exception as ex:
logger.error(f"{PROTOCOL} service did not become ready in time: {ex}")
raise

@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def stop(self):
    """Stop this node's container, if one was started; retried for up to 5s."""
    if self._container:
        logger.debug(f"Stopping container with id {self._container.short_id}")
        self._container.stop()
        logger.debug("Container stopped.")

def restart(self):
    """Restart this node's container, if one was started."""
    if self._container:
        logger.debug(f"Restarting container with id {self._container.short_id}")
        self._container.restart()

def pause(self):
    """Pause this node's container, if one was started."""
    if self._container:
        logger.debug(f"Pausing container with id {self._container.short_id}")
        self._container.pause()

def unpause(self):
    """Unpause this node's container, if one was started."""
    if self._container:
        logger.debug(f"Unpause container with id {self._container.short_id}")
        self._container.unpause()

@retry(stop=stop_after_delay(10), wait=wait_fixed(0.1), reraise=True)
def ensure_ready(self):
    """Poll the node's info endpoint (retrying up to 10s) until it responds.

    :raises Exception: re-raised (reraise=True) if the service never answers
        within the retry window.
    """
    self.info()
    logger.info(f"{PROTOCOL} service is ready !!")

def info(self):
    """Return the node's info as reported by its API client."""
    return self._api.info()

def set_subscriptions(self, pubsub_topics=[DEFAULT_PUBSUB_TOPIC]):
def set_subscriptions(self, pubsub_topics=None):
if not pubsub_topics:
pubsub_topics = [DEFAULT_PUBSUB_TOPIC]
return self._api.set_subscriptions(pubsub_topics)

def send_message(self, message, pubsub_topic=DEFAULT_PUBSUB_TOPIC):
Expand All @@ -129,7 +131,7 @@ def type(self):
elif self.is_gowaku():
return "gowaku"
else:
raise Exception("Unknown node type!!!")
raise ValueError("Unknown node type!!!")

def is_nwaku(self):
return "nwaku" in self.image
Expand Down
10 changes: 4 additions & 6 deletions src/steps/relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@ def network_warm_up(self, setup_nodes):
self.wait_for_published_message_to_reach_peer(120)
logger.info("WARM UP successful !!")
except Exception as ex:
raise Exception(f"WARM UP FAILED WITH: {ex}")
raise TimeoutError(f"WARM UP FAILED WITH: {ex}")

@allure.step
def check_published_message_reaches_peer(self, message, pubsub_topic=None, message_propagation_delay=0.1):
    """Publish ``message`` from node1 and assert node2 receives the same message.

    :param message: message dict to publish.
    :param pubsub_topic: topic to publish/fetch on; defaults to
        ``self.test_pubsub_topic`` when falsy.
    :param message_propagation_delay: seconds to wait before fetching.
    :raises AssertionError: if the peer has no messages or fields differ.
    """
    # Resolve the topic once so sending and fetching are guaranteed to agree.
    topic = pubsub_topic or self.test_pubsub_topic
    self.node1.send_message(message, topic)
    delay(message_propagation_delay)
    get_messages_response = self.node2.get_messages(topic)
    assert get_messages_response, "Peer node couldn't find any messages"
    received_message = message_rpc_response_schema.load(get_messages_response[0])
    self.assert_received_message(message, received_message)
Expand All @@ -51,7 +49,7 @@ def assert_fail_message(field_name):

assert received_message.payload == sent_message["payload"], assert_fail_message("payload")
assert received_message.contentTopic == sent_message["contentTopic"], assert_fail_message("contentTopic")
if "timestamp" in sent_message and sent_message["timestamp"] is not None:
if sent_message.get("timestamp") is not None:
if isinstance(sent_message["timestamp"], float):
assert math.isclose(float(received_message.timestamp), sent_message["timestamp"], rel_tol=1e-9), assert_fail_message("timestamp")
else:
Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def test_id(request):

@pytest.fixture(scope="function", autouse=True)
def test_setup(request, test_id):
logger.debug("Running test: %s with id: %s", request.node.name, request.cls.test_id)
logger.debug(f"Running test: {request.node.name} with id: {request.cls.test_id}")
yield
for file in glob.glob(os.path.join(env_vars.DOCKER_LOG_DIR, "*")):
if os.path.getmtime(file) < time() - 3600:
Expand Down Expand Up @@ -74,5 +74,5 @@ def close_open_nodes(attach_logs_on_fail):
except Exception as ex:
if "No such container" in str(ex):
crashed_containers.append(node.image)
logger.error("Failed to stop container because of error %s", ex)
logger.error(f"Failed to stop container because of error {ex}")
assert not crashed_containers, f"Containers {crashed_containers} crashed during the test!!!"
34 changes: 17 additions & 17 deletions tests/relay/test_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@ class TestRelayPublish(StepsRelay):
def test_publish_with_valid_payloads(self):
    """Publish every SAMPLE_INPUTS payload; collect failures instead of
    stopping at the first so one assertion reports them all."""
    failed_payloads = []
    for payload in SAMPLE_INPUTS:
        logger.debug(f'Running test with payload {payload["description"]}')
        message = self.create_message(payload=to_base64(payload["value"]))
        try:
            self.check_published_message_reaches_peer(message)
        except Exception as e:
            logger.error(f'Payload {payload["description"]} failed: {str(e)}')
            failed_payloads.append(payload["description"])
    assert not failed_payloads, f"Payloads failed: {failed_payloads}"

def test_publish_with_invalid_payloads(self):
success_payloads = []
for payload in INVALID_PAYLOADS:
logger.debug("Running test with payload %s", payload["description"])
logger.debug(f'Running test with payload {payload["description"]}')
message = self.create_message(payload=payload["value"])
try:
self.node1.send_message(message, self.test_pubsub_topic)
Expand All @@ -42,14 +42,13 @@ def test_publish_with_missing_payload(self):

def test_publish_with_payload_less_than_one_mb(self):
    """Publish a payload just under 1 MB (1024 * 1023 bytes before base64).

    A longer propagation delay is allowed because of the payload size.
    """
    payload_length = 1024 * 1023
    logger.debug(f"Running test with payload length of {payload_length} bytes")
    message = self.create_message(payload=to_base64("a" * payload_length))
    self.check_published_message_reaches_peer(message, message_propagation_delay=2)

def test_publish_with_payload_equal_or_more_than_one_mb(self):
payload_length = 1024 * 1023
for payload_length in [1024 * 1024, 1024 * 1024 * 10]:
logger.debug("Running test with payload length of %s bytes", payload_length)
logger.debug(f"Running test with payload length of {payload_length} bytes")
message = self.create_message(payload=to_base64("a" * (payload_length)))
try:
self.check_published_message_reaches_peer(message, message_propagation_delay=2)
Expand All @@ -60,19 +59,19 @@ def test_publish_with_payload_equal_or_more_than_one_mb(self):
def test_publish_with_valid_content_topics(self):
    """Publish with every SAMPLE_INPUTS content topic, collecting failures
    so all failing topics are reported in one assertion."""
    failed_content_topics = []
    for content_topic in SAMPLE_INPUTS:
        logger.debug(f'Running test with content topic {content_topic["description"]}')
        message = self.create_message(contentTopic=content_topic["value"])
        try:
            self.check_published_message_reaches_peer(message)
        except Exception as e:
            logger.error(f'ContentTopic {content_topic["description"]} failed: {str(e)}')
            failed_content_topics.append(content_topic)
    assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}"

def test_publish_with_invalid_content_topics(self):
success_content_topics = []
for content_topic in INVALID_CONTENT_TOPICS:
logger.debug("Running test with contetn topic %s", content_topic["description"])
logger.debug(f'Running test with contetn topic {content_topic["description"]}')
message = self.create_message(contentTopic=content_topic["value"])
try:
self.node1.send_message(message, self.test_pubsub_topic)
def test_publish_on_multiple_pubsub_topics(self):
    """Subscribe both nodes to VALID_PUBSUB_TOPICS, then publish on each
    topic, collecting failures so all failing topics are reported at once."""
    self.ensure_subscriptions_on_nodes([self.node1, self.node2], VALID_PUBSUB_TOPICS)
    failed_pubsub_topics = []
    for pubsub_topic in VALID_PUBSUB_TOPICS:
        logger.debug(f"Running test with pubsub topic {pubsub_topic}")
        try:
            self.check_published_message_reaches_peer(self.create_message(), pubsub_topic=pubsub_topic)
        except Exception as e:
            # "PubusubTopic" typo fixed in the log/assert messages.
            logger.error(f"PubsubTopic {pubsub_topic} failed: {str(e)}")
            failed_pubsub_topics.append(pubsub_topic)
    assert not failed_pubsub_topics, f"PubsubTopic failed: {failed_pubsub_topics}"

Expand All @@ -119,20 +118,20 @@ def test_publish_with_valid_timestamps(self):
failed_timestamps = []
for timestamp in SAMPLE_TIMESTAMPS:
if self.node1.type() in timestamp["valid_for"]:
logger.debug("Running test with timestamp %s", timestamp["description"])
logger.debug(f'Running test with timestamp {timestamp["description"]}')
message = self.create_message(timestamp=timestamp["value"])
try:
self.check_published_message_reaches_peer(message)
except Exception as ex:
logger.error("Timestamp %s failed: %s", timestamp["description"], str(ex))
logger.error(f'Timestamp {timestamp["description"]} failed: {str(ex)}')
failed_timestamps.append(timestamp)
assert not failed_timestamps, f"Timestamps failed: {failed_timestamps}"

def test_publish_with_invalid_timestamps(self):
success_timestamps = []
for timestamp in SAMPLE_TIMESTAMPS:
if self.node1.type() not in timestamp["valid_for"]:
logger.debug("Running test with timestamp %s", timestamp["description"])
logger.debug(f'Running test with timestamp {timestamp["description"]}')
message = self.create_message(timestamp=timestamp["value"])
try:
self.check_published_message_reaches_peer(message)
Expand Down Expand Up @@ -168,11 +167,11 @@ def test_publish_with_invalid_meta(self):
def test_publish_with_ephemeral(self):
    """Publish once with ephemeral=True and once with ephemeral=False; both
    variants must reach the peer."""
    failed_ephemeral = []
    for ephemeral in [True, False]:
        logger.debug(f"Running test with Ephemeral {ephemeral}")
        try:
            self.check_published_message_reaches_peer(self.create_message(ephemeral=ephemeral))
        except Exception as e:
            # "Massage" typo fixed in the error message.
            logger.error(f"Message with Ephemeral {ephemeral} failed: {str(e)}")
            failed_ephemeral.append(ephemeral)
    assert not failed_ephemeral, f"Ephemeral that failed: {failed_ephemeral}"

Expand All @@ -196,12 +195,13 @@ def test_publish_and_retrieve_duplicate_message(self):
except Exception as ex:
assert "Peer node couldn't find any messages" in str(ex)

# NOTE(review): the committed name "..._pauses_and_pauses" looks like a typo
# (the body pauses AND unpauses both nodes — "_and_unpauses" was probably
# intended); kept as committed so the test pytest discovers is unchanged.
def test_publish_after_node_pauses_and_pauses(self):
    """Verify publishing still works after each node in turn is paused and
    then unpaused."""
    self.check_published_message_reaches_peer(self.create_message())
    self.node1.pause()
    self.node1.unpause()
    self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M1")))
    self.node2.pause()
    self.node2.unpause()
    self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M2")))

Expand Down

0 comments on commit 2e58b2d

Please sign in to comment.