From f4d9336d96fc7e4aa47a5afb112b5dbf8d5ade6e Mon Sep 17 00:00:00 2001 From: Iman Enami <44609233+imanenami@users.noreply.github.com> Date: Tue, 26 Nov 2024 09:22:50 +0400 Subject: [PATCH] [DPE-5553] feat: Don't restart server on keystore/truststore updates (#153) Co-authored-by: Iman Enami --- src/core/cluster.py | 19 ++++++ src/events/tls.py | 10 ++-- src/managers/tls.py | 17 ++++++ tests/integration/helpers.py | 46 +++++++++++++++ tests/integration/test_tls.py | 108 ++++++++++++++++++++++++++++++++++ 5 files changed, 196 insertions(+), 4 deletions(-) diff --git a/src/core/cluster.py b/src/core/cluster.py index f10ee7dd..0c067a2c 100644 --- a/src/core/cluster.py +++ b/src/core/cluster.py @@ -412,6 +412,25 @@ def bootstrap_server(self) -> str: ) ) + @property + def bootstrap_server_internal(self) -> str: + """Comma-delimited string of `bootstrap-server` command flag for internal access. + + Returns: + List of `bootstrap-server` servers + """ + if not self.peer_relation: + return "" + + return ",".join( + sorted( + [ + f"{broker.internal_address}:{SECURITY_PROTOCOL_PORTS[self.default_auth].internal}" + for broker in self.brokers + ] + ) + ) + @property def controller_quorum_uris(self) -> str: """The current controller quorum uris when running KRaft mode.""" diff --git a/src/events/tls.py b/src/events/tls.py index 9970b5e3..9c564161 100644 --- a/src/events/tls.py +++ b/src/events/tls.py @@ -136,8 +136,10 @@ def _trusted_relation_created(self, event: EventBase) -> None: event.defer() return - # Create a "mtls" flag so a new listener (CLIENT_SSL) is created - self.charm.state.cluster.update({"mtls": "enabled"}) + if not self.charm.state.cluster.mtls_enabled: + # Create a "mtls" flag so a new listener (CLIENT_SSL) is created + self.charm.state.cluster.update({"mtls": "enabled"}) + self.charm.on.config_changed.emit() def _trusted_relation_joined(self, event: RelationJoinedEvent) -> None: """Generate a CSR so the tls-certificates operator works as expected.""" @@ -208,8 +210,8 @@ def _trusted_relation_changed(self, event: RelationChangedEvent) -> None: ) self.charm.broker.tls_manager.import_cert(alias=f"{alias}", filename=filename) - # ensuring new config gets applied - self.charm.on[f"{self.charm.restart.name}"].acquire_lock.emit() + # Live reload the truststore + self.charm.broker.tls_manager.reload_truststore() def _trusted_relation_broken(self, event: RelationBrokenEvent) -> None: """Handle relation broken for a trusted certificate/ca relation.""" diff --git a/src/managers/tls.py b/src/managers/tls.py index fd937a1c..3ff90055 100644 --- a/src/managers/tls.py +++ b/src/managers/tls.py @@ -206,3 +206,20 @@ def remove_stores(self) -> None: except (subprocess.CalledProcessError, ExecError) as e: logger.error(e.stdout) raise e + + def reload_truststore(self) -> None: + """Reloads the truststore using `kafka-configs` utility without restarting the broker.""" + bin_args = [ + f"--command-config {self.workload.paths.client_properties}", + f"--bootstrap-server {self.state.bootstrap_server_internal}", + "--entity-type brokers", + f"--entity-name {self.state.unit_broker.unit_id}", + "--alter", + f"--add-config listener.name.CLIENT_SSL_SSL.ssl.truststore.location={self.workload.paths.truststore}", + ] + + logger.info("Reloading truststore") + self.workload.run_bin_command( + bin_keyword="configs", + bin_args=bin_args, + ) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index b2e30bdc..fcd9bbcd 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -315,6 +315,25 @@ async def set_mtls_client_acls(ops_test: OpsTest, bootstrap_server: str) -> str: return result +async def create_test_topic(ops_test: OpsTest, bootstrap_server: str) -> str: + """Creates `test` topic and adds ACLs for principal `User:*`.""" + _ = check_output( + f"JUJU_MODEL={ops_test.model_full_name} juju ssh --container kafka kafka-k8s/0 '{BROKER.paths['BIN']}/bin/kafka-topics.sh --bootstrap-server {bootstrap_server} --command-config {BROKER.paths['CONF']}/client.properties -create -topic test'", + stderr=PIPE, + shell=True, + universal_newlines=True, + ) + + result = check_output( + f"JUJU_MODEL={ops_test.model_full_name} juju ssh --container kafka kafka-k8s/0 '{BROKER.paths['BIN']}/bin/kafka-acls.sh --bootstrap-server {bootstrap_server} --add --allow-principal=User:* --operation READ --operation WRITE --operation CREATE --topic test --command-config {BROKER.paths['CONF']}/client.properties'", + stderr=PIPE, + shell=True, + universal_newlines=True, + ) + + return result + + def count_lines_with(ops_test: OpsTest, unit: str, file: str, pattern: str) -> int: result = check_output( f"JUJU_MODEL={ops_test.model_full_name} juju ssh --container kafka {unit} 'grep \"{pattern}\" {file} | wc -l'", @@ -352,6 +371,33 @@ def get_secret_by_label(ops_test: OpsTest, label: str, owner: str) -> dict[str, return secret_data[secret_id]["content"]["Data"] +def search_secrets(ops_test: OpsTest, owner: str, search_key: str) -> str: + secrets_meta_raw = check_output( + f"JUJU_MODEL={ops_test.model_full_name} juju list-secrets --format json", + stderr=PIPE, + shell=True, + universal_newlines=True, + ).strip() + secrets_meta = json.loads(secrets_meta_raw) + + for secret_id in secrets_meta: + if owner and not secrets_meta[secret_id]["owner"] == owner: + continue + + secrets_data_raw = check_output( + f"JUJU_MODEL={ops_test.model_full_name} juju show-secret --format json --reveal {secret_id}", + stderr=PIPE, + shell=True, + universal_newlines=True, + ) + + secret_data = json.loads(secrets_data_raw) + if search_key in secret_data[secret_id]["content"]["Data"]: + return secret_data[secret_id]["content"]["Data"][search_key] + + return "" + + def show_unit(ops_test: OpsTest, unit_name: str) -> Any: result = check_output( f"JUJU_MODEL={ops_test.model_full_name} juju show-unit {unit_name}", diff --git a/tests/integration/test_tls.py b/tests/integration/test_tls.py index a273584e..b4708856 100644 --- a/tests/integration/test_tls.py +++ b/tests/integration/test_tls.py @@ -4,8 +4,12 @@ import asyncio import base64 +import json import logging +import os +import tempfile +import kafka import pytest from charms.tls_certificates_interface.v1.tls_certificates import generate_private_key from pytest_operator.plugin import OpsTest @@ -18,6 +22,7 @@ REL_NAME_ADMIN, ZK_NAME, check_tls, + create_test_topic, delete_pod, extract_ca, extract_private_key, @@ -25,6 +30,7 @@ get_address, get_kafka_zk_relation_data, get_mtls_nodeport, + search_secrets, set_mtls_client_acls, set_tls_private_key, ) @@ -33,6 +39,7 @@ TLS_NAME = "self-signed-certificates" CERTS_NAME = "tls-certificates-operator" +TLS_REQUIRER = "tls-certificates-requirer" MTLS_NAME = "mtls" DUMMY_NAME = "app" @@ -250,6 +257,107 @@ async def test_mtls(ops_test: OpsTest): assert max_offset == str(num_messages) +@pytest.mark.abort_on_fail +async def test_truststore_live_reload(ops_test: OpsTest): + """Tests truststore live reload functionality using kafka-python client.""" + requirer = "other-req/0" + test_msg = {"test": 123456} + + await ops_test.model.deploy( + TLS_NAME, channel="stable", application_name="other-ca", revision=155 + ) + await ops_test.model.deploy( + TLS_REQUIRER, channel="stable", application_name="other-req", revision=102 + ) + + await ops_test.model.add_relation("other-ca", "other-req") + + await ops_test.model.wait_for_idle( + apps=["other-ca", "other-req"], idle_period=60, timeout=2000, status="active" + ) + + # retrieve required certificates and private key from secrets + local_store = { + "private_key": search_secrets(ops_test=ops_test, owner=requirer, search_key="private-key"), + "cert": search_secrets(ops_test=ops_test, owner=requirer, search_key="certificate"), + "ca_cert": search_secrets(ops_test=ops_test, owner=requirer, search_key="ca-certificate"), + "broker_ca": search_secrets( + ops_test=ops_test, owner=f"{APP_NAME}/0", search_key="ca-cert" + ), + } + + certs_operator_config = { + "generate-self-signed-certificates": "false", + "certificate": base64.b64encode(local_store["cert"].encode("utf-8")).decode("utf-8"), + "ca-certificate": base64.b64encode(local_store["ca_cert"].encode("utf-8")).decode("utf-8"), + } + + await ops_test.model.deploy( + CERTS_NAME, + channel="stable", + series="jammy", + application_name="other-op", + config=certs_operator_config, + ) + + await ops_test.model.wait_for_idle( + apps=["other-op"], idle_period=60, timeout=2000, status="active" + ) + + # We don't expect a broker restart here because of truststore live reload + await ops_test.model.add_relation(f"{APP_NAME}:{TRUSTED_CERTIFICATE_RELATION}", "other-op") + + await ops_test.model.wait_for_idle( + apps=["other-op", APP_NAME], idle_period=60, timeout=2000, status="active" + ) + + address = await get_address(ops_test, app_name=APP_NAME, unit_num=0) + sasl_port = SECURITY_PROTOCOL_PORTS["SASL_SSL", "SCRAM-SHA-512"].client + sasl_bootstrap_server = f"{address}:{sasl_port}" + ssl_port = SECURITY_PROTOCOL_PORTS["SSL", "SSL"].external + ssl_bootstrap_server = f"{address}:{ssl_port}" + + # create `test` topic and set ACLs + await create_test_topic(ops_test, bootstrap_server=sasl_bootstrap_server) + + # quickly test the producer and consumer side authentication & authorization + tmp_dir = tempfile.TemporaryDirectory() + tmp_paths = {} + for key, content in local_store.items(): + tmp_paths[key] = os.path.join(tmp_dir.name, key) + with open(tmp_paths[key], "w", encoding="utf-8") as f: + f.write(content) + + client_config = { + "bootstrap_servers": ssl_bootstrap_server, + "security_protocol": "SSL", + "api_version": (0, 10), + "ssl_cafile": tmp_paths["broker_ca"], + "ssl_certfile": tmp_paths["cert"], + "ssl_keyfile": tmp_paths["private_key"], + "ssl_check_hostname": False, + } + + producer = kafka.KafkaProducer( + **client_config, + value_serializer=lambda v: json.dumps(v).encode("utf-8"), + ) + + producer.send("test", test_msg) + + consumer = kafka.KafkaConsumer("test", **client_config, auto_offset_reset="earliest") + + msg = next(consumer) + + assert json.loads(msg.value) == test_msg + + # cleanup + await ops_test.model.remove_application("other-ca", block_until_done=True) + await ops_test.model.remove_application("other-op", block_until_done=True) + await ops_test.model.remove_application("other-req", block_until_done=True) + tmp_dir.cleanup() + + @pytest.mark.abort_on_fail async def test_mtls_broken(ops_test: OpsTest): await ops_test.model.remove_application(MTLS_NAME, block_until_done=True)