From 7f657bdc1c14548517b5509cdbb524609f282442 Mon Sep 17 00:00:00 2001 From: Elia Migliore Date: Thu, 22 Aug 2024 19:37:24 +0200 Subject: [PATCH] WIP: fix tests adding a wait for the master to become available before forwarding the requests --- karapace/coordinator/schema_coordinator.py | 6 ++- karapace/schema_registry_apis.py | 22 +++++++++++ tests/integration/conftest.py | 40 +++++++++++++++----- tests/integration/test_master_coordinator.py | 10 ++--- tests/utils.py | 16 ++++++++ 5 files changed, 79 insertions(+), 15 deletions(-) diff --git a/karapace/coordinator/schema_coordinator.py b/karapace/coordinator/schema_coordinator.py index 852c63e8f..c36e60c9e 100644 --- a/karapace/coordinator/schema_coordinator.py +++ b/karapace/coordinator/schema_coordinator.py @@ -191,6 +191,9 @@ def __init__( self._metadata_snapshot: list[Assignment] = [] + def is_master_assigned_to_myself(self) -> bool: + return self._are_we_master or False + def are_we_master(self) -> bool | None: """ After a new election its made we should wait for a while since the previous master could have produced @@ -489,7 +492,8 @@ async def _on_join_complete( else: LOG.info( "Starting immediately serving requests since I was master less than %s milliseconds ago, " - "no other masters could have written a new schema meanwhile" + "no other masters could have written a new schema meanwhile", + self._waiting_time_before_acting_as_master_ms, ) elif not member_identity["master_eligibility"]: self.master_url = None diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index d3b90dac6..75ba91241 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -200,6 +200,13 @@ def _add_schema_registry_routes(self) -> None: schema_request=True, auth=self._auth, ) + self.route( + "/master_available", + callback=self.master_available, + # post because they cannot be cached, need to be sure always gathering the real value of that property + method="POST", + schema_request=True, + ) self.route( "/config", callback=self.config_set, @@ -715,6 +722,21 @@ async def config_subject_delete( self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type) + async def master_available(self, content_type: str, *, request: HTTPRequest) -> None: + are_we_master, master_url = await self.schema_registry.get_master() + + if ( + self.schema_registry.schema_reader.master_coordinator._sc is not None # pylint: disable=protected-access + and self.schema_registry.schema_reader.master_coordinator._sc.is_master_assigned_to_myself() # pylint: disable=protected-access + ): + self.r({"master_available": are_we_master}, content_type) + + if master_url is None: + self.r({"master_available": False}, content_type) + else: + url = f"{master_url}/master_available" + await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="POST") + async def subjects_list(self, content_type: str, *, request: HTTPRequest, user: User | None = None) -> None: deleted = request.query.get("deleted", "false").lower() == "true" subjects = self.schema_registry.database.find_subjects(include_deleted=deleted) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 6ed4f4b22..1c601f2fc 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -4,6 +4,8 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ +from __future__ import annotations + from _pytest.fixtures import SubRequest from aiohttp.pytest_plugin import AiohttpClient from aiohttp.test_utils import TestClient @@ -31,11 +33,12 @@ from tests.integration.utils.process import stop_process, wait_for_port_subprocess from tests.integration.utils.synchronization import lock_path_for from tests.integration.utils.zookeeper import configure_and_start_zk -from tests.utils import repeat_until_successful_request -from typing import AsyncIterator, Iterator, List, Optional +from tests.utils import repeat_until_master_is_available, repeat_until_successful_request +from typing import AsyncIterator, Iterator from urllib.parse import urlparse import asyncio +import contextlib import json import os import pathlib @@ -269,7 +272,7 @@ async def fixture_rest_async( tmp_path: Path, kafka_servers: KafkaServers, registry_async_client: Client, -) -> AsyncIterator[Optional[KafkaRest]]: +) -> AsyncIterator[KafkaRest | None]: # Do not start a REST api when the user provided an external service. Doing # so would cause this node to join the existing group and participate in # the election process. Without proper configuration for the listeners that @@ -342,7 +345,7 @@ async def fixture_rest_async_novalidation( tmp_path: Path, kafka_servers: KafkaServers, registry_async_client: Client, -) -> AsyncIterator[Optional[KafkaRest]]: +) -> AsyncIterator[KafkaRest | None]: # Do not start a REST api when the user provided an external service. Doing # so would cause this node to join the existing group and participate in # the election process. Without proper configuration for the listeners that @@ -415,7 +418,7 @@ async def fixture_rest_async_registry_auth( loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument kafka_servers: KafkaServers, registry_async_client_auth: Client, -) -> AsyncIterator[Optional[KafkaRest]]: +) -> AsyncIterator[KafkaRest | None]: # Do not start a REST api when the user provided an external service. Doing # so would cause this node to join the existing group and participate in # the election process. Without proper configuration for the listeners that @@ -486,7 +489,7 @@ async def fixture_registry_async_pair( session_logdir: Path, kafka_servers: KafkaServers, port_range: PortRangeInclusive, -) -> AsyncIterator[List[str]]: +) -> AsyncIterator[list[str]]: """Starts a cluster of two Schema Registry servers and returns their URL endpoints.""" config1: Config = {"bootstrap_uri": kafka_servers.bootstrap_servers} @@ -497,7 +500,8 @@ async def fixture_registry_async_pair( data_dir=session_logdir / _clear_test_name(request.node.name), port_range=port_range, ) as endpoints: - yield [server.endpoint.to_url() for server in endpoints] + async with after_master_is_available(endpoints, request.config.getoption("server_ca")): + yield [server.endpoint.to_url() for server in endpoints] @pytest.fixture(scope="function", name="registry_cluster") @@ -550,6 +554,7 @@ async def fixture_registry_async_client( timeout=10, sleep=0.3, ) + await repeat_until_master_is_available(client) yield client finally: await client.close() @@ -689,11 +694,27 @@ async def fixture_registry_async_client_auth( timeout=10, sleep=0.3, ) + await repeat_until_master_is_available(client) yield client finally: await client.close() +@contextlib.asynccontextmanager +async def after_master_is_available( + registry_instances: list[RegistryDescription], server_ca: str | None +) -> AsyncIterator[None]: + client = Client( + server_uri=registry_instances[0].endpoint.to_url(), + server_ca=server_ca, + ) + try: + await repeat_until_master_is_available(client) + yield + finally: + await client.close() + + @pytest.fixture(scope="function", name="registry_async_auth_pair") async def fixture_registry_async_auth_pair( request: SubRequest, @@ -701,7 +722,7 @@ async def fixture_registry_async_auth_pair( session_logdir: Path, kafka_servers: KafkaServers, port_range: PortRangeInclusive, -) -> AsyncIterator[List[str]]: +) -> AsyncIterator[list[str]]: """Starts a cluster of two Schema Registry servers with authentication enabled and returns their URL endpoints.""" config1: Config = { @@ -718,7 +739,8 @@ async def fixture_registry_async_auth_pair( data_dir=session_logdir / _clear_test_name(request.node.name), port_range=port_range, ) as endpoints: - yield [server.endpoint.to_url() for server in endpoints] + async with after_master_is_available(endpoints, request.config.getoption("server_ca")): + yield [server.endpoint.to_url() for server in endpoints] @pytest.fixture(scope="function", name="new_topic") diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index 225539f8d..d6ee8b006 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -28,15 +28,15 @@ def is_master(mc: MasterCoordinator) -> bool: This takes care of a race condition were the flag `master` is set but `master_url` is not yet set. """ - return bool(mc.schema_coordinator and mc.schema_coordinator.are_we_master and mc.schema_coordinator.master_url) + return bool(mc.schema_coordinator and mc.schema_coordinator.are_we_master() and mc.schema_coordinator.master_url) def has_master(mc: MasterCoordinator) -> bool: """True if `mc` has a master.""" - return bool(mc.schema_coordinator and not mc.schema_coordinator.are_we_master and mc.schema_coordinator.master_url) + return bool(mc.schema_coordinator and not mc.schema_coordinator.are_we_master() and mc.schema_coordinator.master_url) -@pytest.mark.timeout(60) # Github workflows need a bit of extra time +@pytest.mark.timeout(65) # Github workflows need a bit of extra time @pytest.mark.parametrize("strategy", ["lowest", "highest"]) async def test_master_selection(port_range: PortRangeInclusive, kafka_servers: KafkaServers, strategy: str) -> None: # Use random port to allow for parallel runs. @@ -185,11 +185,11 @@ async def test_no_eligible_master(kafka_servers: KafkaServers, port_range: PortR mc = await init_admin(config_aa) try: # Wait for the election to happen, ie. flag is not None - while not mc.schema_coordinator or mc.schema_coordinator.are_we_master is None: + while not mc.schema_coordinator or mc.schema_coordinator.are_we_master() is None: await asyncio.sleep(0.5) # Make sure the end configuration is as expected - assert mc.schema_coordinator.are_we_master is False + assert mc.schema_coordinator.are_we_master() is False assert mc.schema_coordinator.master_url is None finally: await mc.close() diff --git a/tests/utils.py b/tests/utils.py index f38097858..2c5dd4ebf 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -291,6 +291,22 @@ async def repeat_until_successful_request( return res +async def repeat_until_master_is_available(client: Client) -> None: + while True: + res = await repeat_until_successful_request( + client.post, + "master_available", + json_data={}, + headers=None, + error_msg=f"Registry API {client.server_uri} is unreachable", + timeout=10, + sleep=1, + ) + reply = res.json() + if reply["master_available"] is True: + break + + def write_ini(file_path: Path, ini_data: dict) -> None: ini_contents = (f"{key}={value}" for key, value in ini_data.items()) file_contents = "\n".join(ini_contents)