diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 7fd3ca7e7..fa50f4e39 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -155,13 +155,13 @@ jobs:
uname -m
./dev/prepare-minikube-for-e2e-tests.sh
./mvnw install -pl langstream-e2e-tests -am -DskipTests
- ./mvnw verify -pl langstream-e2e-tests -De2eTests -DexcludedGroups="needs-credentials"
+ ./mvnw verify -pl langstream-e2e-tests -De2eTests -DexcludedGroups="needs-credentials" -Dlangstream.tests.recycleenv=true
- name: Upload Surefire reports
uses: actions/upload-artifact@v3
if: failure()
continue-on-error: true
with:
- name: test-logs-${{ matrix.group }}
+ name: test-logs-e2e
path: "**/target/e2e-test-logs/*"
retention-days: 7
diff --git a/langstream-e2e-tests/pom.xml b/langstream-e2e-tests/pom.xml
index 6a499d1de..7a280d2ec 100644
--- a/langstream-e2e-tests/pom.xml
+++ b/langstream-e2e-tests/pom.xml
@@ -52,6 +52,17 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>langstream-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <scope>test</scope>
+ </dependency>
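The langstream-core test dependency pulls in ModelBuilder, which the refactored base test below uses to parse and validate an application before deploying it. A minimal, hedged sketch of that kind of offline check (class name and paths are placeholders, not part of this change):

import ai.langstream.impl.parser.ModelBuilder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class ParseAppExample {
    public static void main(String[] args) throws Exception {
        // placeholder locations; the real tests resolve these under src/test/resources
        Path appDir = Path.of("src/test/resources/apps/some-app");
        Path instanceFile = Path.of("/tmp/instance.yaml");
        Path secretsFile = Path.of("src/test/resources/secrets/secret1.yaml");
        ModelBuilder.ApplicationWithPackageInfo model =
                ModelBuilder.buildApplicationInstance(
                        List.of(appDir),
                        Files.readString(instanceFile),
                        Files.readString(secretsFile));
        // list every topic declared by the application's modules
        model.getApplication().getModules().values()
                .forEach(m -> m.getTopics().keySet().forEach(System.out::println));
    }
}

The kafka-clients dependency backs the remote-Kafka provider further down, which talks to an existing cluster through the Admin API.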
diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/CassandraSinkIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/CassandraSinkIT.java
index 9cf7bf43e..edb893a22 100644
--- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/CassandraSinkIT.java
+++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/CassandraSinkIT.java
@@ -18,6 +18,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import ai.langstream.tests.util.BaseEndToEndTest;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/ChatCompletionsIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/ChatCompletionsIT.java
index 6805d072a..fb9c61863 100644
--- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/ChatCompletionsIT.java
+++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/ChatCompletionsIT.java
@@ -15,6 +15,7 @@
*/
package ai.langstream.tests;
+import ai.langstream.tests.util.BaseEndToEndTest;
import ai.langstream.tests.util.ConsumeGatewayMessage;
import java.util.List;
import java.util.Map;
diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java
index 3cd70f6ed..a1b8b5f67 100644
--- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java
+++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java
@@ -17,6 +17,7 @@
import ai.langstream.deployer.k8s.api.crds.agents.AgentCustomResource;
import ai.langstream.deployer.k8s.api.crds.apps.ApplicationCustomResource;
+import ai.langstream.tests.util.BaseEndToEndTest;
import io.fabric8.kubernetes.api.model.Secret;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -96,7 +97,7 @@ public void test(String appDir) {
.size());
});
- final List<String> topics = getAllTopicsFromKafka();
- Assertions.assertEquals(List.of("TEST_TOPIC_0"), topics);
+ final List<String> topics = getAllTopics();
+ Assertions.assertEquals(List.of("ls-test-topic0"), topics);
}
}
diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/WebCrawlerToVectorIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/WebCrawlerToVectorIT.java
index 2e88e905f..880efbbfc 100644
--- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/WebCrawlerToVectorIT.java
+++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/WebCrawlerToVectorIT.java
@@ -15,6 +15,7 @@
*/
package ai.langstream.tests;
+import ai.langstream.tests.util.BaseEndToEndTest;
import ai.langstream.tests.util.ConsumeGatewayMessage;
import java.util.List;
import java.util.Map;
diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java
similarity index 80%
rename from langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java
rename to langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java
index 7129318cd..cdcf74995 100644
--- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java
+++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java
@@ -13,22 +13,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package ai.langstream.tests;
+package ai.langstream.tests.util;
+import ai.langstream.api.model.StreamingCluster;
import ai.langstream.deployer.k8s.api.crds.agents.AgentCustomResource;
import ai.langstream.deployer.k8s.api.crds.apps.ApplicationCustomResource;
-import ai.langstream.tests.util.ConsumeGatewayMessage;
-import com.dajudge.kindcontainer.K3sContainer;
-import com.dajudge.kindcontainer.K3sContainerVersion;
-import com.dajudge.kindcontainer.KubernetesImageSpec;
-import com.dajudge.kindcontainer.helm.Helm3Container;
+import ai.langstream.impl.parser.ModelBuilder;
+import ai.langstream.tests.util.k8s.LocalK3sContainer;
+import ai.langstream.tests.util.k8s.RunningHostCluster;
+import ai.langstream.tests.util.kafka.LocalRedPandaClusterProvider;
+import ai.langstream.tests.util.kafka.RemoteKafkaProvider;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.NamespaceBuilder;
import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
@@ -46,8 +49,6 @@
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -72,120 +73,31 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.TestWatcher;
-import org.testcontainers.DockerClientFactory;
-import org.testcontainers.containers.BindMode;
@Slf4j
public class BaseEndToEndTest implements TestWatcher {
+ public static final String TOPICS_PREFIX = "ls-test-";
+
public static final String CATEGORY_NEEDS_CREDENTIALS = "needs-credentials";
+ private static final boolean LANGSTREAM_RECYCLE_ENV =
+ Boolean.parseBoolean(System.getProperty("langstream.tests.recycleenv", "false"));
+
private static final String LANGSTREAM_TAG =
System.getProperty("langstream.tests.tag", "latest-dev");
- private static final boolean REUSE_EXISTING_REDPANDA =
- Boolean.parseBoolean(System.getProperty("langstream.tests.reuseRedPanda", "false"));
+ private static final String LANGSTREAM_K8S = System.getProperty("langstream.tests.k8s", "host");
+ private static final String LANGSTREAM_STREAMING =
+ System.getProperty("langstream.tests.streaming", "local-redpanda");
public static final File TEST_LOGS_DIR = new File("target", "e2e-test-logs");
protected static final String TENANT_NAMESPACE_PREFIX = "ls-tenant-";
protected static final ObjectMapper JSON_MAPPER = new ObjectMapper();
protected static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory());
- protected static final String KAFKA_NAMESPACE = "kafka-ns";
-
- interface KubeServer {
- void start();
-
- void ensureImage(String image);
-
- void stop();
-
- String getKubeConfig();
- }
-
- static class RunningHostCluster implements KubeServer {
- @Override
- public void start() {}
-
- @Override
- public void ensureImage(String image) {}
-
- @Override
- public void stop() {}
-
- @Override
- @SneakyThrows
- public String getKubeConfig() {
- final String kubeConfig = Config.getKubeconfigFilename();
- return Files.readString(Paths.get(kubeConfig), StandardCharsets.UTF_8);
- }
- }
-
- static class LocalK3sContainer implements KubeServer {
-
- K3sContainer container;
- final Path basePath = Paths.get("/tmp", "ls-tests-image");
-
- @Override
- public void start() {
- container =
- new K3sContainer(
- new KubernetesImageSpec<>(K3sContainerVersion.VERSION_1_25_0)
- .withImage("rancher/k3s:v1.25.3-k3s1"));
- container.withFileSystemBind(
- basePath.toFile().getAbsolutePath(), "/images", BindMode.READ_WRITE);
- // container.withNetwork(network);
- container.start();
- }
-
- @Override
- public void ensureImage(String image) {
- loadImage(basePath, image);
- }
-
- @SneakyThrows
- private void loadImage(Path basePath, String image) {
- final String id =
- DockerClientFactory.lazyClient()
- .inspectImageCmd(image)
- .exec()
- .getId()
- .replace("sha256:", "");
-
- final Path hostPath = basePath.resolve(id);
- if (!hostPath.toFile().exists()) {
- log.info("Saving image {} locally", image);
- final InputStream in = DockerClientFactory.lazyClient().saveImageCmd(image).exec();
-
- try (final OutputStream outputStream = Files.newOutputStream(hostPath)) {
- in.transferTo(outputStream);
- } catch (Exception ex) {
- hostPath.toFile().delete();
- throw ex;
- }
- }
-
- log.info("Loading image {} in the k3s container", image);
- if (container.execInContainer("ctr", "images", "import", "/images/" + id).getExitCode()
- != 0) {
- throw new RuntimeException("Failed to load image " + image);
- }
- }
-
- @Override
- public void stop() {
- if (container != null) {
- container.stop();
- }
- }
-
- @Override
- public String getKubeConfig() {
- return container.getKubeconfig();
- }
- }
-
- protected static KubeServer kubeServer;
- protected static Helm3Container helm3Container;
+ protected static KubeCluster kubeCluster;
+ protected static StreamingClusterProvider streamingClusterProvider;
+ protected static File instanceFile;
protected static KubernetesClient client;
protected static String namespace;
@@ -289,11 +201,11 @@ protected static ConsumeGatewayMessage consumeOneMessageFromGateway(
return ConsumeGatewayMessage.readValue(secondLine);
}
- protected static void runProcess(String[] allArgs) throws InterruptedException, IOException {
+ public static void runProcess(String[] allArgs) throws InterruptedException, IOException {
runProcess(allArgs, false);
}
- private static void runProcess(String[] allArgs, boolean allowFailures)
+ public static void runProcess(String[] allArgs, boolean allowFailures)
throws InterruptedException, IOException {
ProcessBuilder processBuilder =
new ProcessBuilder(allArgs)
@@ -430,73 +342,78 @@ public static void closeQuietly(Closeable c) {
@SneakyThrows
public static void destroy() {
cleanupAllEndToEndTestsNamespaces();
- if (client != null) {
- client.close();
- }
- if (helm3Container != null) {
- helm3Container.close();
- }
- if (kubeServer != null) {
- kubeServer.stop();
+ if (!LANGSTREAM_RECYCLE_ENV) {
+ if (streamingClusterProvider != null) {
+ streamingClusterProvider.stop();
+ streamingClusterProvider = null;
+ }
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+ if (kubeCluster != null) {
+ kubeCluster.stop();
+ kubeCluster = null;
+ }
}
}
@BeforeAll
@SneakyThrows
public static void setup() {
+ if (kubeCluster == null) {
+ kubeCluster = getKubeCluster();
+ kubeCluster.start();
+ }
- // kubeServer = new LocalK3sContainer();
- kubeServer = new RunningHostCluster();
- kubeServer.start();
+ if (client == null) {
+ client =
+ new KubernetesClientBuilder()
+ .withConfig(Config.fromKubeconfig(kubeCluster.getKubeConfig()))
+ .build();
+ }
- client =
- new KubernetesClientBuilder()
- .withConfig(Config.fromKubeconfig(kubeServer.getKubeConfig()))
- .build();
+ if (streamingClusterProvider == null) {
+ streamingClusterProvider = getStreamingClusterProvider();
+ }
try {
final Path tempFile = Files.createTempFile("ls-test-kube", ".yaml");
- Files.writeString(tempFile, kubeServer.getKubeConfig());
+ Files.writeString(tempFile, kubeCluster.getKubeConfig());
System.out.println(
"To inspect the container\nKUBECONFIG="
+ tempFile.toFile().getAbsolutePath()
+ " k9s");
- final CompletableFuture<Void> kafkaFuture =
- CompletableFuture.runAsync(BaseEndToEndTest::installKafka);
+ final CompletableFuture<StreamingCluster> streamingClusterFuture =
+ CompletableFuture.supplyAsync(() -> streamingClusterProvider.start());
final CompletableFuture<Void> minioFuture =
CompletableFuture.runAsync(BaseEndToEndTest::installMinio);
List<CompletableFuture<Void>> imagesFutures = new ArrayList<>();
- final String baseImageRepository =
- LANGSTREAM_TAG.equals("latest-dev") ? "langstream" : "ghcr.io/langstream";
- final String imagePullPolicy =
- LANGSTREAM_TAG.equals("latest-dev") ? "Never" : "IfNotPresent";
-
imagesFutures.add(
CompletableFuture.runAsync(
() ->
- kubeServer.ensureImage(
+ kubeCluster.ensureImage(
"langstream/langstream-control-plane:latest-dev")));
imagesFutures.add(
CompletableFuture.runAsync(
() ->
- kubeServer.ensureImage(
+ kubeCluster.ensureImage(
"langstream/langstream-deployer:latest-dev")));
imagesFutures.add(
CompletableFuture.runAsync(
() ->
- kubeServer.ensureImage(
+ kubeCluster.ensureImage(
"langstream/langstream-runtime:latest-dev")));
imagesFutures.add(
CompletableFuture.runAsync(
() ->
- kubeServer.ensureImage(
+ kubeCluster.ensureImage(
"langstream/langstream-api-gateway:latest-dev")));
CompletableFuture.allOf(
- kafkaFuture,
minioFuture,
imagesFutures.get(0),
imagesFutures.get(1),
@@ -504,12 +421,49 @@ public static void setup() {
imagesFutures.get(3))
.join();
+ final StreamingCluster streamingCluster = streamingClusterFuture.join();
+
+ final Map<String, Map<String, Object>> instanceContent =
+ Map.of(
+ "instance",
+ Map.of(
+ "streamingCluster",
+ streamingCluster,
+ "computeCluster",
+ Map.of("type", "kubernetes")));
+
+ instanceFile = Files.createTempFile("ls-test", ".yaml").toFile();
+ YAML_MAPPER.writeValue(instanceFile, instanceContent);
+
} catch (Throwable ee) {
dumpTest("BeforeAll");
throw ee;
}
}
+ private static StreamingClusterProvider getStreamingClusterProvider() {
+ switch (LANGSTREAM_STREAMING) {
+ case "local-redpanda":
+ return new LocalRedPandaClusterProvider(client);
+ case "remote-kafka":
+ return new RemoteKafkaProvider();
+ default:
+ throw new IllegalArgumentException(
+ "Unknown LANGSTREAM_STREAMING: " + LANGSTREAM_STREAMING);
+ }
+ }
+
+ private static KubeCluster getKubeCluster() {
+ switch (LANGSTREAM_K8S) {
+ case "k3s":
+ return new LocalK3sContainer();
+ case "host":
+ return new RunningHostCluster();
+ default:
+ throw new IllegalArgumentException("Unknown LANGSTREAM_K8S: " + LANGSTREAM_K8S);
+ }
+ }
+
@BeforeEach
@SneakyThrows
public void setupSingleTest() {
@@ -530,14 +484,47 @@ public void setupSingleTest() {
@AfterEach
public void cleanupAfterEach() {
cleanupAllEndToEndTestsNamespaces();
- execInKafkaPod("rpk topic delete -r \".*\"");
+ streamingClusterProvider.cleanup();
}
private static void cleanupAllEndToEndTestsNamespaces() {
client.namespaces().withLabel("app", "ls-test").delete();
client.namespaces().list().getItems().stream()
- .filter(ns -> ns.getMetadata().getName().startsWith(TENANT_NAMESPACE_PREFIX))
- .forEach(ns -> client.namespaces().withName(ns.getMetadata().getName()).delete());
+ .map(ns -> ns.getMetadata().getName())
+ .filter(ns -> ns.startsWith(TENANT_NAMESPACE_PREFIX))
+ .forEach(ns -> deleteTenantNamespace(ns));
+ }
+
+ private static void deleteTenantNamespace(String ns) {
+ try {
+ final List<AgentCustomResource> agents =
+ client.resources(AgentCustomResource.class).inNamespace(ns).list().getItems();
+ for (AgentCustomResource agent : agents) {
+ agent.getMetadata().setFinalizers(List.of());
+ client.resource(agent).inNamespace(ns).serverSideApply();
+ client.resources(AgentCustomResource.class)
+ .inNamespace(ns)
+ .withName(agent.getMetadata().getName())
+ .delete();
+ }
+ final List<ApplicationCustomResource> apps =
+ client.resources(ApplicationCustomResource.class)
+ .inNamespace(ns)
+ .list()
+ .getItems();
+
+ for (ApplicationCustomResource app : apps) {
+ app.getMetadata().setFinalizers(List.of());
+ client.resource(app).inNamespace(ns).serverSideApply();
+ client.resources(ApplicationCustomResource.class)
+ .inNamespace(ns)
+ .withName(app.getMetadata().getName())
+ .delete();
+ }
+ } catch (Throwable tt) {
+ log.error("Error deleting tenant namespace: " + ns, tt);
+ }
+ client.namespaces().withName(ns).delete();
}
@SneakyThrows
@@ -696,37 +683,6 @@ private static void awaitApiGatewayReady() {
log.info("api gateway ready");
}
- @SneakyThrows
- private static void installKafka() {
- log.info("installing kafka");
- client.resource(
- new NamespaceBuilder()
- .withNewMetadata()
- .withName(KAFKA_NAMESPACE)
- .endMetadata()
- .build())
- .serverSideApply();
-
- if (!REUSE_EXISTING_REDPANDA) {
- runProcess("helm delete redpanda --namespace kafka-ns".split(" "), true);
- }
-
- runProcess("helm repo add redpanda https://charts.redpanda.com/".split(" "), true);
- runProcess("helm repo update".split(" "));
- // ref https://github.com/redpanda-data/helm-charts/blob/main/charts/redpanda/values.yaml
- runProcess(
- ("helm upgrade --install redpanda redpanda/redpanda --namespace kafka-ns --set resources.cpu.cores=0.3"
- + " --set resources.memory.container.max=1512Mi --set statefulset.replicas=1 --set console"
- + ".enabled=false --set tls.enabled=false --set external.domain=redpanda-external.kafka-ns.svc"
- + ".cluster.local --set statefulset.initContainers.setDataDirOwnership.enabled=true")
- .split(" "));
- log.info("waiting kafka to be ready");
- runProcess(
- "kubectl wait pods redpanda-0 --for=condition=Ready --timeout=5m -n kafka-ns"
- .split(" "));
- log.info("kafka installed");
- }
-
static void installMinio() {
applyManifestNoNamespace(
"""
@@ -809,25 +765,35 @@ protected static void withPodLogs(
BiConsumer<String, String> consumer) {
if (podName != null) {
try {
- client.pods()
- .inNamespace(namespace)
- .withName(podName)
- .get()
- .getSpec()
- .getContainers()
- .forEach(
- container -> {
- final ContainerResource containerResource =
- client.pods()
- .inNamespace(namespace)
- .withName(podName)
- .inContainer(container.getName());
- if (tailingLines > 0) {
- containerResource.tailingLines(tailingLines);
- }
- final String containerLog = containerResource.getLog();
- consumer.accept(container.getName(), containerLog);
- });
+ final PodSpec podSpec =
+ client.pods().inNamespace(namespace).withName(podName).get().getSpec();
+ List<Container> all = new ArrayList<>();
+ final List<Container> containers = podSpec.getContainers();
+ all.addAll(containers);
+ final List<Container> init = podSpec.getInitContainers();
+ if (init != null) {
+ all.addAll(init);
+ }
+ for (Container container : all) {
+ try {
+ final ContainerResource containerResource =
+ client.pods()
+ .inNamespace(namespace)
+ .withName(podName)
+ .inContainer(container.getName());
+ if (tailingLines > 0) {
+ containerResource.tailingLines(tailingLines);
+ }
+ final String containerLog = containerResource.getLog();
+ consumer.accept(container.getName(), containerLog);
+ } catch (Throwable t) {
+ log.error(
+ "failed to get pod {} container {} logs: {}",
+ podName,
+ container.getName(),
+ t.getMessage());
+ }
+ }
} catch (Throwable t) {
log.error("failed to get pod {} logs: {}", podName, t.getMessage());
}
@@ -969,29 +935,8 @@ protected static void dumpEvents(String filePrefix) {
}
@SneakyThrows
- protected static List<String> getAllTopicsFromKafka() {
- final String result = execInKafkaPod("rpk topic list");
- if (result == null) {
- throw new IllegalStateException("failed to get topics from kafka");
- }
-
- final List<String> topics = new ArrayList<>();
- final List<String> lines = result.lines().collect(Collectors.toList());
- boolean first = true;
- for (String line : lines) {
- if (first) {
- first = false;
- continue;
- }
- topics.add(line.split(" ")[0]);
- }
- return topics;
- }
-
- @SneakyThrows
- private static String execInKafkaPod(String cmd) {
- return execInPodInNamespace(KAFKA_NAMESPACE, "redpanda-0", "redpanda", cmd.split(" "))
- .get(1, TimeUnit.MINUTES);
+ protected static List<String> getAllTopics() {
+ return streamingClusterProvider.getTopics();
}
protected static void setupTenant(String tenant) {
@@ -1044,17 +989,19 @@ protected static boolean isApplicationReady(
expectedRunningTotalExecutors + "/" + expectedRunningTotalExecutors);
}
+ @SneakyThrows
protected static void deployLocalApplication(
- String applicationId, String appDir, Map<String, String> env) {
+ String applicationId, String appDirName, Map<String, String> env) {
String testAppsBaseDir = "src/test/resources/apps";
- String testInstanceBaseDir = "src/test/resources/instances";
String testSecretBaseDir = "src/test/resources/secrets";
- copyFileToClientContainer(Paths.get(testAppsBaseDir, appDir).toFile(), "/tmp/app");
- copyFileToClientContainer(
- Paths.get(testInstanceBaseDir, "kafka-kubernetes.yaml").toFile(),
- "/tmp/instance.yaml");
- copyFileToClientContainer(
- Paths.get(testSecretBaseDir, "secret1.yaml").toFile(), "/tmp/secrets.yaml");
+
+ final File appDir = Paths.get(testAppsBaseDir, appDirName).toFile();
+ final File secretFile = Paths.get(testSecretBaseDir, "secret1.yaml").toFile();
+ validateApp(appDir, secretFile);
+ copyFileToClientContainer(appDir, "/tmp/app");
+ copyFileToClientContainer(instanceFile, "/tmp/instance.yaml");
+
+ copyFileToClientContainer(secretFile, "/tmp/secrets.yaml");
String beforeCmd = "";
if (env != null) {
@@ -1072,6 +1019,32 @@ protected static void deployLocalApplication(
.split(" "));
}
+ private static void validateApp(File appDir, File secretFile) throws Exception {
+ final ModelBuilder.ApplicationWithPackageInfo model =
+ ModelBuilder.buildApplicationInstance(
+ List.of(appDir.toPath()),
+ Files.readString(instanceFile.toPath()),
+ Files.readString(secretFile.toPath()));
+ model.getApplication()
+ .getModules()
+ .values()
+ .forEach(
+ m -> {
+ m.getTopics()
+ .keySet()
+ .forEach(
+ t -> {
+ if (!t.startsWith(TOPICS_PREFIX)) {
+ throw new IllegalStateException(
+ "All topics must start with "
+ + TOPICS_PREFIX
+ + ". Found "
+ + t);
+ }
+ });
+ });
+ }
+
protected static Map<String, String> getAppEnvMapFromSystem(List<String> names) {
final Map<String, String> result = new HashMap<>();
for (String name : names) {
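setup() now derives the instance file from whatever the active StreamingClusterProvider reports instead of copying a static kafka-kubernetes.yaml. A minimal sketch of the resulting content, assuming the default local-redpanda provider and assuming the StreamingCluster model serializes to its type and configuration fields:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.util.Map;

public class InstanceFilePreview {
    public static void main(String[] args) throws Exception {
        // mirrors the map written by setup(); the bootstrap address is the local-redpanda default
        Map<String, Object> instanceContent =
                Map.of(
                        "instance",
                        Map.of(
                                "streamingCluster",
                                Map.of(
                                        "type",
                                        "kafka",
                                        "configuration",
                                        Map.of(
                                                "admin",
                                                Map.of(
                                                        "bootstrap.servers",
                                                        "redpanda-0.redpanda.kafka-ns.svc.cluster.local:9093"))),
                                "computeCluster",
                                Map.of("type", "kubernetes")));
        // prints the YAML that deployLocalApplication() later copies to /tmp/instance.yaml
        System.out.println(new ObjectMapper(new YAMLFactory()).writeValueAsString(instanceContent));
    }
}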
diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/KubeCluster.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/KubeCluster.java
new file mode 100644
index 000000000..01ba03c74
--- /dev/null
+++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/KubeCluster.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.tests.util;
+
+public interface KubeCluster {
+ void start();
+
+ void ensureImage(String image);
+
+ void stop();
+
+ String getKubeConfig();
+}
diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/StreamingClusterProvider.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/StreamingClusterProvider.java
new file mode 100644
index 000000000..34c275a70
--- /dev/null
+++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/StreamingClusterProvider.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.tests.util;
+
+import ai.langstream.api.model.StreamingCluster;
+import java.util.List;
+
+public interface StreamingClusterProvider {
+
+ StreamingCluster start();
+
+ void cleanup();
+
+ void stop();
+
+ List<String> getTopics();
+}
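Any backend that can describe itself as a StreamingCluster and enumerate its topics can plug into the tests through this interface. A hedged sketch of what another implementation might look like (the class is hypothetical, not part of this change; the bootstrap address is assumed reachable from both the test runner and the cluster):

package ai.langstream.tests.util.kafka;

import ai.langstream.api.model.StreamingCluster;
import ai.langstream.tests.util.StreamingClusterProvider;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;

public class ExistingKafkaClusterProvider implements StreamingClusterProvider {

    private final String bootstrapServers;

    public ExistingKafkaClusterProvider(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    @Override
    public StreamingCluster start() {
        // nothing to install; just describe the cluster for the generated instance file
        return new StreamingCluster(
                "kafka", Map.of("admin", Map.of("bootstrap.servers", bootstrapServers)));
    }

    @Override
    public void cleanup() {
        // a real implementation would delete the ls-test-* topics between tests
    }

    @Override
    public void stop() {}

    @Override
    public List<String> getTopics() {
        // uses the Kafka Admin client brought in by the new kafka-clients test dependency
        try (Admin admin =
                Admin.create(Map.<String, Object>of("bootstrap.servers", bootstrapServers))) {
            return new ArrayList<>(admin.listTopics().names().get());
        } catch (Exception e) {
            throw new RuntimeException("failed to list topics", e);
        }
    }
}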
diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/k8s/LocalK3sContainer.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/k8s/LocalK3sContainer.java
new file mode 100644
index 000000000..71ca7bb64
--- /dev/null
+++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/k8s/LocalK3sContainer.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.tests.util.k8s;
+
+import ai.langstream.tests.util.KubeCluster;
+import com.dajudge.kindcontainer.K3sContainer;
+import com.dajudge.kindcontainer.K3sContainerVersion;
+import com.dajudge.kindcontainer.KubernetesImageSpec;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.BindMode;
+
+@Slf4j
+public class LocalK3sContainer implements KubeCluster {
+
+ K3sContainer container;
+ final Path basePath = Paths.get("/tmp", "ls-tests-image");
+
+ @Override
+ public void start() {
+ container =
+ new K3sContainer(
+ new KubernetesImageSpec<>(K3sContainerVersion.VERSION_1_25_0)
+ .withImage("rancher/k3s:v1.25.3-k3s1"));
+ container.withFileSystemBind(
+ basePath.toFile().getAbsolutePath(), "/images", BindMode.READ_WRITE);
+ // container.withNetwork(network);
+ container.start();
+ }
+
+ @Override
+ public void ensureImage(String image) {
+ loadImage(basePath, image);
+ }
+
+ @SneakyThrows
+ private void loadImage(Path basePath, String image) {
+ final String id =
+ DockerClientFactory.lazyClient()
+ .inspectImageCmd(image)
+ .exec()
+ .getId()
+ .replace("sha256:", "");
+
+ final Path hostPath = basePath.resolve(id);
+ if (!hostPath.toFile().exists()) {
+ log.info("Saving image {} locally", image);
+ final InputStream in = DockerClientFactory.lazyClient().saveImageCmd(image).exec();
+
+ try (final OutputStream outputStream = Files.newOutputStream(hostPath)) {
+ in.transferTo(outputStream);
+ } catch (Exception ex) {
+ hostPath.toFile().delete();
+ throw ex;
+ }
+ }
+
+ log.info("Loading image {} in the k3s container", image);
+ if (container.execInContainer("ctr", "images", "import", "/images/" + id).getExitCode()
+ != 0) {
+ throw new RuntimeException("Failed to load image " + image);
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (container != null) {
+ container.stop();
+ }
+ }
+
+ @Override
+ public String getKubeConfig() {
+ return container.getKubeconfig();
+ }
+}
diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/k8s/RunningHostCluster.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/k8s/RunningHostCluster.java
new file mode 100644
index 000000000..4705803c0
--- /dev/null
+++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/k8s/RunningHostCluster.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.tests.util.k8s;
+
+import ai.langstream.tests.util.KubeCluster;
+import io.fabric8.kubernetes.client.Config;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import lombok.SneakyThrows;
+
+public class RunningHostCluster implements KubeCluster {
+ @Override
+ public void start() {}
+
+ @Override
+ public void ensureImage(String image) {}
+
+ @Override
+ public void stop() {}
+
+ @Override
+ @SneakyThrows
+ public String getKubeConfig() {
+ final String kubeConfig = Config.getKubeconfigFilename();
+ return Files.readString(Paths.get(kubeConfig), StandardCharsets.UTF_8);
+ }
+}
diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/kafka/LocalRedPandaClusterProvider.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/kafka/LocalRedPandaClusterProvider.java
new file mode 100644
index 000000000..fd4c6b05e
--- /dev/null
+++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/kafka/LocalRedPandaClusterProvider.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.tests.util.kafka;
+
+import ai.langstream.api.model.StreamingCluster;
+import ai.langstream.tests.util.BaseEndToEndTest;
+import ai.langstream.tests.util.StreamingClusterProvider;
+import io.fabric8.kubernetes.api.model.NamespaceBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class LocalRedPandaClusterProvider implements StreamingClusterProvider {
+
+ private static final boolean REUSE_EXISTING_REDPANDA =
+ Boolean.parseBoolean(System.getProperty("langstream.tests.reuseRedPanda", "false"));
+ protected static final String KAFKA_NAMESPACE = "kafka-ns";
+
+ private final KubernetesClient client;
+ private boolean started;
+
+ public LocalRedPandaClusterProvider(KubernetesClient client) {
+ this.client = client;
+ }
+
+ @Override
+ @SneakyThrows
+ public StreamingCluster start() {
+ if (!started) {
+ internalStart();
+ started = true;
+ }
+ return new StreamingCluster(
+ "kafka",
+ Map.of(
+ "admin",
+ Map.of(
+ "bootstrap.servers",
+ "redpanda-0.redpanda.kafka-ns.svc.cluster.local:9093")));
+ }
+
+ private void internalStart() throws InterruptedException, IOException {
+ log.info("installing redpanda");
+ client.resource(
+ new NamespaceBuilder()
+ .withNewMetadata()
+ .withName(KAFKA_NAMESPACE)
+ .endMetadata()
+ .build())
+ .serverSideApply();
+
+ if (!REUSE_EXISTING_REDPANDA) {
+ log.info("try to delete existing redpanda");
+ BaseEndToEndTest.runProcess(
+ "helm delete redpanda --namespace kafka-ns".split(" "), true);
+ }
+
+ BaseEndToEndTest.runProcess(
+ "helm repo add redpanda https://charts.redpanda.com/".split(" "), true);
+ BaseEndToEndTest.runProcess("helm repo update".split(" "));
+ // ref https://github.com/redpanda-data/helm-charts/blob/main/charts/redpanda/values.yaml
+ log.info("running helm command to install redpanda");
+ BaseEndToEndTest.runProcess(
+ ("helm --debug upgrade --install redpanda redpanda/redpanda --namespace kafka-ns --set resources.cpu.cores=0.3"
+ + " --set resources.memory.container.max=1512Mi --set statefulset.replicas=1 --set console"
+ + ".enabled=false --set tls.enabled=false --set external.domain=redpanda-external.kafka-ns.svc"
+ + ".cluster.local --set statefulset.initContainers.setDataDirOwnership.enabled=true --set tuning.tune_aio_events=false --wait --timeout=5m")
+ .split(" "));
+ log.info("waiting redpanda to be ready");
+ BaseEndToEndTest.runProcess(
+ "kubectl wait pods redpanda-0 --for=condition=Ready --timeout=5m -n kafka-ns"
+ .split(" "));
+ log.info("redpanda installed");
+ }
+
+ @SneakyThrows
+ private static String execInKafkaPod(String cmd) {
+ return BaseEndToEndTest.execInPodInNamespace(
+ KAFKA_NAMESPACE, "redpanda-0", "redpanda", cmd.split(" "))
+ .get(1, TimeUnit.MINUTES);
+ }
+
+ @Override
+ public void cleanup() {
+ execInKafkaPod("rpk topic delete -r \".*\"");
+ }
+
+ @Override
+ public List<String> getTopics() {
+ final String result = execInKafkaPod("rpk topic list");
+ if (result == null) {
+ throw new IllegalStateException("failed to get topics from kafka");
+ }
+
+ final List<String> topics = new ArrayList<>();
+ final List<String> lines = result.lines().collect(Collectors.toList());
+ boolean first = true;
+ for (String line : lines) {
+ if (first) {
+ first = false;
+ continue;
+ }
+ topics.add(line.split(" ")[0]);
+ }
+ return topics;
+ }
+
+ @Override
+ public void stop() {}
+}
diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/kafka/RemoteKafkaProvider.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/kafka/RemoteKafkaProvider.java
new file mode 100644
index 000000000..07641342b
--- /dev/null
+++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/kafka/RemoteKafkaProvider.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.tests.util.kafka;
+
+import ai.langstream.api.model.StreamingCluster;
+import ai.langstream.tests.util.BaseEndToEndTest;
+import ai.langstream.tests.util.StreamingClusterProvider;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.Admin;
+
+@Slf4j
+public class RemoteKafkaProvider implements StreamingClusterProvider {
+
+ private static final String SYS_PROPERTIES_PREFIX = "langstream.tests.remotekafka.props.";
+ private static final String ENV_PREFIX = "LANGSTREAM_TESTS_REMOTEKAFKA_PROPS_";
+ private static final Map<String, Object> ADMIN_CONFIG;
+
+ private org.apache.kafka.clients.admin.Admin admin;
+
+ static {
+ ADMIN_CONFIG = new HashMap<>();
+
+ final Map<String, String> envs = System.getenv();
+ for (Map.Entry<String, String> env : envs.entrySet()) {
+ if (env.getKey().startsWith(ENV_PREFIX)) {
+ final String key =
+ env.getKey().substring(ENV_PREFIX.length()).toLowerCase().replace("_", ".");
+ final String value = env.getValue();
+ log.info("Loading remote kafka admin config from env: {}={}", key, value);
+ ADMIN_CONFIG.put(key, value);
+ }
+ }
+
+ final Set