Skip to content

Commit

Permalink
support ZooKeeper 3.9.2 tls 1.3 (#168)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored May 7, 2024
1 parent 474a5b4 commit 25214b4
Show file tree
Hide file tree
Showing 16 changed files with 137 additions and 32 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ jobs:
group: helm-tls
num_agents: 0
setup_helm: "true"
- name: Helm (TLS) Pulsar 3
group: helm-tls-pulsar3
num_agents: 0
setup_helm: "true"
- name: Helm Kafka (TLS)
group: helm-tls-kafka
num_agents: 0
Expand Down
7 changes: 7 additions & 0 deletions docs/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -14112,6 +14112,13 @@ Configuration for the rack auto configuration.
</tr>
</thead>
<tbody><tr>
<td><b>additionalZookeeperClientConfig</b></td>
<td>JSON</td>
<td>
Additional configuration for the zookeeper client.<br/>
</td>
<td>false</td>
</tr><tr>
<td><b>enabled</b></td>
<td>boolean</td>
<td>
Expand Down
2 changes: 1 addition & 1 deletion helm/examples/kafka/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

kaap:
operator:
enabled: false
enabled: true
cluster:
create: true
spec:
Expand Down
3 changes: 3 additions & 0 deletions helm/kaap/crds/bookkeepers.kaap.oss.datastax.com-v1.yml
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,9 @@ spec:
description: Period for the schedule of the monitoring thread.
minimum: 1000.0
type: integer
additionalZookeeperClientConfig:
description: Additional configuration for the zookeeper client.
x-kubernetes-preserve-unknown-fields: true
enabled:
description: Enable rack configuration monitoring.
type: boolean
Expand Down
3 changes: 3 additions & 0 deletions helm/kaap/crds/pulsarclusters.kaap.oss.datastax.com-v1.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15795,6 +15795,9 @@ spec:
description: Period for the schedule of the monitoring thread.
minimum: 1000.0
type: integer
additionalZookeeperClientConfig:
description: Additional configuration for the zookeeper client.
x-kubernetes-preserve-unknown-fields: true
enabled:
description: Enable rack configuration monitoring.
type: boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.security.Security;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import javax.security.auth.x500.X500Principal;
Expand All @@ -48,16 +49,17 @@ public class ZkClientRackClient implements BkRackClient {
public static final String BOOKIES_PATH = "/bookies";
private final CuratorFramework zkClient;

public static ZkClientRackClient plainClient(String zkConnectString) {
return new ZkClientRackClient(zkConnectString, null, null, null);
public static ZkClientRackClient plainClient(String zkConnectString, Map<String, String> additionalZkClientConfig) {
return new ZkClientRackClient(zkConnectString, null, null, null, additionalZkClientConfig);
}
public static ZkClientRackClient sslClient(String zkConnectString, String privateKey, String serverCertificate,
String caCertificate) {
return new ZkClientRackClient(zkConnectString, privateKey, serverCertificate, caCertificate);
String caCertificate, Map<String, String> additionalZkClientConfig) {
return new ZkClientRackClient(zkConnectString, privateKey, serverCertificate, caCertificate, additionalZkClientConfig);
}

public ZkClientRackClient(String zkConnectString, String privateKey, String serverCertificate,
String caCertificate) {
String caCertificate,
Map<String, String> additionalZkClientConfig) {
final ZKClientConfig zkClientConfig = new ZKClientConfig();
if (privateKey != null) {
log.infof("Creating new zookeeper client for %s (ssl)", zkConnectString);
Expand All @@ -73,9 +75,14 @@ public ZkClientRackClient(String zkConnectString, String privateKey, String serv
zkClientConfig.setProperty("zookeeper.ssl.trustStore.location", trustStore.toFile().getAbsolutePath());
zkClientConfig.setProperty("zookeeper.ssl.trustStore.password", truststorePass);
zkClientConfig.setProperty("zookeeper.ssl.hostnameVerification", "true");
zkClientConfig.setProperty("zookeeper.ssl.protocol", "TLSv1.2");
zkClientConfig.setProperty("zookeeper.ssl.enabledProtocols", "TLSv1.3,TLSv1.2");
} else {
log.infof("Creating new zookeeper client for %s (plain)", zkConnectString);
}
if (additionalZkClientConfig != null) {
additionalZkClientConfig.forEach(zkClientConfig::setProperty);
}
this.zkClient = CuratorFrameworkFactory
.newClient(zkConnectString, 60_000, 15_000,
new RetryUntilElapsed(30_000, 5000),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import lombok.extern.jbosslog.JBossLog;

@JBossLog
public class ZkClientRackClientFactory implements BkRackClientFactory{
public class ZkClientRackClientFactory implements BkRackClientFactory {

private final Map<String, ZkClientRackClient> zkClients = new ConcurrentHashMap<>();
private final KubernetesClient client;
Expand All @@ -40,7 +40,7 @@ public ZkClientRackClientFactory(KubernetesClient client) {

@Override
public BkRackClient newBkRackClient(String namespace, BookKeeperFullSpec newSpec,
BookKeeperAutoRackConfig autoRackConfig) {
BookKeeperAutoRackConfig autoRackConfig) {
final String zkConnectString = getZkServers(namespace, newSpec);

if (!autoRackConfig.getEnabled()) {
Expand All @@ -61,15 +61,15 @@ public BkRackClient newBkRackClient(String namespace, BookKeeperFullSpec newSpec

final ZkClientRackClient zkClient =
zkClients.computeIfAbsent(zkConnectString,
(k) -> newZkRackClient(k, newSpec.getGlobalSpec(), namespace));
(k) -> newZkRackClient(k, newSpec.getGlobalSpec(), namespace, autoRackConfig.getAdditionalZookeeperClientConfig()));
return zkClient;
}


private ZkClientRackClient newZkRackClient(String zkConnectString, GlobalSpec globalSpec, String namespace) {
private ZkClientRackClient newZkRackClient(String zkConnectString, GlobalSpec globalSpec, String namespace, Map<String, String> additionalZkClientConfig) {
final boolean tlsEnabledOnZooKeeper = BaseResourcesFactory.isTlsEnabledOnZooKeeper(globalSpec);
if (!tlsEnabledOnZooKeeper) {
return ZkClientRackClient.plainClient(zkConnectString);
return ZkClientRackClient.plainClient(zkConnectString, additionalZkClientConfig);
}
final String tlsSecretNameForZookeeper = BaseResourcesFactory.getTlsSecretNameForZookeeper(globalSpec);
final Secret secret = client.secrets()
Expand All @@ -89,7 +89,7 @@ private ZkClientRackClient newZkRackClient(String zkConnectString, GlobalSpec gl
if (caCert != null) {
caCert = new String(Base64.getDecoder().decode(caCert), StandardCharsets.UTF_8);
}
return ZkClientRackClient.sslClient(zkConnectString, privateKey, serverCert, caCert);
return ZkClientRackClient.sslClient(zkConnectString, privateKey, serverCert, caCert, additionalZkClientConfig);
}

protected String getZkServers(String namespace, BookKeeperFullSpec newSpec) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ public void patchConfigMap() {
data.put("serverCnxnFactory", "org.apache.zookeeper.server.NettyServerCnxnFactory");
data.put("secureClientPort", "2281");
data.put("sslQuorum", "true");
// TLSv1.2 is backward compatible with ZK < 3.9.2
data.put("ssl.protocol", "TLSv1.2");
data.put("ssl.quorum.protocol", "TLSv1.2");
}
appendConfigData(data, spec.getConfig());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
package com.datastax.oss.kaap.crds.bookkeeper;

import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.databind.JsonNode;
import io.fabric8.crd.generator.annotation.SchemaFrom;
import io.fabric8.generator.annotation.Min;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand All @@ -34,4 +37,8 @@ public class BookKeeperAutoRackConfig {
@javax.validation.constraints.Min(1000)
@JsonPropertyDescription("Period for the schedule of the monitoring thread.")
Long periodMs;
@JsonPropertyDescription("Additional configuration for the zookeeper client.")
@SchemaFrom(type = JsonNode.class)
Map<String, String> additionalZookeeperClientConfig;

}
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,8 @@ public void testConfigTlsOnZookeeper() throws Exception {
expectedData.put("PULSAR_PREFIX_serverCnxnFactory", "org.apache.zookeeper.server.NettyServerCnxnFactory");
expectedData.put("PULSAR_PREFIX_secureClientPort", "2281");
expectedData.put("PULSAR_PREFIX_sslQuorum", "true");
expectedData.put("PULSAR_PREFIX_ssl.protocol", "TLSv1.2");
expectedData.put("PULSAR_PREFIX_ssl.quorum.protocol", "TLSv1.2");

final Map<String, String> data = createdResource.getResource().getData();
Assert.assertEquals(data, expectedData);
Expand Down
10 changes: 8 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
<kubernetes-client.version>6.3.1</kubernetes-client.version>
<testng.version>7.7.0</testng.version>
<mockito-core.version>4.10.0</mockito-core.version>
<lombok.version>1.18.24</lombok.version>
<lombok.version>1.18.32</lombok.version>
<spotbugs-maven-plugin.version>4.7.2.1</spotbugs-maven-plugin.version>
<spotbugs.version>4.7.2</spotbugs.version>
<maven-checkstyle-plugin.version>3.2.1</maven-checkstyle-plugin.version>
Expand All @@ -59,7 +59,8 @@
<maven-jar-plugin.version>3.3.0</maven-jar-plugin.version>
<jsonassert.version>1.5.1</jsonassert.version>
<awaitility.version>4.2.0</awaitility.version>
<curator.version>5.4.0</curator.version>
<curator.version>5.6.0</curator.version>
<zookkeeper.version>3.9.2</zookkeeper.version>
</properties>
<dependencyManagement>
<dependencies>
Expand Down Expand Up @@ -160,6 +161,11 @@
<artifactId>curator-test</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookkeeper.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ public abstract class BaseK8sEnvTest {
"datastax/kaap:latest-dev");

public static final String PULSAR_IMAGE = System.getProperty("kaap.tests.pulsar.image",
"docker.io/datastax/lunastreaming-all:2.10_3.4");
"docker.io/datastax/lunastreaming-all:2.10_5.1");

public static final String PULSAR3_IMAGE = System.getProperty("kaap.tests.pulsar.image",
"docker.io/datastax/lunastreaming-all:3.1_4.0");

public static final boolean USE_EXISTING_ENV = Boolean.getBoolean("kaap.tests.env.existing");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,15 @@ protected String specsToYaml(PulsarClusterSpec spec) {
}

protected PulsarClusterSpec getDefaultPulsarClusterSpecs() {
return getDefaultPulsarClusterSpecs(false);
}

protected PulsarClusterSpec getDefaultPulsarClusterSpecs(boolean pulsar3) {
final PulsarClusterSpec defaultSpecs = new PulsarClusterSpec();
defaultSpecs.setGlobal(GlobalSpec.builder()
.name(DEFAULT_PULSAR_CLUSTER_NAME)
.persistence(true)
.image(PULSAR_IMAGE)
.image(pulsar3 ? PULSAR3_IMAGE : PULSAR_IMAGE)
.imagePullPolicy("IfNotPresent")
.storage(GlobalSpec.GlobalStorageConfig.builder()
.existingStorageClassName(env.getStorageClass())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.kaap.tests.helm;

import lombok.extern.slf4j.Slf4j;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "helm-tls")
public class Pulsar2TlsTest extends TlsTest {

@Test
public void testPerComponents() throws Exception {
test(true, false);
}

@Test
public void testGlobal() throws Exception {
test(false, false);

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.kaap.tests.helm;

import lombok.extern.slf4j.Slf4j;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "helm-tls-pulsar3")
public class Pulsar3TlsTest extends TlsTest {
@Test
public void testPerComponents() throws Exception {
test(true, true);
}

@Test
public void testGlobal() throws Exception {
test(false, true);

}
}
19 changes: 3 additions & 16 deletions tests/src/test/java/com/datastax/oss/kaap/tests/helm/TlsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,11 @@
import com.datastax.oss.kaap.crds.cluster.PulsarClusterSpec;
import com.datastax.oss.kaap.crds.configs.tls.TlsConfig;
import lombok.extern.slf4j.Slf4j;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "helm-tls")
public class TlsTest extends BaseHelmTest {
public abstract class TlsTest extends BaseHelmTest {

@Test
public void testPerComponents() throws Exception {
test(true);
}

@Test
public void testGlobal() throws Exception {
test(false);

}

private void test(boolean perComponentCerts) throws Exception {
protected void test(boolean perComponentCerts, boolean pulsar3) throws Exception {
try {
applyCertManagerCRDs();
helmInstall(Chart.STACK, """
Expand All @@ -52,7 +39,7 @@ private void test(boolean perComponentCerts) throws Exception {
""".formatted(OPERATOR_IMAGE, namespace));
awaitOperatorRunning();

final PulsarClusterSpec specs = getDefaultPulsarClusterSpecs();
final PulsarClusterSpec specs = getDefaultPulsarClusterSpecs(pulsar3);
if (perComponentCerts) {
specs.getGlobal()
.setTls(TlsConfig.builder()
Expand Down

0 comments on commit 25214b4

Please sign in to comment.