Skip to content

Commit

Permalink
coordinator rewrite
Browse files Browse the repository at this point in the history
1. moving the coordinator in a separate thread
2. adding a waiting time between when the master its elected and the master can act. This has been done to avoid rapid elections of master that may produce schemas with different ids.

Example of what could happpen without the delay:

|--------------------------------------|
|Node | Node1    | Node2    | Node3    |
|Role | Master   | Follower | Follower |
|--------------------------------------|

Node1 -> Send Message A{id=max(current_ids)} to kafka

where the max(current_ids) = 10

---------------------------------------

Node1 its disconnected, the message its still in the producer queue of Node1

---------------------------------------

Node2 its elected master

|--------------------------------------|
|Node | Node1    | Node2    | Node3    |
|Role | Follower | Master   | Follower |
|--------------------------------------|

----------------------------------------

Node2 produces a message B{id=max(current_ids)} to kafka

Because the message A isn't yet delivered to Node2, the max(current_ids) returns still 10.
And we have an ID clash.

The solution its simple, each master should wait a reasonable high number of milliseconds before acting as a master.
So that all the in-flight messages are delivered to kafka + the reasonable delay of the consumer for the master node before noticing that a message has been produced
  • Loading branch information
eliax1996 committed Nov 19, 2024
1 parent 1ea3ad5 commit 2385470
Show file tree
Hide file tree
Showing 14 changed files with 363 additions and 67 deletions.
2 changes: 2 additions & 0 deletions src/karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class Config(TypedDict):
kafka_schema_reader_strict_mode: bool
kafka_retriable_errors_silenced: bool
use_protobuf_formatter: bool
waiting_time_before_acting_as_master_ms: int

sentry: NotRequired[Mapping[str, object]]
tags: NotRequired[Mapping[str, object]]
Expand Down Expand Up @@ -163,6 +164,7 @@ class ConfigDefaults(Config, total=False):
"kafka_schema_reader_strict_mode": False,
"kafka_retriable_errors_silenced": True,
"use_protobuf_formatter": False,
"waiting_time_before_acting_as_master_ms": 5000,
}
SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]

Expand Down
73 changes: 60 additions & 13 deletions src/karapace/coordinator/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,42 @@
from karapace.config import Config
from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
from karapace.typing import SchemaReaderStoppper
from threading import Thread
from typing import Final

import asyncio
import logging
import time

__all__ = ("MasterCoordinator",)


LOG = logging.getLogger(__name__)


class MasterCoordinator:
"""Handles primary election"""
"""Handles primary election
The coordination is run in own dedicated thread, under stress situation the main
eventloop could have queue of items to work and having own thread will give more
runtime for the coordination tasks as Python intrepreter will switch the active
thread by the configured thread switch interval. Default interval in CPython is
5 milliseconds.
"""

def __init__(self, config: Config) -> None:
super().__init__()
self._config: Final = config
self._kafka_client: AIOKafkaClient | None = None
self._running = True
self._sc: SchemaCoordinator | None = None
self._thread: Thread = Thread(target=self._start_loop)
self._loop: asyncio.AbstractEventLoop | None = None
self._schema_reader_stopper: SchemaReaderStoppper | None = None

def set_stoppper(self, schema_reader_stopper: SchemaReaderStoppper) -> None:
self._schema_reader_stopper = schema_reader_stopper

@property
def schema_coordinator(self) -> SchemaCoordinator | None:
Expand All @@ -41,7 +58,18 @@ def schema_coordinator(self) -> SchemaCoordinator | None:
def config(self) -> Config:
return self._config

async def start(self) -> None:
def start(self) -> None:
self._thread.start()

def _start_loop(self) -> None:
# we should avoid the reassignment otherwise we leak resources
assert self._loop is None, "Loop already started"
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._loop.create_task(self._async_loop())
self._loop.run_forever()

async def _async_loop(self) -> None:
self._kafka_client = self.init_kafka_client()
# Wait until schema coordinator is ready.
# This probably needs better synchronization than plain waits.
Expand All @@ -61,11 +89,23 @@ async def start(self) -> None:
await asyncio.sleep(0.5)

self._sc = self.init_schema_coordinator()
while True:
if self._sc.ready():
return
while self._running:
# keeping the thread sleeping until it die.
# we need to keep the schema_coordinator running
# it contains the `heartbeat` and coordination logic.
await asyncio.sleep(0.5)

LOG.info("Closing master_coordinator")
if self._sc:
await self._sc.close()
while self._loop is not None and not self._loop.is_closed():
self._loop.stop()
if not self._loop.is_running():
self._loop.close()
time.sleep(0.5)
if self._kafka_client:
await self._kafka_client.close()

