Skip to content

Commit

Permalink
WIP: fix tests adding a wait for the master to become available befor…
Browse files Browse the repository at this point in the history
…e forwarding the requests
  • Loading branch information
eliax1996 committed Aug 23, 2024
1 parent 054efb7 commit 7f657bd
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 15 deletions.
6 changes: 5 additions & 1 deletion karapace/coordinator/schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ def __init__(

self._metadata_snapshot: list[Assignment] = []

def is_master_assigned_to_myself(self) -> bool:
return self._are_we_master or False

def are_we_master(self) -> bool | None:
"""
After a new election its made we should wait for a while since the previous master could have produced
Expand Down Expand Up @@ -489,7 +492,8 @@ async def _on_join_complete(
else:
LOG.info(
"Starting immediately serving requests since I was master less than %s milliseconds ago, "
"no other masters could have written a new schema meanwhile"
"no other masters could have written a new schema meanwhile",
self._waiting_time_before_acting_as_master_ms,
)
elif not member_identity["master_eligibility"]:
self.master_url = None
Expand Down
22 changes: 22 additions & 0 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@ def _add_schema_registry_routes(self) -> None:
schema_request=True,
auth=self._auth,
)
self.route(
"/master_available",
callback=self.master_available,
# post because they cannot be cached, need to be sure always gathering the real value of that property
method="POST",
schema_request=True,
)
self.route(
"/config",
callback=self.config_set,
Expand Down Expand Up @@ -715,6 +722,21 @@ async def config_subject_delete(

self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type)

async def master_available(self, content_type: str, *, request: HTTPRequest) -> None:
are_we_master, master_url = await self.schema_registry.get_master()

if (
self.schema_registry.schema_reader.master_coordinator._sc is not None # pylint: disable=protected-access
and self.schema_registry.schema_reader.master_coordinator._sc.is_master_assigned_to_myself() # pylint: disable=protected-access
):
self.r({"master_available": are_we_master}, content_type)

if master_url is None:
self.r({"master_available": False}, content_type)
else:
url = f"{master_url}/master_available"
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="POST")

async def subjects_list(self, content_type: str, *, request: HTTPRequest, user: User | None = None) -> None:
deleted = request.query.get("deleted", "false").lower() == "true"
subjects = self.schema_registry.database.find_subjects(include_deleted=deleted)
Expand Down
40 changes: 31 additions & 9 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from _pytest.fixtures import SubRequest
from aiohttp.pytest_plugin import AiohttpClient
from aiohttp.test_utils import TestClient
Expand Down Expand Up @@ -31,11 +33,12 @@
from tests.integration.utils.process import stop_process, wait_for_port_subprocess
from tests.integration.utils.synchronization import lock_path_for
from tests.integration.utils.zookeeper import configure_and_start_zk
from tests.utils import repeat_until_successful_request
from typing import AsyncIterator, Iterator, List, Optional
from tests.utils import repeat_until_master_is_available, repeat_until_successful_request
from typing import AsyncIterator, Iterator
from urllib.parse import urlparse

import asyncio
import contextlib
import json
import os
import pathlib
Expand Down Expand Up @@ -269,7 +272,7 @@ async def fixture_rest_async(
tmp_path: Path,
kafka_servers: KafkaServers,
registry_async_client: Client,
) -> AsyncIterator[Optional[KafkaRest]]:
) -> AsyncIterator[KafkaRest | None]:
# Do not start a REST api when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
# the election process. Without proper configuration for the listeners that
Expand Down Expand Up @@ -342,7 +345,7 @@ async def fixture_rest_async_novalidation(
tmp_path: Path,
kafka_servers: KafkaServers,
registry_async_client: Client,
) -> AsyncIterator[Optional[KafkaRest]]:
) -> AsyncIterator[KafkaRest | None]:
# Do not start a REST api when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
# the election process. Without proper configuration for the listeners that
Expand Down Expand Up @@ -415,7 +418,7 @@ async def fixture_rest_async_registry_auth(
loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument
kafka_servers: KafkaServers,
registry_async_client_auth: Client,
) -> AsyncIterator[Optional[KafkaRest]]:
) -> AsyncIterator[KafkaRest | None]:
# Do not start a REST api when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
# the election process. Without proper configuration for the listeners that
Expand Down Expand Up @@ -486,7 +489,7 @@ async def fixture_registry_async_pair(
session_logdir: Path,
kafka_servers: KafkaServers,
port_range: PortRangeInclusive,
) -> AsyncIterator[List[str]]:
) -> AsyncIterator[list[str]]:
"""Starts a cluster of two Schema Registry servers and returns their URL endpoints."""

config1: Config = {"bootstrap_uri": kafka_servers.bootstrap_servers}
Expand All @@ -497,7 +500,8 @@ async def fixture_registry_async_pair(
data_dir=session_logdir / _clear_test_name(request.node.name),
port_range=port_range,
) as endpoints:
yield [server.endpoint.to_url() for server in endpoints]
async with after_master_is_available(endpoints, request.config.getoption("server_ca")):
yield [server.endpoint.to_url() for server in endpoints]


@pytest.fixture(scope="function", name="registry_cluster")
Expand Down Expand Up @@ -550,6 +554,7 @@ async def fixture_registry_async_client(
timeout=10,
sleep=0.3,
)
await repeat_until_master_is_available(client)
yield client
finally:
await client.close()
Expand Down Expand Up @@ -689,19 +694,35 @@ async def fixture_registry_async_client_auth(
timeout=10,
sleep=0.3,
)
await repeat_until_master_is_available(client)
yield client
finally:
await client.close()


@contextlib.asynccontextmanager
async def after_master_is_available(
registry_instances: list[RegistryDescription], server_ca: str | None
) -> AsyncIterator[None]:
client = Client(
server_uri=registry_instances[0].endpoint.to_url(),
server_ca=server_ca,
)
try:
await repeat_until_master_is_available(client)
yield
finally:
await client.close()


@pytest.fixture(scope="function", name="registry_async_auth_pair")
async def fixture_registry_async_auth_pair(
request: SubRequest,
loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument
session_logdir: Path,
kafka_servers: KafkaServers,
port_range: PortRangeInclusive,
) -> AsyncIterator[List[str]]:
) -> AsyncIterator[list[str]]:
"""Starts a cluster of two Schema Registry servers with authentication enabled and returns their URL endpoints."""

config1: Config = {
Expand All @@ -718,7 +739,8 @@ async def fixture_registry_async_auth_pair(
data_dir=session_logdir / _clear_test_name(request.node.name),
port_range=port_range,
) as endpoints:
yield [server.endpoint.to_url() for server in endpoints]
async with after_master_is_available(endpoints, request.config.getoption("server_ca")):
yield [server.endpoint.to_url() for server in endpoints]


@pytest.fixture(scope="function", name="new_topic")
Expand Down
10 changes: 5 additions & 5 deletions tests/integration/test_master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ def is_master(mc: MasterCoordinator) -> bool:
This takes care of a race condition were the flag `master` is set but
`master_url` is not yet set.
"""
return bool(mc.schema_coordinator and mc.schema_coordinator.are_we_master and mc.schema_coordinator.master_url)
return bool(mc.schema_coordinator and mc.schema_coordinator.are_we_master() and mc.schema_coordinator.master_url)


def has_master(mc: MasterCoordinator) -> bool:
"""True if `mc` has a master."""
return bool(mc.schema_coordinator and not mc.schema_coordinator.are_we_master and mc.schema_coordinator.master_url)
return bool(mc.schema_coordinator and not mc.schema_coordinator.are_we_master() and mc.schema_coordinator.master_url)


@pytest.mark.timeout(60) # Github workflows need a bit of extra time
@pytest.mark.timeout(65) # Github workflows need a bit of extra time
@pytest.mark.parametrize("strategy", ["lowest", "highest"])
async def test_master_selection(port_range: PortRangeInclusive, kafka_servers: KafkaServers, strategy: str) -> None:
# Use random port to allow for parallel runs.
Expand Down Expand Up @@ -185,11 +185,11 @@ async def test_no_eligible_master(kafka_servers: KafkaServers, port_range: PortR
mc = await init_admin(config_aa)
try:
# Wait for the election to happen, ie. flag is not None
while not mc.schema_coordinator or mc.schema_coordinator.are_we_master is None:
while not mc.schema_coordinator or mc.schema_coordinator.are_we_master() is None:
await asyncio.sleep(0.5)

# Make sure the end configuration is as expected
assert mc.schema_coordinator.are_we_master is False
assert mc.schema_coordinator.are_we_master() is False
assert mc.schema_coordinator.master_url is None
finally:
await mc.close()
Expand Down
16 changes: 16 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,22 @@ async def repeat_until_successful_request(
return res


async def repeat_until_master_is_available(client: Client) -> None:
while True:
res = await repeat_until_successful_request(
client.post,
"master_available",
json_data={},
headers=None,
error_msg=f"Registry API {client.server_uri} is unreachable",
timeout=10,
sleep=1,
)
reply = res.json()
if reply["master_available"] is True:
break


def write_ini(file_path: Path, ini_data: dict) -> None:
ini_contents = (f"{key}={value}" for key, value in ini_data.items())
file_contents = "\n".join(ini_contents)
Expand Down

0 comments on commit 7f657bd

Please sign in to comment.