Skip to content

Commit

Permalink
wip: use JVM embedded kafka connect cluster for tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vkorukanti committed Oct 14, 2024
1 parent 0c1555f commit 26b8618
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 22 deletions.
9 changes: 8 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1583,7 +1583,14 @@ lazy val kafka = (project in file("connectors/kafka"))
"org.junit.jupiter" % "junit-jupiter-engine" % "5.10.1" % Test,
"org.junit.platform" % "junit-platform-suite-api" % "1.10.3" % Test,
"org.junit.platform" % "junit-platform-suite-engine" % "1.10.3" % Test,
"org.junit.vintage" % "junit-vintage-engine" % "5.10.1" % Test
"org.junit.vintage" % "junit-vintage-engine" % "5.10.1" % Test,
"org.apache.kafka" % "kafka-streams-test-utils" % kafkaVersion % Test,
"org.apache.kafka" % "connect-runtime" % kafkaVersion,
"org.apache.kafka" % "connect-runtime" % kafkaVersion % Test classifier "test",
"org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion % Test,
"org.apache.kafka" %% "kafka" % kafkaVersion % Test,
"org.apache.kafka" %% "kafka" % kafkaVersion % Test classifier "test",
"org.apache.kafka" % "kafka-clients" % kafkaVersion % Test classifier "test",
),
dependencyOverrides += "com.github.luben" % "zstd-jni" % "1.5.6-3",
// Compile, patch and generated Iceberg JARs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
*/
package io.delta.kafka;

import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.PLUGIN_DISCOVERY_CONFIG;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.classic.methods.HttpDelete;
Expand All @@ -33,17 +38,22 @@
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.awaitility.Awaitility;

public class KafkaConnectUtils {

// Shared HTTP client used only when talking to an EXTERNAL Connect cluster's REST API
// (i.e. when USE_EMBEDDED_CONNECT is false).
private static final HttpClient HTTP = HttpClients.createDefault();
// Toggle: true = run connectors on a JVM-embedded Kafka Connect cluster;
// false = drive an externally started cluster over its REST API on TestContext.CONNECT_PORT.
private static final boolean USE_EMBEDDED_CONNECT = true;

// Lazily initialized embedded cluster; remains null until startConnector() runs with
// USE_EMBEDDED_CONNECT enabled. NOTE(review): shared mutable static — presumably tests run
// serially; confirm if tests are ever parallelized.
private static EmbeddedConnectCluster connectCluster;

// JavaBean-style for serialization
public static class Config {

private final String name;
private final Map<String, Object> config = Maps.newHashMap();
private final Map<String, String> config = Maps.newHashMap();

public Config(String name) {
this.name = name;
Expand All @@ -53,32 +63,64 @@ public String getName() {
return name;
}

public Map<String, Object> getConfig() {
public Map<String, String> getConfig() {
return config;
}

public Config config(String key, Object value) {
config.put(key, value);
config.put(key, value.toString());
return this;
}
}

/**
 * Starts (registers) the connector described by {@code config}.
 *
 * <p>When {@code USE_EMBEDDED_CONNECT} is set, a single JVM-embedded Kafka Connect cluster is
 * lazily created on first use and the connector is submitted to it; the call then blocks until
 * the connector and its tasks are running (the config is expected to contain a
 * {@code "tasks.max"} entry). Otherwise the connector is created through the REST API of an
 * externally running Connect cluster on {@code TestContext.CONNECT_PORT}.
 *
 * @param config connector name and configuration to submit
 * @throws RuntimeException if the connector cannot be started or the call is interrupted
 */
public static void startConnector(Config config) {
  try {
    if (USE_EMBEDDED_CONNECT) {
      if (connectCluster == null) {
        // Lazily build ONE shared cluster. Creating a new cluster on every call would leak
        // the previously started brokers/workers (the old reference is overwritten and
        // never stopped).
        Map<String, String> workerProps = new HashMap<>();
        // permit all Kafka client overrides; required for testing different consumer
        // partition assignment strategies
        workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
        workerProps.put(PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.ONLY_SCAN.toString());

        // Kafka BROKER properties only. (plugin.discovery is a Connect worker setting and
        // does not belong here.)
        Properties brokerProps = new Properties();
        brokerProps.put("auto.create.topics.enable", "false");
        brokerProps.put("delete.topic.enable", "true");

        connectCluster =
            new EmbeddedConnectCluster.Builder()
                .name("connect-cluster")
                .numWorkers(2)
                .workerProps(workerProps)
                .brokerProps(brokerProps)
                .build();
        connectCluster.start();
      }

      connectCluster.configureConnector(config.getName(), config.getConfig());
      connectCluster
          .assertions()
          .assertConnectorAndAtLeastNumTasksAreRunning(
              config.getName(),
              Integer.parseInt(config.getConfig().get("tasks.max")),
              "Connector tasks did not start in time.");
    } else {
      HttpPost request =
          new HttpPost(String.format("http://localhost:%d/connectors", TestContext.CONNECT_PORT));
      String body = TestContext.MAPPER.writeValueAsString(config);
      request.setHeader("Content-Type", "application/json");
      request.setEntity(new StringEntity(body));
      HTTP.execute(
          request,
          response -> {
            if (response.getCode() != HttpStatus.SC_CREATED) {
              throw new RuntimeException("Failed to start connector: " + response.getCode());
            }
            return null;
          });
    }
  } catch (InterruptedException e) {
    // Preserve the interrupt status for callers further up the stack before rethrowing.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
Expand Down Expand Up @@ -111,10 +153,16 @@ public static void ensureConnectorRunning(String name) {

public static void stopConnector(String name) {
try {
HttpDelete request =
new HttpDelete(
String.format("http://localhost:%d/connectors/%s", TestContext.CONNECT_PORT, name));
HTTP.execute(request, response -> null);
if (USE_EMBEDDED_CONNECT) {
if (connectCluster != null) {
connectCluster.stopConnector(name);
}
} else {
HttpDelete request =
new HttpDelete(
String.format("http://localhost:%d/connectors/%s", TestContext.CONNECT_PORT, name));
HTTP.execute(request, response -> null);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit 26b8618

Please sign in to comment.