def init_kafka_client(self) -> AIOKafkaClient:
ssl_context = create_ssl_context(
cafile=self._config["ssl_cafile"],
Expand All @@ -90,15 +130,18 @@ def init_kafka_client(self) -> AIOKafkaClient:

def init_schema_coordinator(self) -> SchemaCoordinator:
assert self._kafka_client is not None
assert self._schema_reader_stopper is not None
schema_coordinator = SchemaCoordinator(
client=self._kafka_client,
schema_reader_stopper=self._schema_reader_stopper,
election_strategy=self._config.get("master_election_strategy", "lowest"),
group_id=self._config["group_id"],
hostname=self._config["advertised_hostname"],
master_eligibility=self._config["master_eligibility"],
port=self._config["advertised_port"],
scheme=self._config["advertised_protocol"],
session_timeout_ms=self._config["session_timeout_ms"],
waiting_time_before_acting_as_master_ms=self._config["waiting_time_before_acting_as_master_ms"],
)
schema_coordinator.start()
return schema_coordinator
Expand All @@ -107,7 +150,7 @@ def get_coordinator_status(self) -> SchemaCoordinatorStatus:
assert self._sc is not None
generation = self._sc.generation if self._sc is not None else OffsetCommitRequest.DEFAULT_GENERATION_ID
return SchemaCoordinatorStatus(
is_primary=self._sc.are_we_master if self._sc is not None else None,
is_primary=self._sc.are_we_master() if self._sc is not None else None,
is_primary_eligible=self._config["master_eligibility"],
primary_url=self._sc.master_url if self._sc is not None else None,
is_running=True,
Expand All @@ -116,12 +159,16 @@ def get_coordinator_status(self) -> SchemaCoordinatorStatus:

def get_master_info(self) -> tuple[bool | None, str | None]:
"""Return whether we're the master, and the actual master url that can be used if we're not"""
assert self._sc is not None
return self._sc.are_we_master, self._sc.master_url
if not self._sc:
return False, None

if not self._sc.ready():
# we should wait for a while after we have been elected master, we should also consume
# all the messages in the log before proceeding, check the doc of `self._sc.are_we_master`
# for more details
return False, None

return self._sc.are_we_master(), self._sc.master_url

async def close(self) -> None:
LOG.info("Closing master_coordinator")
if self._sc:
await self._sc.close()
if self._kafka_client:
await self._kafka_client.close()
self._running = False
105 changes: 98 additions & 7 deletions src/karapace/coordinator/schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from aiokafka.util import create_future, create_task
from collections.abc import Coroutine, Sequence
from karapace.dataclasses import default_dataclass
from karapace.typing import JsonData
from karapace.typing import JsonData, SchemaReaderStoppper
from karapace.utils import json_decode, json_encode
from karapace.version import __version__
from typing import Any, Final
Expand Down Expand Up @@ -118,12 +118,12 @@ class SchemaCoordinator:
Contains original comments and also Schema Registry specific comments.
"""

are_we_master: bool | None = None
master_url: str | None = None

def __init__(
self,
client: AIOKafkaClient,
schema_reader_stopper: SchemaReaderStoppper,
hostname: str,
port: int,
scheme: str,
Expand All @@ -135,6 +135,7 @@ def __init__(
rebalance_timeout_ms: int = 30000,
retry_backoff_ms: int = 100,
session_timeout_ms: int = 10000,
waiting_time_before_acting_as_master_ms: int = 5000,
) -> None:
# Coordination flags and futures
self._client: Final = client
Expand All @@ -147,7 +148,17 @@ def __init__(
self.scheme: Final = scheme
self.master_eligibility: Final = master_eligibility
self.master_url: str | None = None
self.are_we_master = False
self._schema_reader_stopper = schema_reader_stopper
self._are_we_master: bool | None = False
# a value that its strictly higher than any clock, so we are sure
# we are never going to consider this the leader without explictly passing
# from False to True for the `_are_we_master` variable.
self._initial_election_sec: float | None = float("infinity")
# used to understand if I need to wait the `waiting_time_before_acting_as_master_ms`
# before acting as a leader or not, if the last time I was leader was less than 5 seconds
# ago I can skip the waiting phase (note that I'm always using my own time, no problems due
# to skew of clocks between machines).
self._last_time_i_was_leader: float = float("-infinity")

self.rejoin_needed_fut: asyncio.Future[None] | None = None
self._coordinator_dead_fut: asyncio.Future[None] | None = None
Expand All @@ -163,6 +174,7 @@ def __init__(
self._rebalance_timeout_ms: Final = rebalance_timeout_ms
self._retry_backoff_ms: Final = retry_backoff_ms
self._session_timeout_ms: Final = session_timeout_ms
self._waiting_time_before_acting_as_master_ms: Final = waiting_time_before_acting_as_master_ms

self._coordinator_lookup_lock: Final = asyncio.Lock()
self._coordination_task: asyncio.Future[None] | None = None
Expand All @@ -182,6 +194,49 @@ def __init__(

self._metadata_snapshot: list[Assignment] = []

def is_master_assigned_to_myself(self) -> bool:
return self._are_we_master or False

def are_we_master(self) -> bool | None:
"""
After a new election its made we should wait for a while since the previous master could have produced
a new message shortly before being disconnected from the cluster.
If this is true the max id selected for the next schema ID, so we can create
two schemas with the same id (or even more if rapid elections are one after another).
The fix its to wait for ~= 5 seconds if new messages arrives before becoming available as a master.
The condition to resume being the master its:
no new messages are still to be processed + at least 5 seconds have passed since we were elected master
"""
if self._are_we_master is None:
# `self._are_we_master` is `None` only during the perform of the assignment
# where we don't know if we are master yet
LOG.warning("No new elections performed yet.")
return None

if not self._ready or not self._schema_reader_stopper.ready():
return False

if self._are_we_master and self._initial_election_sec is not None:
# `time.monotonic()` because we don't want the time to go back or forward because of
# e.g. ntp
if time.monotonic() > self._initial_election_sec + (self._waiting_time_before_acting_as_master_ms / 1000):
# set the value to `None` since it's expensive to call each time the monotonic clock.
LOG.info("Declaring myself as master since %s are passed!", self._waiting_time_before_acting_as_master_ms)
self._initial_election_sec = None
# this is the last point in time were we wait till to the end of the log queue for new
# incoming messages.
self._schema_reader_stopper.set_not_ready()
# flag that its set at startup, fix this
return False

LOG.info(
"Newly elected as master, waiting %s ms before writing any schema to let other requests complete",
self._waiting_time_before_acting_as_master_ms,
)
return False

return self._are_we_master

def start(self) -> None:
"""Must be called after creating SchemaCoordinator object to initialize
futures and start the coordination task.
Expand Down Expand Up @@ -281,6 +336,10 @@ async def _maybe_leave_group(self) -> None:
LOG.warning("LeaveGroup request failed: %s", err)
else:
LOG.info("LeaveGroup request succeeded")
# to avoid sleeping if we were the master, a new actor join the cluster
# and we are immediately elected as leader again.
if self.are_we_master():
self._last_time_i_was_leader = time.monotonic()
self.reset_generation()

def _handle_metadata_update(self, _: ClusterMetadata) -> None:
Expand Down Expand Up @@ -349,7 +408,7 @@ async def perform_assignment(
response_data.protocol,
response_data.members,
)
self.are_we_master = None
self._are_we_master = None
error = NO_ERROR
urls = {}
fallback_urls = {}
Expand Down Expand Up @@ -417,13 +476,40 @@ async def _on_join_complete(
# On Kafka protocol we can be assigned to be master, but if not master eligible, then we're not master for real
if member_assignment["master"] == member_id and member_identity["master_eligibility"]:
self.master_url = master_url
self.are_we_master = True
self._are_we_master = True
ive_never_been_a_master = self._last_time_i_was_leader == float("-inf")
another_master_could_have_been_elected = (
self._last_time_i_was_leader + (self._waiting_time_before_acting_as_master_ms / 1000) < time.monotonic()
)
if ive_never_been_a_master or another_master_could_have_been_elected:
# we need to wait late record arrivals only in the case there
# was a master change, the time before acting its always respect
# to which was the previous master (if we were master no need
# to wait more before acting)
self._schema_reader_stopper.set_not_ready()
# flag that its set at startup, fix this
# `time.monotonic()` because we don't want the time to go back or forward because of e.g. ntp
self._initial_election_sec = time.monotonic()

LOG.info(
"Declaring myself as not master for %s milliseconds, "
"another master meanwhile could have added other records",
self._waiting_time_before_acting_as_master_ms,
)
else:
LOG.info(
"Starting immediately serving requests since I was master less than %s milliseconds ago, "
"no other masters could have written a new schema meanwhile",
self._waiting_time_before_acting_as_master_ms,
)
elif not member_identity["master_eligibility"]:
LOG.warning("Member %s is not eligible as a master", member_id)
self.master_url = None
self.are_we_master = False
self._are_we_master = False
else:
LOG.info("We are not elected as master")
self.master_url = master_url
self.are_we_master = False
self._are_we_master = False
self._ready = True
return None

Expand All @@ -434,13 +520,16 @@ def coordinator_dead(self) -> None:
"""
if self._coordinator_dead_fut is not None and self.coordinator_id is not None:
LOG.warning("Marking the coordinator dead (node %s)for group %s.", self.coordinator_id, self.group_id)
self._are_we_master = False
self.coordinator_id = None
self._coordinator_dead_fut.set_result(None)

def reset_generation(self) -> None:
"""Coordinator did not recognize either generation or member_id. Will
need to re-join the group.
"""
LOG.info("Resetting generation status")
self._are_we_master = False
self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
self.request_rejoin()
Expand Down Expand Up @@ -514,6 +603,8 @@ async def __coordination_routine(self) -> None:
try:
await self.ensure_coordinator_known()
if self.need_rejoin():
if self.are_we_master():
self._last_time_i_was_leader = time.monotonic()
new_assignment = await self.ensure_active_group()
if not new_assignment:
continue
Expand Down
Loading

0 comments on commit 2385470

Please sign in to comment.