Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into dpe-5945-rename-external
Browse files Browse the repository at this point in the history
  • Loading branch information
Batalex committed Nov 27, 2024
2 parents 39edb1e + f4d9336 commit 365078a
Show file tree
Hide file tree
Showing 30 changed files with 1,347 additions and 221 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ jobs:
- integration-upgrade
- integration-balancer-single
- integration-balancer-multi
- integration-kraft-single
- integration-kraft-multi
name: ${{ matrix.tox-environments }}
needs:
- lint
Expand Down
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ The Charmed Operator can be found on [Charmhub](https://charmhub.io/kafka-k8s) a
- SASL/SCRAM auth for Broker-Broker and Client-Broker authentication enabled by default.
- Access control management supported with user-provided ACL lists.

The Charmed Kafka K8s Operator uses Kafka binaries released by the The Apache Software Foundation, made available using the [`ubuntu/kafka` OCI image](https://registry.hub.docker.com/r/ubuntu/kafka) distributed by Canonical.

As currently Kafka requires a paired ZooKeeper deployment in production, this operator makes use of the [ZooKeeper K8s Operator](https://github.com/canonical/zookeeper-k8s-operator) for various essential functions.

## Requirements
Expand Down
3 changes: 3 additions & 0 deletions actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ get-admin-credentials:
The returned client_properties can be used for Kafka bin commands using `--bootstrap-server` and `--command-config` for admin level administration
This action must be called on the leader unit.

get-listeners:
description: Get all active listeners and their port allocations

rebalance:
description: Trigger a rebalance of cluster partitions based on configured goals
params:
Expand Down
6 changes: 5 additions & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ options:
roles:
description: |
Comma separated list of the roles assigned to the nodes of this cluster.
This configuration accepts the following roles: 'broker' (standard functionality), 'balancer' (cruise control).
This configuration accepts the following roles: 'broker' (standard functionality), 'balancer' (cruise control), 'controller' (KRaft mode).
type: string
default: broker
compression_type:
Expand Down Expand Up @@ -92,6 +92,10 @@ options:
description: Config options to add extra-sans to the ones used when requesting server certificates. The extra-sans are specified by comma-separated names to be added when requesting signed certificates. Use "{unit}" as a placeholder to be filled with the unit number, e.g. "worker-{unit}" will be translated as "worker-0" for unit 0 and "worker-1" for unit 1 when requesting the certificate.
type: string
default: ""
extra_listeners:
description: "Config options to add extra SANs to the ones used when requesting server certificates, and to define custom `advertised.listeners` and ports for clients external to the Juju model. These items are comma-separated. Use '{unit}' as a placeholder to be filled with the unit number if necessary. For port allocations, providing the port for a given listener will offset the generated port number by that amount, with an accepted value range of 20001-50000. For example, a provided value of 'worker-{unit}.domain.com:30000' will generate listeners for unit 0 with name 'worker-0.domain.com', and be allocated ports 39092, 39093 etc for each authentication scheme."
type: string
default: ""
log_level:
description: "Level of logging for the different components operated by the charm. Possible values: ERROR, WARNING, INFO, DEBUG"
type: string
Expand Down
7 changes: 5 additions & 2 deletions metadata.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.
name: kafka-k8s
display-name: Charmed Kafka K8s
display-name: Apache Kafka - K8s
description: |
Kafka is an event streaming platform. This charm deploys and operates Kafka on
a K8s environment.
Apache Kafka is a free, open source software project by the Apache Software Foundation.
Users can find out more at the [Kafka project page](https://kafka.apache.org/).
summary: Charmed Kafka K8s Operator
docs: https://discourse.charmhub.io/t/charmed-kafka-k8s-documentation/10296
source: https://github.com/canonical/kafka-k8s-operator
Expand All @@ -31,7 +34,7 @@ resources:
kafka-image:
type: oci-image
description: OCI Image for Apache Kafka
upstream-source: ghcr.io/canonical/charmed-kafka@sha256:67d2729ca6c4f158682c481a512c37e555bae6edc30e8f0acdb0460fcbeffe88
upstream-source: ghcr.io/canonical/charmed-kafka@sha256:0f180540572828e3152ab6a39b80891c6dfb2d6abc79914ec19646e6b487b1c8

peers:
cluster:
Expand Down
5 changes: 4 additions & 1 deletion src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ def _on_roles_changed(self, _):
This handler is in charge of stopping the workloads, since the sub-operators would not
be instantiated if roles are changed.
"""
if not self.state.runs_broker and self.broker.workload.active():
if (
not (self.state.runs_broker or self.state.runs_controller)
and self.broker.workload.active()
):
self.broker.workload.stop()

if (
Expand Down
187 changes: 143 additions & 44 deletions src/core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from lightkube.core.exceptions import ApiError as LightKubeApiError
from ops import Object, Relation
from ops.model import Unit
from tenacity import retry, retry_if_exception_cause_type, stop_after_attempt, wait_fixed

from core.models import (
BrokerCapacities,
Expand All @@ -37,7 +38,10 @@
ADMIN_USER,
BALANCER,
BROKER,
CONTROLLER,
CONTROLLER_PORT,
INTERNAL_USERS,
KRAFT_NODE_ID_OFFSET,
MIN_REPLICAS,
OAUTH_REL_NAME,
PEER,
Expand Down Expand Up @@ -86,7 +90,7 @@ class PeerClusterData(ProviderData, RequirerData):
"""Broker provider data model."""

SECRET_LABEL_MAP = SECRET_LABEL_MAP
SECRET_FIELDS = BALANCER.requested_secrets
SECRET_FIELDS = list(set(BALANCER.requested_secrets) | set(CONTROLLER.requested_secrets))


class ClusterState(Object):
Expand Down Expand Up @@ -138,45 +142,48 @@ def peer_cluster_relation(self) -> Relation | None:
@property
def peer_cluster_orchestrator(self) -> PeerCluster:
"""The state for the related `peer-cluster-orchestrator` application that this charm is requiring from."""
balancer_kwargs: dict[str, Any] = (
{
"balancer_username": self.cluster.balancer_username,
"balancer_password": self.cluster.balancer_password,
"balancer_uris": self.cluster.balancer_uris,
}
if self.runs_balancer
else {}
)
extra_kwargs: dict[str, Any] = {}

if self.runs_balancer:
extra_kwargs.update(
{
"balancer_username": self.cluster.balancer_username,
"balancer_password": self.cluster.balancer_password,
"balancer_uris": self.cluster.balancer_uris,
}
)

if self.runs_controller:
extra_kwargs.update(
{
"controller_quorum_uris": self.cluster.controller_quorum_uris,
}
)

return PeerCluster(
relation=self.peer_cluster_relation,
data_interface=PeerClusterData(self.model, PEER_CLUSTER_RELATION),
**balancer_kwargs,
**extra_kwargs,
)

@property
def peer_cluster(self) -> PeerCluster:
"""The state for the related `peer-cluster` application that this charm is providing to."""
return PeerCluster(
relation=self.peer_cluster_orchestrator_relation,
data_interface=PeerClusterOrchestratorData(
self.model, PEER_CLUSTER_ORCHESTRATOR_RELATION
),
)

@property
def balancer(self) -> PeerCluster:
"""The state for the `peer-cluster-orchestrator` related balancer application."""
balancer_kwargs: dict[str, Any] = (
{
"balancer_username": self.cluster.balancer_username,
"balancer_password": self.cluster.balancer_password,
"balancer_uris": self.cluster.balancer_uris,
}
if self.runs_balancer
else {}
)
extra_kwargs: dict[str, Any] = {}

if self.runs_broker: # must be providing, initialise with necessary broker data
if self.runs_controller or self.runs_balancer:
extra_kwargs.update(
{
"balancer_username": self.cluster.balancer_username,
"balancer_password": self.cluster.balancer_password,
"balancer_uris": self.cluster.balancer_uris,
"controller_quorum_uris": self.cluster.controller_quorum_uris,
}
)

# FIXME: `cluster_manager` check instead of running broker
# must be providing, initialise with necessary broker data
if self.runs_broker:
return PeerCluster(
relation=self.peer_cluster_orchestrator_relation, # if same app, this will be None and OK
data_interface=PeerClusterOrchestratorData(
Expand All @@ -185,12 +192,13 @@ def balancer(self) -> PeerCluster:
broker_username=ADMIN_USER,
broker_password=self.cluster.internal_user_credentials.get(ADMIN_USER, ""),
broker_uris=self.bootstrap_server,
cluster_uuid=self.cluster.cluster_uuid,
racks=self.racks,
broker_capacities=self.broker_capacities,
zk_username=self.zookeeper.username,
zk_password=self.zookeeper.password,
zk_uris=self.zookeeper.uris,
**balancer_kwargs, # in case of roles=broker,balancer on this app
**extra_kwargs, # in case of roles=broker,[balancer,controller] on this app
)

else: # must be roles=balancer only then, only load with necessary balancer data
Expand Down Expand Up @@ -346,7 +354,11 @@ def default_auth(self) -> AuthMap:
def enabled_auth(self) -> list[AuthMap]:
"""The currently enabled auth.protocols and their auth.mechanisms, based on related applications."""
enabled_auth = []
if self.client_relations or self.runs_balancer or self.peer_cluster_orchestrator_relation:
if (
self.client_relations
or self.runs_balancer
or BALANCER.value in self.peer_cluster_orchestrator.roles
):
enabled_auth.append(self.default_auth)
if self.oauth_relation:
enabled_auth.append(AuthMap(self.default_auth.protocol, "OAUTHBEARER"))
Expand All @@ -356,6 +368,12 @@ def enabled_auth(self) -> list[AuthMap]:
return enabled_auth

@property
@retry(
wait=wait_fixed(5),
stop=stop_after_attempt(3),
retry=retry_if_exception_cause_type(LightKubeApiError),
reraise=True,
)
def bootstrap_servers_external(self) -> str:
"""Comma-delimited string of `bootstrap-server` for external access."""
return ",".join(
Expand Down Expand Up @@ -394,6 +412,40 @@ def bootstrap_server(self) -> str:
)
)

@property
def bootstrap_server_internal(self) -> str:
"""Comma-delimited string of `bootstrap-server` command flag for internal access.
Returns:
List of `bootstrap-server` servers
"""
if not self.peer_relation:
return ""

return ",".join(
sorted(
[
f"{broker.internal_address}:{SECURITY_PROTOCOL_PORTS[self.default_auth].internal}"
for broker in self.brokers
]
)
)

@property
def controller_quorum_uris(self) -> str:
"""The current controller quorum uris when running KRaft mode."""
# FIXME: when running broker node.id will be unit-id + 100. If unit is only running
# the controller node.id == unit-id. This way we can keep a human readable mapping of ids.
if self.runs_controller:
node_offset = KRAFT_NODE_ID_OFFSET if self.runs_broker else 0
return ",".join(
[
f"{broker.unit_id + node_offset}@{broker.internal_address}:{CONTROLLER_PORT}"
for broker in self.brokers
]
)
return ""

@property
def log_dirs(self) -> str:
"""Builds the necessary log.dirs based on mounted storage volumes.
Expand Down Expand Up @@ -446,7 +498,7 @@ def ready_to_start(self) -> Status: # noqa: C901
if not self.peer_relation:
return Status.NO_PEER_RELATION

for status in [self._broker_status, self._balancer_status]:
for status in [self._broker_status, self._balancer_status, self._controller_status]:
if status != Status.ACTIVE:
return status

Expand All @@ -461,29 +513,40 @@ def _balancer_status(self) -> Status:
if not self.peer_cluster_relation and not self.runs_broker:
return Status.NO_PEER_CLUSTER_RELATION

if not self.balancer.broker_connected:
if not self.peer_cluster.broker_connected:
return Status.NO_BROKER_DATA

if len(self.balancer.broker_capacities.get("brokerCapacities", [])) < MIN_REPLICAS:
if len(self.peer_cluster.broker_capacities.get("brokerCapacities", [])) < MIN_REPLICAS:
return Status.NOT_ENOUGH_BROKERS

return Status.ACTIVE

@property
def _broker_status(self) -> Status:
def _broker_status(self) -> Status: # noqa: C901
"""Checks for role=broker specific readiness."""
if not self.runs_broker:
return Status.ACTIVE

if not self.zookeeper:
return Status.ZK_NOT_RELATED
# Neither ZooKeeper or KRaft are active
if self.kraft_mode is None:
return Status.MISSING_MODE

if self.kraft_mode:
if not self.peer_cluster.controller_quorum_uris: # FIXME: peer_cluster or cluster?
return Status.NO_QUORUM_URIS
if not self.cluster.cluster_uuid:
return Status.NO_CLUSTER_UUID

if self.kraft_mode == False: # noqa: E712
if not self.zookeeper:
return Status.ZK_NOT_RELATED

if not self.zookeeper.zookeeper_connected:
return Status.ZK_NO_DATA
if not self.zookeeper.zookeeper_connected:
return Status.ZK_NO_DATA

# TLS must be enabled for Kafka and ZK or disabled for both
if self.cluster.tls_enabled ^ self.zookeeper.tls:
return Status.ZK_TLS_MISMATCH
# TLS must be enabled for Kafka and ZK or disabled for both
if self.cluster.tls_enabled ^ self.zookeeper.tls:
return Status.ZK_TLS_MISMATCH

if self.cluster.tls_enabled and not self.unit_broker.certificate:
return Status.NO_CERT
Expand All @@ -493,6 +556,37 @@ def _broker_status(self) -> Status:

return Status.ACTIVE

@property
def _controller_status(self) -> Status:
"""Checks for role=controller specific readiness."""
if not self.runs_controller:
return Status.ACTIVE

if not self.peer_cluster_relation and not self.runs_broker:
return Status.NO_PEER_CLUSTER_RELATION

if not self.peer_cluster.broker_connected_kraft_mode:
return Status.NO_BROKER_DATA

return Status.ACTIVE

@property
def kraft_mode(self) -> bool | None:
"""Is the deployment running in KRaft mode?
Returns:
True if Kraft mode, False if ZooKeeper, None when undefined.
"""
# NOTE: self.roles when running colocated, peer_cluster.roles when multiapp
if CONTROLLER.value in (self.roles + self.peer_cluster.roles):
return True
if self.zookeeper_relation:
return False

# FIXME raise instead of none. `not kraft_mode` is falsy
# NOTE: if previous checks are not met, we don't know yet how the charm is being deployed
return None

@property
def runs_balancer(self) -> bool:
"""Is the charm enabling the balancer?"""
Expand All @@ -502,3 +596,8 @@ def runs_balancer(self) -> bool:
def runs_broker(self) -> bool:
"""Is the charm enabling the broker(s)?"""
return BROKER.value in self.roles

@property
def runs_controller(self) -> bool:
"""Is the charm enabling the controller?"""
return CONTROLLER.value in self.roles
Loading

0 comments on commit 365078a

Please sign in to comment.