diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7fd3ca7e7..fa50f4e39 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -155,13 +155,13 @@ jobs: uname -m ./dev/prepare-minikube-for-e2e-tests.sh ./mvnw install -pl langstream-e2e-tests -am -DskipTests - ./mvnw verify -pl langstream-e2e-tests -De2eTests -DexcludedGroups="needs-credentials" + ./mvnw verify -pl langstream-e2e-tests -De2eTests -DexcludedGroups="needs-credentials" -Dlangstream.tests.recycleenv=true - name: Upload Surefire reports uses: actions/upload-artifact@v3 if: failure() continue-on-error: true with: - name: test-logs-${{ matrix.group }} + name: test-logs-e2e path: "**/target/e2e-test-logs/*" retention-days: 7 diff --git a/examples/applications/webcrawler-source/chatbot.yaml b/examples/applications/webcrawler-source/chatbot.yaml index b8c967d2f..a049c345c 100644 --- a/examples/applications/webcrawler-source/chatbot.yaml +++ b/examples/applications/webcrawler-source/chatbot.yaml @@ -67,7 +67,7 @@ pipeline: messages: - role: system content: | - An user is going to perform a questions, he documents below may help you in answering to their questions. + An user is going to perform a questions, The documents below may help you in answering to their questions. Please try to leverage them in your answer as much as possible. Take into consideration that the user is always asking questions about the LangStream project. If you provide code or YAML snippets, please explicitly state that they are examples. diff --git a/examples/applications/webcrawler-source/crawler.yaml b/examples/applications/webcrawler-source/crawler.yaml index affca5180..9bec4b98d 100644 --- a/examples/applications/webcrawler-source/crawler.yaml +++ b/examples/applications/webcrawler-source/crawler.yaml @@ -31,7 +31,7 @@ pipeline: reindex-interval-seconds: 3600 max-error-count: 5 max-urls: 1000 - max-depth: 10 + max-depth: 50 handle-robots-file: true user-agent: "" # this is computed automatically, but you can override it scan-html-documents: true @@ -53,7 +53,7 @@ pipeline: - name: "Detect language" type: "language-detector" configuration: - allowedLanguages: ["en"] + allowedLanguages: ["en", "fr"] property: "language" - name: "Split into chunks" type: "text-splitter" diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/datasource/CassandraDataSource.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/datasource/CassandraDataSource.java index 0c57634c2..53423f114 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/datasource/CassandraDataSource.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/datasource/CassandraDataSource.java @@ -74,7 +74,9 @@ protected TypeCodec createCodec( @Override public void initialize(Map dataSourceConfig) { - log.info("Initializing AstraDBDataSource with config {}", dataSourceConfig); + log.info( + "Initializing CassandraDataSource with config {}", + ConfigurationUtils.redactSecrets(dataSourceConfig)); this.astraToken = ConfigurationUtils.getString("token", "", dataSourceConfig); this.astraEnvironment = ConfigurationUtils.getString("environment", "PROD", dataSourceConfig); diff --git a/langstream-api/src/main/java/ai/langstream/api/util/ConfigurationUtils.java b/langstream-api/src/main/java/ai/langstream/api/util/ConfigurationUtils.java index 4172a522f..595caac1b 100644 --- a/langstream-api/src/main/java/ai/langstream/api/util/ConfigurationUtils.java +++ b/langstream-api/src/main/java/ai/langstream/api/util/ConfigurationUtils.java @@ -15,8 +15,10 @@ */ package ai.langstream.api.util; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -252,4 +254,49 @@ public static void requiredListField( "Expecting a list in the field '" + name + "' in " + definition.get()); } } + + /** + * Remove all the secrets from the configuration. This method is used to avoid logging secrets + * + * @param object + * @return the object without secrets + */ + public static Object redactSecrets(Object object) { + if (object == null) { + return null; + } + + if (object instanceof List list) { + List other = new ArrayList<>(list.size()); + list.forEach(o -> other.add(redactSecrets(o))); + return other; + } + if (object instanceof Set set) { + Set other = new HashSet<>(set.size()); + set.forEach(o -> other.add(redactSecrets(o))); + return other; + } + + if (object instanceof Map map) { + Map other = new HashMap<>(); + map.forEach( + (k, v) -> { + String keyLowercase = (String.valueOf(k)).toLowerCase(); + if (keyLowercase.contains("password") + || keyLowercase.contains("pwd") + || keyLowercase.contains("secure") + || keyLowercase.contains("secret") + || keyLowercase.contains("serviceaccountjson") + || keyLowercase.contains("access-key") + || keyLowercase.contains("token")) { + other.put(k, ""); + } else { + other.put(k, redactSecrets(v)); + } + }); + return other; + } + + return object; + } } diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/GithubRepositoryDownloader.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/GithubRepositoryDownloader.java index b3aedd3f2..abeee12ca 100644 --- a/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/GithubRepositoryDownloader.java +++ b/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/GithubRepositoryDownloader.java @@ -39,6 +39,7 @@ public static File downloadGithubRepository(URI uri, Consumer logger) { RequestedDirectory requestedDirectory = parseRequest(uri); final Path directory = Files.createTempDirectory("langstream"); + logger.accept(String.format("Cloning GitHub repository %s locally", uri)); final long start = System.currentTimeMillis(); Git.cloneRepository() diff --git a/langstream-e2e-tests/pom.xml b/langstream-e2e-tests/pom.xml index 6a499d1de..7a280d2ec 100644 --- a/langstream-e2e-tests/pom.xml +++ b/langstream-e2e-tests/pom.xml @@ -52,6 +52,17 @@ ${project.version} test + + ${project.groupId} + langstream-core + ${project.version} + test + + + org.apache.kafka + kafka-clients + test + diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/CassandraSinkIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/CassandraSinkIT.java index 9cf7bf43e..edb893a22 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/CassandraSinkIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/CassandraSinkIT.java @@ -18,6 +18,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import ai.langstream.tests.util.BaseEndToEndTest; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/ChatCompletionsIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/ChatCompletionsIT.java index 6805d072a..fb9c61863 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/ChatCompletionsIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/ChatCompletionsIT.java @@ -15,6 +15,7 @@ */ package ai.langstream.tests; +import ai.langstream.tests.util.BaseEndToEndTest; import ai.langstream.tests.util.ConsumeGatewayMessage; import java.util.List; import java.util.Map; diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java index 3cd70f6ed..a1b8b5f67 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java @@ -17,6 +17,7 @@ import ai.langstream.deployer.k8s.api.crds.agents.AgentCustomResource; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationCustomResource; +import ai.langstream.tests.util.BaseEndToEndTest; import io.fabric8.kubernetes.api.model.Secret; import java.util.List; import java.util.concurrent.TimeUnit; @@ -96,7 +97,7 @@ public void test(String appDir) { .size()); }); - final List topics = getAllTopicsFromKafka(); - Assertions.assertEquals(List.of("TEST_TOPIC_0"), topics); + final List topics = getAllTopics(); + Assertions.assertEquals(List.of("ls-test-topic0"), topics); } } diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/WebCrawlerToVectorIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/WebCrawlerToVectorIT.java index 2e88e905f..880efbbfc 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/WebCrawlerToVectorIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/WebCrawlerToVectorIT.java @@ -15,6 +15,7 @@ */ package ai.langstream.tests; +import ai.langstream.tests.util.BaseEndToEndTest; import ai.langstream.tests.util.ConsumeGatewayMessage; import java.util.List; import java.util.Map; diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java similarity index 80% rename from langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java rename to langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java index 7129318cd..cdcf74995 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java @@ -13,22 +13,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package ai.langstream.tests; +package ai.langstream.tests.util; +import ai.langstream.api.model.StreamingCluster; import ai.langstream.deployer.k8s.api.crds.agents.AgentCustomResource; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationCustomResource; -import ai.langstream.tests.util.ConsumeGatewayMessage; -import com.dajudge.kindcontainer.K3sContainer; -import com.dajudge.kindcontainer.K3sContainerVersion; -import com.dajudge.kindcontainer.KubernetesImageSpec; -import com.dajudge.kindcontainer.helm.Helm3Container; +import ai.langstream.impl.parser.ModelBuilder; +import ai.langstream.tests.util.k8s.LocalK3sContainer; +import ai.langstream.tests.util.k8s.RunningHostCluster; +import ai.langstream.tests.util.kafka.LocalRedPandaClusterProvider; +import ai.langstream.tests.util.kafka.RemoteKafkaProvider; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.Event; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.NamespaceBuilder; import io.fabric8.kubernetes.api.model.Node; import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodSpec; import io.fabric8.kubernetes.api.model.Status; import io.fabric8.kubernetes.api.model.apps.StatefulSet; import io.fabric8.kubernetes.api.model.batch.v1.Job; @@ -46,8 +49,6 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -72,120 +73,31 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.TestWatcher; -import org.testcontainers.DockerClientFactory; -import org.testcontainers.containers.BindMode; @Slf4j public class BaseEndToEndTest implements TestWatcher { + public static final String TOPICS_PREFIX = "ls-test-"; + public static final String CATEGORY_NEEDS_CREDENTIALS = "needs-credentials"; + private static final boolean LANGSTREAM_RECYCLE_ENV = + Boolean.parseBoolean(System.getProperty("langstream.tests.recycleenv", "false")); + private static final String LANGSTREAM_TAG = System.getProperty("langstream.tests.tag", "latest-dev"); - private static final boolean REUSE_EXISTING_REDPANDA = - Boolean.parseBoolean(System.getProperty("langstream.tests.reuseRedPanda", "false")); + private static final String LANGSTREAM_K8S = System.getProperty("langstream.tests.k8s", "host"); + private static final String LANGSTREAM_STREAMING = + System.getProperty("langstream.tests.streaming", "local-redpanda"); public static final File TEST_LOGS_DIR = new File("target", "e2e-test-logs"); protected static final String TENANT_NAMESPACE_PREFIX = "ls-tenant-"; protected static final ObjectMapper JSON_MAPPER = new ObjectMapper(); protected static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory()); - protected static final String KAFKA_NAMESPACE = "kafka-ns"; - - interface KubeServer { - void start(); - - void ensureImage(String image); - - void stop(); - - String getKubeConfig(); - } - - static class RunningHostCluster implements KubeServer { - @Override - public void start() {} - - @Override - public void ensureImage(String image) {} - - @Override - public void stop() {} - - @Override - @SneakyThrows - public String getKubeConfig() { - final String kubeConfig = Config.getKubeconfigFilename(); - return Files.readString(Paths.get(kubeConfig), StandardCharsets.UTF_8); - } - } - - static class LocalK3sContainer implements KubeServer { - - K3sContainer container; - final Path basePath = Paths.get("/tmp", "ls-tests-image"); - - @Override - public void start() { - container = - new K3sContainer( - new KubernetesImageSpec<>(K3sContainerVersion.VERSION_1_25_0) - .withImage("rancher/k3s:v1.25.3-k3s1")); - container.withFileSystemBind( - basePath.toFile().getAbsolutePath(), "/images", BindMode.READ_WRITE); - // container.withNetwork(network); - container.start(); - } - - @Override - public void ensureImage(String image) { - loadImage(basePath, image); - } - - @SneakyThrows - private void loadImage(Path basePath, String image) { - final String id = - DockerClientFactory.lazyClient() - .inspectImageCmd(image) - .exec() - .getId() - .replace("sha256:", ""); - - final Path hostPath = basePath.resolve(id); - if (!hostPath.toFile().exists()) { - log.info("Saving image {} locally", image); - final InputStream in = DockerClientFactory.lazyClient().saveImageCmd(image).exec(); - - try (final OutputStream outputStream = Files.newOutputStream(hostPath)) { - in.transferTo(outputStream); - } catch (Exception ex) { - hostPath.toFile().delete(); - throw ex; - } - } - - log.info("Loading image {} in the k3s container", image); - if (container.execInContainer("ctr", "images", "import", "/images/" + id).getExitCode() - != 0) { - throw new RuntimeException("Failed to load image " + image); - } - } - - @Override - public void stop() { - if (container != null) { - container.stop(); - } - } - - @Override - public String getKubeConfig() { - return container.getKubeconfig(); - } - } - - protected static KubeServer kubeServer; - protected static Helm3Container helm3Container; + protected static KubeCluster kubeCluster; + protected static StreamingClusterProvider streamingClusterProvider; + protected static File instanceFile; protected static KubernetesClient client; protected static String namespace; @@ -289,11 +201,11 @@ protected static ConsumeGatewayMessage consumeOneMessageFromGateway( return ConsumeGatewayMessage.readValue(secondLine); } - protected static void runProcess(String[] allArgs) throws InterruptedException, IOException { + public static void runProcess(String[] allArgs) throws InterruptedException, IOException { runProcess(allArgs, false); } - private static void runProcess(String[] allArgs, boolean allowFailures) + public static void runProcess(String[] allArgs, boolean allowFailures) throws InterruptedException, IOException { ProcessBuilder processBuilder = new ProcessBuilder(allArgs) @@ -430,73 +342,78 @@ public static void closeQuietly(Closeable c) { @SneakyThrows public static void destroy() { cleanupAllEndToEndTestsNamespaces(); - if (client != null) { - client.close(); - } - if (helm3Container != null) { - helm3Container.close(); - } - if (kubeServer != null) { - kubeServer.stop(); + if (!LANGSTREAM_RECYCLE_ENV) { + if (streamingClusterProvider != null) { + streamingClusterProvider.stop(); + streamingClusterProvider = null; + } + if (client != null) { + client.close(); + client = null; + } + if (kubeCluster != null) { + kubeCluster.stop(); + kubeCluster = null; + } } } @BeforeAll @SneakyThrows public static void setup() { + if (kubeCluster == null) { + kubeCluster = getKubeCluster(); + kubeCluster.start(); + } - // kubeServer = new LocalK3sContainer(); - kubeServer = new RunningHostCluster(); - kubeServer.start(); + if (client == null) { + client = + new KubernetesClientBuilder() + .withConfig(Config.fromKubeconfig(kubeCluster.getKubeConfig())) + .build(); + } - client = - new KubernetesClientBuilder() - .withConfig(Config.fromKubeconfig(kubeServer.getKubeConfig())) - .build(); + if (streamingClusterProvider == null) { + streamingClusterProvider = getStreamingClusterProvider(); + } try { final Path tempFile = Files.createTempFile("ls-test-kube", ".yaml"); - Files.writeString(tempFile, kubeServer.getKubeConfig()); + Files.writeString(tempFile, kubeCluster.getKubeConfig()); System.out.println( "To inspect the container\nKUBECONFIG=" + tempFile.toFile().getAbsolutePath() + " k9s"); - final CompletableFuture kafkaFuture = - CompletableFuture.runAsync(BaseEndToEndTest::installKafka); + final CompletableFuture streamingClusterFuture = + CompletableFuture.supplyAsync(() -> streamingClusterProvider.start()); final CompletableFuture minioFuture = CompletableFuture.runAsync(BaseEndToEndTest::installMinio); List> imagesFutures = new ArrayList<>(); - final String baseImageRepository = - LANGSTREAM_TAG.equals("latest-dev") ? "langstream" : "ghcr.io/langstream"; - final String imagePullPolicy = - LANGSTREAM_TAG.equals("latest-dev") ? "Never" : "IfNotPresent"; - imagesFutures.add( CompletableFuture.runAsync( () -> - kubeServer.ensureImage( + kubeCluster.ensureImage( "langstream/langstream-control-plane:latest-dev"))); imagesFutures.add( CompletableFuture.runAsync( () -> - kubeServer.ensureImage( + kubeCluster.ensureImage( "langstream/langstream-deployer:latest-dev"))); imagesFutures.add( CompletableFuture.runAsync( () -> - kubeServer.ensureImage( + kubeCluster.ensureImage( "langstream/langstream-runtime:latest-dev"))); imagesFutures.add( CompletableFuture.runAsync( () -> - kubeServer.ensureImage( + kubeCluster.ensureImage( "langstream/langstream-api-gateway:latest-dev"))); CompletableFuture.allOf( - kafkaFuture, minioFuture, imagesFutures.get(0), imagesFutures.get(1), @@ -504,12 +421,49 @@ public static void setup() { imagesFutures.get(3)) .join(); + final StreamingCluster streamingCluster = streamingClusterFuture.join(); + + final Map> instanceContent = + Map.of( + "instance", + Map.of( + "streamingCluster", + streamingCluster, + "computeCluster", + Map.of("type", "kubernetes"))); + + instanceFile = Files.createTempFile("ls-test", ".yaml").toFile(); + YAML_MAPPER.writeValue(instanceFile, instanceContent); + } catch (Throwable ee) { dumpTest("BeforeAll"); throw ee; } } + private static StreamingClusterProvider getStreamingClusterProvider() { + switch (LANGSTREAM_STREAMING) { + case "local-redpanda": + return new LocalRedPandaClusterProvider(client); + case "remote-kafka": + return new RemoteKafkaProvider(); + default: + throw new IllegalArgumentException( + "Unknown LANGSTREAM_STREAMING: " + LANGSTREAM_STREAMING); + } + } + + private static KubeCluster getKubeCluster() { + switch (LANGSTREAM_K8S) { + case "k3s": + return new LocalK3sContainer(); + case "host": + return new RunningHostCluster(); + default: + throw new IllegalArgumentException("Unknown LANGSTREAM_K8S: " + LANGSTREAM_K8S); + } + } + @BeforeEach @SneakyThrows public void setupSingleTest() { @@ -530,14 +484,47 @@ public void setupSingleTest() { @AfterEach public void cleanupAfterEach() { cleanupAllEndToEndTestsNamespaces(); - execInKafkaPod("rpk topic delete -r \".*\""); + streamingClusterProvider.cleanup(); } private static void cleanupAllEndToEndTestsNamespaces() { client.namespaces().withLabel("app", "ls-test").delete(); client.namespaces().list().getItems().stream() - .filter(ns -> ns.getMetadata().getName().startsWith(TENANT_NAMESPACE_PREFIX)) - .forEach(ns -> client.namespaces().withName(ns.getMetadata().getName()).delete()); + .map(ns -> ns.getMetadata().getName()) + .filter(ns -> ns.startsWith(TENANT_NAMESPACE_PREFIX)) + .forEach(ns -> deleteTenantNamespace(ns)); + } + + private static void deleteTenantNamespace(String ns) { + try { + final List agents = + client.resources(AgentCustomResource.class).inNamespace(ns).list().getItems(); + for (AgentCustomResource agent : agents) { + agent.getMetadata().setFinalizers(List.of()); + client.resource(agent).inNamespace(ns).serverSideApply(); + client.resources(AgentCustomResource.class) + .inNamespace(ns) + .withName(agent.getMetadata().getNamespace()) + .delete(); + } + final List apps = + client.resources(ApplicationCustomResource.class) + .inNamespace(ns) + .list() + .getItems(); + + for (ApplicationCustomResource app : apps) { + app.getMetadata().setFinalizers(List.of()); + client.resource(app).inNamespace(ns).serverSideApply(); + client.resources(ApplicationCustomResource.class) + .inNamespace(ns) + .withName(app.getMetadata().getNamespace()) + .delete(); + } + } catch (Throwable tt) { + log.error("Error deleting tenant namespace: " + ns, tt); + } + client.namespaces().withName(ns).delete(); } @SneakyThrows @@ -696,37 +683,6 @@ private static void awaitApiGatewayReady() { log.info("api gateway ready"); } - @SneakyThrows - private static void installKafka() { - log.info("installing kafka"); - client.resource( - new NamespaceBuilder() - .withNewMetadata() - .withName(KAFKA_NAMESPACE) - .endMetadata() - .build()) - .serverSideApply(); - - if (!REUSE_EXISTING_REDPANDA) { - runProcess("helm delete redpanda --namespace kafka-ns".split(" "), true); - } - - runProcess("helm repo add redpanda https://charts.redpanda.com/".split(" "), true); - runProcess("helm repo update".split(" ")); - // ref https://github.com/redpanda-data/helm-charts/blob/main/charts/redpanda/values.yaml - runProcess( - ("helm upgrade --install redpanda redpanda/redpanda --namespace kafka-ns --set resources.cpu.cores=0.3" - + " --set resources.memory.container.max=1512Mi --set statefulset.replicas=1 --set console" - + ".enabled=false --set tls.enabled=false --set external.domain=redpanda-external.kafka-ns.svc" - + ".cluster.local --set statefulset.initContainers.setDataDirOwnership.enabled=true") - .split(" ")); - log.info("waiting kafka to be ready"); - runProcess( - "kubectl wait pods redpanda-0 --for=condition=Ready --timeout=5m -n kafka-ns" - .split(" ")); - log.info("kafka installed"); - } - static void installMinio() { applyManifestNoNamespace( """ @@ -809,25 +765,35 @@ protected static void withPodLogs( BiConsumer consumer) { if (podName != null) { try { - client.pods() - .inNamespace(namespace) - .withName(podName) - .get() - .getSpec() - .getContainers() - .forEach( - container -> { - final ContainerResource containerResource = - client.pods() - .inNamespace(namespace) - .withName(podName) - .inContainer(container.getName()); - if (tailingLines > 0) { - containerResource.tailingLines(tailingLines); - } - final String containerLog = containerResource.getLog(); - consumer.accept(container.getName(), containerLog); - }); + final PodSpec podSpec = + client.pods().inNamespace(namespace).withName(podName).get().getSpec(); + List all = new ArrayList<>(); + final List containers = podSpec.getContainers(); + all.addAll(containers); + final List init = podSpec.getInitContainers(); + if (init != null) { + all.addAll(init); + } + for (Container container : all) { + try { + final ContainerResource containerResource = + client.pods() + .inNamespace(namespace) + .withName(podName) + .inContainer(container.getName()); + if (tailingLines > 0) { + containerResource.tailingLines(tailingLines); + } + final String containerLog = containerResource.getLog(); + consumer.accept(container.getName(), containerLog); + } catch (Throwable t) { + log.error( + "failed to get pod {} container {} logs: {}", + podName, + container.getName(), + t.getMessage()); + } + } } catch (Throwable t) { log.error("failed to get pod {} logs: {}", podName, t.getMessage()); } @@ -969,29 +935,8 @@ protected static void dumpEvents(String filePrefix) { } @SneakyThrows - protected static List getAllTopicsFromKafka() { - final String result = execInKafkaPod("rpk topic list"); - if (result == null) { - throw new IllegalStateException("failed to get topics from kafka"); - } - - final List topics = new ArrayList<>(); - final List lines = result.lines().collect(Collectors.toList()); - boolean first = true; - for (String line : lines) { - if (first) { - first = false; - continue; - } - topics.add(line.split(" ")[0]); - } - return topics; - } - - @SneakyThrows - private static String execInKafkaPod(String cmd) { - return execInPodInNamespace(KAFKA_NAMESPACE, "redpanda-0", "redpanda", cmd.split(" ")) - .get(1, TimeUnit.MINUTES); + protected static List getAllTopics() { + return streamingClusterProvider.getTopics(); } protected static void setupTenant(String tenant) { @@ -1044,17 +989,19 @@ protected static boolean isApplicationReady( expectedRunningTotalExecutors + "/" + expectedRunningTotalExecutors); } + @SneakyThrows protected static void deployLocalApplication( - String applicationId, String appDir, Map env) { + String applicationId, String appDirName, Map env) { String testAppsBaseDir = "src/test/resources/apps"; - String testInstanceBaseDir = "src/test/resources/instances"; String testSecretBaseDir = "src/test/resources/secrets"; - copyFileToClientContainer(Paths.get(testAppsBaseDir, appDir).toFile(), "/tmp/app"); - copyFileToClientContainer( - Paths.get(testInstanceBaseDir, "kafka-kubernetes.yaml").toFile(), - "/tmp/instance.yaml"); - copyFileToClientContainer( - Paths.get(testSecretBaseDir, "secret1.yaml").toFile(), "/tmp/secrets.yaml"); + + final File appDir = Paths.get(testAppsBaseDir, appDirName).toFile(); + final File secretFile = Paths.get(testSecretBaseDir, "secret1.yaml").toFile(); + validateApp(appDir, secretFile); + copyFileToClientContainer(appDir, "/tmp/app"); + copyFileToClientContainer(instanceFile, "/tmp/instance.yaml"); + + copyFileToClientContainer(secretFile, "/tmp/secrets.yaml"); String beforeCmd = ""; if (env != null) { @@ -1072,6 +1019,32 @@ protected static void deployLocalApplication( .split(" ")); } + private static void validateApp(File appDir, File secretFile) throws Exception { + final ModelBuilder.ApplicationWithPackageInfo model = + ModelBuilder.buildApplicationInstance( + List.of(appDir.toPath()), + Files.readString(instanceFile.toPath()), + Files.readString(secretFile.toPath())); + model.getApplication() + .getModules() + .values() + .forEach( + m -> { + m.getTopics() + .keySet() + .forEach( + t -> { + if (!t.startsWith(TOPICS_PREFIX)) { + throw new IllegalStateException( + "All topics must start with " + + TOPICS_PREFIX + + ". Found " + + t); + } + }); + }); + } + protected static Map getAppEnvMapFromSystem(List names) { final Map result = new HashMap<>(); for (String name : names) { diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/KubeCluster.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/KubeCluster.java new file mode 100644 index 000000000..01ba03c74 --- /dev/null +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/KubeCluster.java @@ -0,0 +1,26 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.tests.util; + +public interface KubeCluster { + void start(); + + void ensureImage(String image); + + void stop(); + + String getKubeConfig(); +} diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/StreamingClusterProvider.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/StreamingClusterProvider.java new file mode 100644 index 000000000..34c275a70 --- /dev/null +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/StreamingClusterProvider.java @@ -0,0 +1,30 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.tests.util; + +import ai.langstream.api.model.StreamingCluster; +import java.util.List; + +public interface StreamingClusterProvider { + + StreamingCluster start(); + + void cleanup(); + + void stop(); + + List getTopics(); +} diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/k8s/LocalK3sContainer.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/k8s/LocalK3sContainer.java new file mode 100644 index 000000000..71ca7bb64 --- /dev/null +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/k8s/LocalK3sContainer.java @@ -0,0 +1,95 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.tests.util.k8s; + +import ai.langstream.tests.util.KubeCluster; +import com.dajudge.kindcontainer.K3sContainer; +import com.dajudge.kindcontainer.K3sContainerVersion; +import com.dajudge.kindcontainer.KubernetesImageSpec; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.BindMode; + +@Slf4j +public class LocalK3sContainer implements KubeCluster { + + K3sContainer container; + final Path basePath = Paths.get("/tmp", "ls-tests-image"); + + @Override + public void start() { + container = + new K3sContainer( + new KubernetesImageSpec<>(K3sContainerVersion.VERSION_1_25_0) + .withImage("rancher/k3s:v1.25.3-k3s1")); + container.withFileSystemBind( + basePath.toFile().getAbsolutePath(), "/images", BindMode.READ_WRITE); + // container.withNetwork(network); + container.start(); + } + + @Override + public void ensureImage(String image) { + loadImage(basePath, image); + } + + @SneakyThrows + private void loadImage(Path basePath, String image) { + final String id = + DockerClientFactory.lazyClient() + .inspectImageCmd(image) + .exec() + .getId() + .replace("sha256:", ""); + + final Path hostPath = basePath.resolve(id); + if (!hostPath.toFile().exists()) { + log.info("Saving image {} locally", image); + final InputStream in = DockerClientFactory.lazyClient().saveImageCmd(image).exec(); + + try (final OutputStream outputStream = Files.newOutputStream(hostPath)) { + in.transferTo(outputStream); + } catch (Exception ex) { + hostPath.toFile().delete(); + throw ex; + } + } + + log.info("Loading image {} in the k3s container", image); + if (container.execInContainer("ctr", "images", "import", "/images/" + id).getExitCode() + != 0) { + throw new RuntimeException("Failed to load image " + image); + } + } + + @Override + public void stop() { + if (container != null) { + container.stop(); + } + } + + @Override + public String getKubeConfig() { + return container.getKubeconfig(); + } +} diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/k8s/RunningHostCluster.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/k8s/RunningHostCluster.java new file mode 100644 index 000000000..4705803c0 --- /dev/null +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/k8s/RunningHostCluster.java @@ -0,0 +1,41 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.tests.util.k8s; + +import ai.langstream.tests.util.KubeCluster; +import io.fabric8.kubernetes.client.Config; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import lombok.SneakyThrows; + +public class RunningHostCluster implements KubeCluster { + @Override + public void start() {} + + @Override + public void ensureImage(String image) {} + + @Override + public void stop() {} + + @Override + @SneakyThrows + public String getKubeConfig() { + final String kubeConfig = Config.getKubeconfigFilename(); + return Files.readString(Paths.get(kubeConfig), StandardCharsets.UTF_8); + } +} diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/kafka/LocalRedPandaClusterProvider.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/kafka/LocalRedPandaClusterProvider.java new file mode 100644 index 000000000..fd4c6b05e --- /dev/null +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/kafka/LocalRedPandaClusterProvider.java @@ -0,0 +1,130 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.tests.util.kafka; + +import ai.langstream.api.model.StreamingCluster; +import ai.langstream.tests.util.BaseEndToEndTest; +import ai.langstream.tests.util.StreamingClusterProvider; +import io.fabric8.kubernetes.api.model.NamespaceBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class LocalRedPandaClusterProvider implements StreamingClusterProvider { + + private static final boolean REUSE_EXISTING_REDPANDA = + Boolean.parseBoolean(System.getProperty("langstream.tests.reuseRedPanda", "false")); + protected static final String KAFKA_NAMESPACE = "kafka-ns"; + + private final KubernetesClient client; + private boolean started; + + public LocalRedPandaClusterProvider(KubernetesClient client) { + this.client = client; + } + + @Override + @SneakyThrows + public StreamingCluster start() { + if (!started) { + internalStart(); + started = true; + } + return new StreamingCluster( + "kafka", + Map.of( + "admin", + Map.of( + "bootstrap.servers", + "redpanda-0.redpanda.kafka-ns.svc.cluster.local:9093"))); + } + + private void internalStart() throws InterruptedException, IOException { + log.info("installing redpanda"); + client.resource( + new NamespaceBuilder() + .withNewMetadata() + .withName(KAFKA_NAMESPACE) + .endMetadata() + .build()) + .serverSideApply(); + + if (!REUSE_EXISTING_REDPANDA) { + log.info("try to delete existing redpanda"); + BaseEndToEndTest.runProcess( + "helm delete redpanda --namespace kafka-ns".split(" "), true); + } + + BaseEndToEndTest.runProcess( + "helm repo add redpanda https://charts.redpanda.com/".split(" "), true); + BaseEndToEndTest.runProcess("helm repo update".split(" ")); + // ref https://github.com/redpanda-data/helm-charts/blob/main/charts/redpanda/values.yaml + log.info("running helm command to install redpanda"); + BaseEndToEndTest.runProcess( + ("helm --debug upgrade --install redpanda redpanda/redpanda --namespace kafka-ns --set resources.cpu.cores=0.3" + + " --set resources.memory.container.max=1512Mi --set statefulset.replicas=1 --set console" + + ".enabled=false --set tls.enabled=false --set external.domain=redpanda-external.kafka-ns.svc" + + ".cluster.local --set statefulset.initContainers.setDataDirOwnership.enabled=true --set tuning.tune_aio_events=false --wait --timeout=5m") + .split(" ")); + log.info("waiting redpanda to be ready"); + BaseEndToEndTest.runProcess( + "kubectl wait pods redpanda-0 --for=condition=Ready --timeout=5m -n kafka-ns" + .split(" ")); + log.info("redpanda installed"); + } + + @SneakyThrows + private static String execInKafkaPod(String cmd) { + return BaseEndToEndTest.execInPodInNamespace( + KAFKA_NAMESPACE, "redpanda-0", "redpanda", cmd.split(" ")) + .get(1, TimeUnit.MINUTES); + } + + @Override + public void cleanup() { + execInKafkaPod("rpk topic delete -r \".*\""); + } + + @Override + public List getTopics() { + final String result = execInKafkaPod("rpk topic list"); + if (result == null) { + throw new IllegalStateException("failed to get topics from kafka"); + } + + final List topics = new ArrayList<>(); + final List lines = result.lines().collect(Collectors.toList()); + boolean first = true; + for (String line : lines) { + if (first) { + first = false; + continue; + } + topics.add(line.split(" ")[0]); + } + return topics; + } + + @Override + public void stop() {} +} diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/kafka/RemoteKafkaProvider.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/kafka/RemoteKafkaProvider.java new file mode 100644 index 000000000..07641342b --- /dev/null +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/kafka/RemoteKafkaProvider.java @@ -0,0 +1,102 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.tests.util.kafka; + +import ai.langstream.api.model.StreamingCluster; +import ai.langstream.tests.util.BaseEndToEndTest; +import ai.langstream.tests.util.StreamingClusterProvider; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.Admin; + +@Slf4j +public class RemoteKafkaProvider implements StreamingClusterProvider { + + private static final String SYS_PROPERTIES_PREFIX = "langstream.tests.remotekafka.props."; + private static final String ENV_PREFIX = "LANGSTREAM_TESTS_REMOTEKAFKA_PROPS_"; + private static final Map ADMIN_CONFIG; + + private org.apache.kafka.clients.admin.Admin admin; + + static { + ADMIN_CONFIG = new HashMap<>(); + + final Map envs = System.getenv(); + for (Map.Entry env : envs.entrySet()) { + if (env.getKey().startsWith(ENV_PREFIX)) { + final String key = + env.getKey().substring(ENV_PREFIX.length()).toLowerCase().replace("_", "."); + final String value = env.getValue(); + log.info("Loading remote kafka admin config from env: {}={}", key, value); + ADMIN_CONFIG.put(key, value); + } + } + + final Set props = System.getProperties().keySet(); + for (Object prop : props) { + String asString = prop.toString(); + if (asString.startsWith(SYS_PROPERTIES_PREFIX)) { + final String key = asString.substring(SYS_PROPERTIES_PREFIX.length()); + final String value = System.getProperty(asString); + log.info("Loading remote kafka admin config from sys: {}={}", key, value); + ADMIN_CONFIG.put(key, value); + } + } + } + + @Override + public StreamingCluster start() { + admin = Admin.create(ADMIN_CONFIG); + final StreamingCluster streamingCluster = + new StreamingCluster("kafka", Map.of("admin", ADMIN_CONFIG)); + cleanup(); + return streamingCluster; + } + + @Override + @SneakyThrows + public void cleanup() { + final List topics = getTopics(); + if (!topics.isEmpty()) { + log.info("Deleting topics: {}", topics); + for (String topic : topics) { + admin.deleteTopics(List.of(topic)).all().get(); + } + + // admin.deleteTopics(topics).all().get(); + } + } + + @Override + public void stop() { + if (admin != null) { + admin.close(); + } + } + + @Override + @SneakyThrows + public List getTopics() { + return admin.listTopics().names().get().stream() + .filter(s -> s.startsWith(BaseEndToEndTest.TOPICS_PREFIX)) + .collect(Collectors.toList()); + } +} diff --git a/langstream-e2e-tests/src/test/resources/apps/cassandra-sink/gateways.yaml b/langstream-e2e-tests/src/test/resources/apps/cassandra-sink/gateways.yaml index 4ebbe13f6..1a6ad3386 100644 --- a/langstream-e2e-tests/src/test/resources/apps/cassandra-sink/gateways.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/cassandra-sink/gateways.yaml @@ -18,4 +18,4 @@ gateways: - id: produce-input type: produce - topic: TEST_TOPIC_C1 \ No newline at end of file + topic: ls-test-topic1 \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/apps/cassandra-sink/pipeline.yaml b/langstream-e2e-tests/src/test/resources/apps/cassandra-sink/pipeline.yaml index a4cd39e2f..af71b1d67 100644 --- a/langstream-e2e-tests/src/test/resources/apps/cassandra-sink/pipeline.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/cassandra-sink/pipeline.yaml @@ -40,12 +40,12 @@ assets: - "CREATE TABLE IF NOT EXISTS vsearch.products (id int PRIMARY KEY, name TEXT, description TEXT);" - "INSERT INTO vsearch.products(id, name, description) VALUES (1, 'test-init', 'test-init');" topics: - - name: "TEST_TOPIC_C1" + - name: "ls-test-topic1" creation-mode: create-if-not-exists pipeline: - name: "Write to Cassandra" type: "sink" - input: "TEST_TOPIC_C1" + input: "ls-test-topic1" resources: size: 3 configuration: @@ -54,5 +54,5 @@ pipeline: value.converter: org.apache.kafka.connect.storage.StringConverter loadBalancing.localDc: "datacenter1" contactPoints: "{{{ secrets.cassandra.contact-points }}}" - topic.TEST_TOPIC_C1.vsearch.products.mapping: "id=value.id,description=value.description,name=value.name" + topic.ls-test-topic1.vsearch.products.mapping: "id=value.id,description=value.description,name=value.name" name: cassandra-sink \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/apps/chat-completions/gateways.yaml b/langstream-e2e-tests/src/test/resources/apps/chat-completions/gateways.yaml index 185fd7ac8..bab5fb989 100644 --- a/langstream-e2e-tests/src/test/resources/apps/chat-completions/gateways.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/chat-completions/gateways.yaml @@ -18,7 +18,7 @@ gateways: - id: produce-input type: produce - topic: input-topic + topic: ls-test-input-topic parameters: - sessionId produce-options: @@ -28,7 +28,7 @@ gateways: - id: consume-history type: consume - topic: history-topic + topic: ls-test-history-topic parameters: - sessionId consume-options: diff --git a/langstream-e2e-tests/src/test/resources/apps/chat-completions/pipeline.yaml b/langstream-e2e-tests/src/test/resources/apps/chat-completions/pipeline.yaml index a3b81ef4d..8c73a9e10 100644 --- a/langstream-e2e-tests/src/test/resources/apps/chat-completions/pipeline.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/chat-completions/pipeline.yaml @@ -15,26 +15,26 @@ # topics: - - name: "input-topic" + - name: "ls-test-input-topic" creation-mode: create-if-not-exists - - name: "output-topic" + - name: "ls-test-output-topic" creation-mode: create-if-not-exists - - name: "history-topic" + - name: "ls-test-history-topic" creation-mode: create-if-not-exists pipeline: - name: "convert-to-json" type: "document-to-json" - input: "input-topic" + input: "ls-test-input-topic" configuration: text-field: "question" - name: "ai-chat-completions" type: "ai-chat-completions" - output: "history-topic" + output: "ls-test-history-topic" configuration: model: "{{{secrets.open-ai.chat-completions-model}}}" completion-field: "value.answer" log-field: "value.prompt" - stream-to-topic: "output-topic" + stream-to-topic: "ls-test-output-topic" stream-response-completion-field: "value" min-chunks-per-message: 20 messages: diff --git a/langstream-e2e-tests/src/test/resources/apps/experimental-python-processor/gateways.yaml b/langstream-e2e-tests/src/test/resources/apps/experimental-python-processor/gateways.yaml index 909ce3f59..625b8c2d0 100644 --- a/langstream-e2e-tests/src/test/resources/apps/experimental-python-processor/gateways.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/experimental-python-processor/gateways.yaml @@ -17,7 +17,7 @@ gateways: - id: produce-input type: produce - topic: TEST_TOPIC_0 + topic: ls-test-topic0 - id: consume-output type: consume - topic: TEST_TOPIC_1 \ No newline at end of file + topic: ls-test-topic1 \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/apps/experimental-python-processor/pipeline.yaml b/langstream-e2e-tests/src/test/resources/apps/experimental-python-processor/pipeline.yaml index 92422cc17..9263a9016 100644 --- a/langstream-e2e-tests/src/test/resources/apps/experimental-python-processor/pipeline.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/experimental-python-processor/pipeline.yaml @@ -18,13 +18,13 @@ module: "module-1" id: "pipeline-1" name: "Exclamation processor" topics: - - name: TEST_TOPIC_0 + - name: ls-test-topic0 creation-mode: create-if-not-exists schema: type: string keySchema: type: string - - name: TEST_TOPIC_1 + - name: ls-test-topic1 creation-mode: create-if-not-exists deletion-mode: delete schema: @@ -37,8 +37,8 @@ pipeline: size: 2 id: "test-python-processor" type: "experimental-python-processor" - input: TEST_TOPIC_0 - output: TEST_TOPIC_1 + input: ls-test-topic0 + output: ls-test-topic1 configuration: secret_value: "{{ secrets.secret1.value-key }}" className: example.Exclamation \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/apps/python-processor/gateways.yaml b/langstream-e2e-tests/src/test/resources/apps/python-processor/gateways.yaml index c6519b985..993b300c4 100644 --- a/langstream-e2e-tests/src/test/resources/apps/python-processor/gateways.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/python-processor/gateways.yaml @@ -18,7 +18,7 @@ gateways: - id: produce-input type: produce - topic: TEST_TOPIC_0 + topic: ls-test-topic0 - id: consume-output type: consume - topic: TEST_TOPIC_1 \ No newline at end of file + topic: ls-test-topic1 \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml b/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml index 697812389..af26c2e89 100644 --- a/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml @@ -19,13 +19,13 @@ module: "module-1" id: "pipeline-1" name: "Exclamation processor" topics: - - name: TEST_TOPIC_0 + - name: ls-test-topic0 creation-mode: create-if-not-exists schema: type: string keySchema: type: string - - name: TEST_TOPIC_1 + - name: ls-test-topic1 creation-mode: create-if-not-exists deletion-mode: delete schema: @@ -36,8 +36,8 @@ pipeline: - name: "Process using Python" id: "test-python-processor" type: "python-processor" - input: TEST_TOPIC_0 - output: TEST_TOPIC_1 + input: ls-test-topic0 + output: ls-test-topic1 configuration: secret_value: "{{ secrets.secret1.value-key }}" className: example.Exclamation \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/chatbot.yaml b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/chatbot.yaml index b1775c504..40cb9128d 100644 --- a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/chatbot.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/chatbot.yaml @@ -15,11 +15,11 @@ # topics: - - name: "questions-topic" + - name: "ls-test-questions-topic" creation-mode: create-if-not-exists - - name: "answers-topic" + - name: "ls-test-answers-topic" creation-mode: create-if-not-exists - - name: "log-topic" + - name: "ls-test-log-topic" creation-mode: create-if-not-exists errors: on-failure: "skip" @@ -28,7 +28,7 @@ resources: pipeline: - name: "convert-to-structure" type: "document-to-json" - input: "questions-topic" + input: "ls-test-questions-topic" configuration: text-field: "question" - name: "compute-embeddings" @@ -51,13 +51,13 @@ pipeline: configuration: model: "{{{secrets.open-ai.chat-completions-model}}}" # This needs to be set to the model deployment name, not the base name - # on the log-topic we add a field with the answer + # on the ls-test-log-topic we add a field with the answer completion-field: "value.answer" # we are also logging the prompt we sent to the LLM log-field: "value.prompt" # here we configure the streaming behavior - # as soon as the LLM answers with a chunk we send it to the answers-topic - stream-to-topic: "answers-topic" + # as soon as the LLM answers with a chunk we send it to the ls-test-answers-topic + stream-to-topic: "ls-test-answers-topic" # on the streaming answer we send the answer as whole message # the 'value' syntax is used to refer to the whole value of the message stream-response-completion-field: "value" @@ -83,7 +83,7 @@ pipeline: content: "{{% value.question}}" - name: "cleanup-response" type: "drop-fields" - output: "log-topic" + output: "ls-test-log-topic" configuration: fields: - "question_embeddings" diff --git a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/crawler.yaml b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/crawler.yaml index 852914c1d..248f1abfa 100644 --- a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/crawler.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/crawler.yaml @@ -16,7 +16,7 @@ name: "Crawl a website" topics: - - name: "chunks-topic" + - name: "ls-test-chunks-topic" creation-mode: create-if-not-exists resources: size: 2 @@ -87,7 +87,7 @@ pipeline: - name: "compute-embeddings" id: "step1" type: "compute-ai-embeddings" - output: "chunks-topic" + output: "ls-test-chunks-topic" configuration: model: "{{{secrets.open-ai.embeddings-model}}}" embeddings-field: "value.embeddings_vector" diff --git a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/gateways.yaml b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/gateways.yaml index 739289814..cc53b8073 100644 --- a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/gateways.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/gateways.yaml @@ -18,7 +18,7 @@ gateways: - id: "user-input" type: produce - topic: "questions-topic" + topic: "ls-test-questions-topic" parameters: - sessionId produceOptions: @@ -28,7 +28,7 @@ gateways: - id: "bot-output" type: consume - topic: "answers-topic" + topic: "ls-test-answers-topic" parameters: - sessionId consumeOptions: @@ -40,7 +40,7 @@ gateways: - id: "llm-debug" type: consume - topic: "log-topic" + topic: "ls-test-log-topic" parameters: - sessionId consumeOptions: diff --git a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/write-to-vector.yaml b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/write-to-vector.yaml index f324f7e4f..9ee08a0b5 100644 --- a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/write-to-vector.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/write-to-vector.yaml @@ -16,7 +16,7 @@ name: "Write to vector" topics: - - name: "chunks-topic" + - name: "ls-test-chunks-topic" creation-mode: create-if-not-exists assets: - name: "langstream-keyspace" @@ -55,7 +55,7 @@ assets: pipeline: - name: "Write to Astra" type: "vector-db-sink" - input: "chunks-topic" + input: "ls-test-chunks-topic" resources: size: 2 configuration: diff --git a/langstream-e2e-tests/src/test/resources/instances/kafka-kubernetes.yaml b/langstream-e2e-tests/src/test/resources/instances/kafka-kubernetes.yaml deleted file mode 100644 index b822d75a1..000000000 --- a/langstream-e2e-tests/src/test/resources/instances/kafka-kubernetes.yaml +++ /dev/null @@ -1,25 +0,0 @@ -# -# -# Copyright DataStax, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -instance: - streamingCluster: - type: "kafka" - configuration: - admin: - bootstrap.servers: redpanda-0.redpanda.kafka-ns.svc.cluster.local:9093 - computeCluster: - type: "kubernetes"