diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f167a9f1a..7fd3ca7e7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -155,7 +155,7 @@ jobs: uname -m ./dev/prepare-minikube-for-e2e-tests.sh ./mvnw install -pl langstream-e2e-tests -am -DskipTests - ./mvnw verify -pl langstream-e2e-tests -De2eTests + ./mvnw verify -pl langstream-e2e-tests -De2eTests -DexcludedGroups="needs-credentials" - name: Upload Surefire reports uses: actions/upload-artifact@v3 diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java index ba7b93886..40ce49147 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java @@ -17,6 +17,7 @@ import ai.langstream.deployer.k8s.api.crds.agents.AgentCustomResource; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationCustomResource; +import ai.langstream.tests.util.ConsumeGatewayMessage; import com.dajudge.kindcontainer.K3sContainer; import com.dajudge.kindcontainer.K3sContainerVersion; import com.dajudge.kindcontainer.KubernetesImageSpec; @@ -61,10 +62,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import java.util.function.Function; import java.util.stream.Collectors; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -77,9 +78,18 @@ @Slf4j public class BaseEndToEndTest implements TestWatcher { + public static final String CATEGORY_NEEDS_CREDENTIALS = "needs-credentials"; + + private static final String LANGSTREAM_TAG = + System.getProperty("langstream.tests.tag", "latest-dev"); + + private static final boolean REUSE_EXISTING_REDPANDA = + Boolean.parseBoolean(System.getProperty("langstream.tests.reuseRedPanda", "false")); + public static final File TEST_LOGS_DIR = new File("target", "e2e-test-logs"); protected static final String TENANT_NAMESPACE_PREFIX = "ls-tenant-"; - protected static final ObjectMapper MAPPER = new ObjectMapper(new YAMLFactory()); + protected static final ObjectMapper JSON_MAPPER = new ObjectMapper(); + protected static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory()); protected static final String KAFKA_NAMESPACE = "kafka-ns"; interface KubeServer { @@ -92,7 +102,7 @@ interface KubeServer { String getKubeConfig(); } - static class RunningHostCluster implements PythonFunctionIT.KubeServer { + static class RunningHostCluster implements KubeServer { @Override public void start() {} @@ -110,7 +120,7 @@ public String getKubeConfig() { } } - static class LocalK3sContainer implements PythonFunctionIT.KubeServer { + static class LocalK3sContainer implements KubeServer { K3sContainer container; final Path basePath = Paths.get("/tmp", "ls-tests-image"); @@ -179,8 +189,6 @@ public String getKubeConfig() { protected static KubernetesClient client; protected static String namespace; - protected final Map resolvedTopics = new HashMap<>(); - @Override public void testAborted(ExtensionContext context, Throwable cause) { testFailed(context, cause); @@ -222,13 +230,7 @@ protected static void applyManifestNoNamespace(String manifest) { } @SneakyThrows - protected void copyFileToClientContainer(File file, String toPath) { - copyFileToClientContainer(file, toPath, Function.identity()); - } - - @SneakyThrows - protected void copyFileToClientContainer( - File file, String toPath, Function contentTransformer) { + protected static void copyFileToClientContainer(File file, String toPath) { final String podName = client.pods() .inNamespace(namespace) @@ -239,16 +241,9 @@ protected void copyFileToClientContainer( .getMetadata() .getName(); if (file.isFile()) { - String content = Files.readString(file.toPath(), StandardCharsets.UTF_8); - content = contentTransformer.apply(content); - final Path temp = Files.createTempFile("langstream", ".replaced"); - for (Map.Entry e : resolvedTopics.entrySet()) { - content = content.replace(e.getKey(), e.getValue()); - } - Files.writeString(temp, content); runProcess( "kubectl cp %s %s:%s -n %s" - .formatted(temp.toFile().getAbsolutePath(), podName, toPath, namespace) + .formatted(file.getAbsolutePath(), podName, toPath, namespace) .split(" ")); } else { runProcess( @@ -259,12 +254,12 @@ protected void copyFileToClientContainer( } @SneakyThrows - protected String executeCommandOnClient(String... args) { - return executeCommandOnClient(1, TimeUnit.MINUTES, args); + protected static String executeCommandOnClient(String... args) { + return executeCommandOnClient(2, TimeUnit.MINUTES, args); } @SneakyThrows - protected String executeCommandOnClient(long timeout, TimeUnit unit, String... args) { + protected static String executeCommandOnClient(long timeout, TimeUnit unit, String... args) { final Pod pod = client.pods() .inNamespace(namespace) @@ -279,6 +274,17 @@ protected String executeCommandOnClient(long timeout, TimeUnit unit, String... a .get(timeout, unit); } + @SneakyThrows + protected static ConsumeGatewayMessage consumeOneMessageFromGateway( + String applicationId, String gatewayId, String... extraArgs) { + final String command = + "bin/langstream gateway consume %s %s %s -n 1" + .formatted(applicationId, gatewayId, String.join(" ", extraArgs)); + final String response = executeCommandOnClient(command); + final String secondLine = response.lines().collect(Collectors.toList()).get(1); + return ConsumeGatewayMessage.readValue(secondLine); + } + protected static void runProcess(String[] allArgs) throws InterruptedException, IOException { runProcess(allArgs, false); } @@ -330,7 +336,7 @@ public void onFailure(Throwable t, Response failureResponse) { "Error executing {} encountered; \nstderr: {}\nstdout: {}", cmd, error.toString(StandardCharsets.UTF_8), - out.toString(StandardCharsets.UTF_8), + out.toString(), t); response.completeExceptionally(t); } @@ -431,15 +437,6 @@ public static void destroy() { } } - @BeforeEach - @SneakyThrows - public void beforeEach() { - resolvedTopics.clear(); - for (int i = 0; i < 100; i++) { - resolvedTopics.put("TEST_TOPIC_" + i, "topic-" + i + "-" + System.nanoTime()); - } - } - @BeforeAll @SneakyThrows public static void setup() { @@ -468,6 +465,11 @@ public static void setup() { CompletableFuture.runAsync(BaseEndToEndTest::installMinio); List> imagesFutures = new ArrayList<>(); + final String baseImageRepository = + LANGSTREAM_TAG.equals("latest-dev") ? "langstream" : "ghcr.io/langstream"; + final String imagePullPolicy = + LANGSTREAM_TAG.equals("latest-dev") ? "Never" : "IfNotPresent"; + imagesFutures.add( CompletableFuture.runAsync( () -> @@ -561,75 +563,89 @@ private static void installLangStream(boolean authentication) { .withName("langstream-client-role-binding") .delete(); + final String baseImageRepository = + LANGSTREAM_TAG.equals("latest-dev") ? "langstream" : "ghcr.io/langstream"; + final String imagePullPolicy = + LANGSTREAM_TAG.equals("latest-dev") ? "Never" : "IfNotPresent"; final String values = """ - controlPlane: - image: - repository: langstream/langstream-control-plane - tag: latest-dev - pullPolicy: Never - resources: - requests: - cpu: 0.2 - memory: 256Mi - app: - config: - application.storage.global.type: kubernetes - application.security.enabled: false - - deployer: - image: - repository: langstream/langstream-deployer - tag: latest-dev - pullPolicy: Never - replicaCount: 1 - resources: - requests: - cpu: 0.1 - memory: 256Mi - app: - config: - agentResources: - cpuPerUnit: 0.2 - memPerUnit: 128 - client: - image: - repository: langstream/langstream-cli - tag: latest-dev - pullPolicy: Never - resources: - requests: - cpu: 0.1 - memory: 256Mi - - apiGateway: - image: - repository: langstream/langstream-api-gateway - tag: latest-dev - pullPolicy: Never - resources: - requests: - cpu: 0.2 - memory: 256Mi - app: - config: - logging.level.org.apache.tomcat.websocket: debug - - runtime: - image: langstream/langstream-runtime:latest-dev - imagePullPolicy: Never - tenants: - defaultTenant: - create: false - namespacePrefix: %s - codeStorage: - type: s3 - configuration: - endpoint: http://minio.minio-dev.svc.cluster.local:9000 - access-key: minioadmin - secret-key: minioadmin - """ - .formatted(TENANT_NAMESPACE_PREFIX); + images: + tag: %s + controlPlane: + image: + repository: %s/langstream-control-plane + pullPolicy: %s + resources: + requests: + cpu: 0.2 + memory: 256Mi + app: + config: + application.storage.global.type: kubernetes + application.security.enabled: false + + deployer: + image: + repository: %s/langstream-deployer + pullPolicy: %s + replicaCount: 1 + resources: + requests: + cpu: 0.1 + memory: 256Mi + app: + config: + agentResources: + cpuPerUnit: 0.2 + memPerUnit: 128 + client: + image: + repository: %s/langstream-cli + pullPolicy: %s + resources: + requests: + cpu: 0.1 + memory: 256Mi + + apiGateway: + image: + repository: %s/langstream-api-gateway + pullPolicy: %s + resources: + requests: + cpu: 0.2 + memory: 256Mi + app: + config: + logging.level.org.apache.tomcat.websocket: debug + + runtime: + image: %s/langstream-runtime + imagePullPolicy: %s + tenants: + defaultTenant: + create: false + namespacePrefix: %s + codeStorage: + type: s3 + configuration: + endpoint: http://minio.minio-dev.svc.cluster.local:9000 + access-key: minioadmin + secret-key: minioadmin + """ + .formatted( + LANGSTREAM_TAG, + baseImageRepository, + imagePullPolicy, + baseImageRepository, + imagePullPolicy, + baseImageRepository, + imagePullPolicy, + baseImageRepository, + imagePullPolicy, + baseImageRepository, + imagePullPolicy, + TENANT_NAMESPACE_PREFIX); final Path tempFile = Files.createTempFile("langstream-test", ".yaml"); Files.writeString(tempFile, values); @@ -687,12 +703,18 @@ private static void installKafka() { .build()) .serverSideApply(); - runProcess("helm delete redpanda --namespace kafka-ns".split(" "), true); + if (!REUSE_EXISTING_REDPANDA) { + runProcess("helm delete redpanda --namespace kafka-ns".split(" "), true); + } + runProcess("helm repo add redpanda https://charts.redpanda.com/".split(" "), true); runProcess("helm repo update".split(" ")); // ref https://github.com/redpanda-data/helm-charts/blob/main/charts/redpanda/values.yaml runProcess( - "helm install redpanda redpanda/redpanda --namespace kafka-ns --set resources.cpu.cores=0.3 --set resources.memory.container.max=1512Mi --set statefulset.replicas=1 --set console.enabled=false --set tls.enabled=false --set external.domain=redpanda-external.kafka-ns.svc.cluster.local --set statefulset.initContainers.setDataDirOwnership.enabled=true" + ("helm upgrade --install redpanda redpanda/redpanda --namespace kafka-ns --set resources.cpu.cores=0.3" + + " --set resources.memory.container.max=1512Mi --set statefulset.replicas=1 --set console" + + ".enabled=false --set tls.enabled=false --set external.domain=redpanda-external.kafka-ns.svc" + + ".cluster.local --set statefulset.initContainers.setDataDirOwnership.enabled=true") .split(" ")); log.info("waiting kafka to be ready"); runProcess( @@ -704,76 +726,76 @@ private static void installKafka() { static void installMinio() { applyManifestNoNamespace( """ - # Deploys a new Namespace for the MinIO Pod - apiVersion: v1 - kind: Namespace - metadata: - name: minio-dev # Change this value if you want a different namespace name - labels: - name: minio-dev # Change this value to match metadata.name - --- - # Deploys a new MinIO Pod into the metadata.namespace Kubernetes namespace - # - # The `spec.containers[0].args` contains the command run on the pod - # The `/data` directory corresponds to the `spec.containers[0].volumeMounts[0].mountPath` - # That mount path corresponds to a Kubernetes HostPath which binds `/data` to a local drive or volume on the worker node where the pod runs - #\s - apiVersion: v1 - kind: Pod - metadata: - labels: - app: minio - name: minio - namespace: minio-dev # Change this value to match the namespace metadata.name - spec: - containers: - - name: minio - image: quay.io/minio/minio:latest - command: - - /bin/bash - - -c - args:\s - - minio server /data --console-address :9090 - volumeMounts: - - mountPath: /data - name: localvolume # Corresponds to the `spec.volumes` Persistent Volume - ports: - - containerPort: 9090 - protocol: TCP - name: console - - containerPort: 9000 - protocol: TCP - name: s3 - resources: - requests: - cpu: 50m - memory: 512Mi - volumes: - - name: localvolume - hostPath: # MinIO generally recommends using locally-attached volumes - path: /mnt/disk1/data # Specify a path to a local drive or volume on the Kubernetes worker node - type: DirectoryOrCreate # The path to the last directory must exist - --- - apiVersion: v1 - kind: Service - metadata: - labels: - app: minio - name: minio - namespace: minio-dev # Change this value to match the namespace metadata.name - spec: - ports: - - port: 9090 - protocol: TCP - targetPort: 9090 - name: console - - port: 9000 - protocol: TCP - targetPort: 9000 - name: s3 - selector: - app: minio - """); + # Deploys a new Namespace for the MinIO Pod + apiVersion: v1 + kind: Namespace + metadata: + name: minio-dev # Change this value if you want a different namespace name + labels: + name: minio-dev # Change this value to match metadata.name + --- + # Deploys a new MinIO Pod into the metadata.namespace Kubernetes namespace + # + # The `spec.containers[0].args` contains the command run on the pod + # The `/data` directory corresponds to the `spec.containers[0].volumeMounts[0].mountPath` + # That mount path corresponds to a Kubernetes HostPath which binds `/data` to a local drive or volume on the worker node where the pod runs + #\s + apiVersion: v1 + kind: Pod + metadata: + labels: + app: minio + name: minio + namespace: minio-dev # Change this value to match the namespace metadata.name + spec: + containers: + - name: minio + image: quay.io/minio/minio:latest + command: + - /bin/bash + - -c + args:\s + - minio server /data --console-address :9090 + volumeMounts: + - mountPath: /data + name: localvolume # Corresponds to the `spec.volumes` Persistent Volume + ports: + - containerPort: 9090 + protocol: TCP + name: console + - containerPort: 9000 + protocol: TCP + name: s3 + resources: + requests: + cpu: 50m + memory: 512Mi + volumes: + - name: localvolume + hostPath: # MinIO generally recommends using locally-attached volumes + path: /mnt/disk1/data # Specify a path to a local drive or volume on the Kubernetes worker node + type: DirectoryOrCreate # The path to the last directory must exist + --- + apiVersion: v1 + kind: Service + metadata: + labels: + app: minio + name: minio + namespace: minio-dev # Change this value to match the namespace metadata.name + spec: + ports: + - port: 9090 + protocol: TCP + targetPort: 9090 + name: console + - port: 9000 + protocol: TCP + targetPort: 9000 + name: s3 + selector: + app: minio + """); } protected static void withPodLogs( @@ -885,7 +907,7 @@ protected static void dumpResource(String filePrefix, HasMetadata resource) { resource.getKind(), resource.getMetadata().getName())); try (FileWriter writer = new FileWriter(outputFile)) { - writer.write(MAPPER.writeValueAsString(resource)); + writer.write(YAML_MAPPER.writeValueAsString(resource)); } catch (Throwable e) { log.error("failed to write resource to file {}", outputFile, e); } @@ -967,4 +989,104 @@ private static String execInKafkaPod(String cmd) { return execInPodInNamespace(KAFKA_NAMESPACE, "redpanda-0", "redpanda", cmd.split(" ")) .get(1, TimeUnit.MINUTES); } + + protected static void setupTenant(String tenant) { + executeCommandOnClient( + """ + bin/langstream tenants put %s && + bin/langstream configure tenant %s""" + .formatted(tenant, tenant) + .replace(System.lineSeparator(), " ") + .split(" ")); + } + + protected static void deployLocalApplication(String applicationId, String appDir) { + deployLocalApplication(applicationId, appDir, null); + } + + protected static void awaitApplicationReady( + String applicationId, int expectedRunningTotalExecutors) { + Awaitility.await() + .atMost(3, TimeUnit.MINUTES) + .pollInterval(5, TimeUnit.SECONDS) + .until(() -> isApplicationReady(applicationId, expectedRunningTotalExecutors)); + } + + protected static boolean isApplicationReady( + String applicationId, int expectedRunningTotalExecutors) { + final String response = + executeCommandOnClient( + "bin/langstream apps get %s".formatted(applicationId).split(" ")); + final List lines = response.lines().collect(Collectors.toList()); + final String appLine = lines.get(1); + final List lineAsList = + Arrays.stream(appLine.split(" ")) + .filter(s -> !s.isBlank()) + .collect(Collectors.toList()); + System.out.println("app line " + lineAsList); + if (lineAsList.size() <= 5) { + return false; + } + final String replicasReady = lineAsList.get(5); + System.out.println("replicasReady " + replicasReady); + return replicasReady.equals( + expectedRunningTotalExecutors + "/" + expectedRunningTotalExecutors); + } + + protected static void deployLocalApplication( + String applicationId, String appDir, Map env) { + String testAppsBaseDir = "src/test/resources/apps"; + String testInstanceBaseDir = "src/test/resources/instances"; + String testSecretBaseDir = "src/test/resources/secrets"; + copyFileToClientContainer(Paths.get(testAppsBaseDir, appDir).toFile(), "/tmp/app"); + copyFileToClientContainer( + Paths.get(testInstanceBaseDir, "kafka-kubernetes.yaml").toFile(), + "/tmp/instance.yaml"); + copyFileToClientContainer( + Paths.get(testSecretBaseDir, "secret1.yaml").toFile(), "/tmp/secrets.yaml"); + + String beforeCmd = ""; + if (env != null) { + beforeCmd = + env.entrySet().stream() + .map(e -> "export %s=%s".formatted(e.getKey(), e.getValue())) + .collect(Collectors.joining(" && ")); + beforeCmd += " && "; + } + + executeCommandOnClient( + (beforeCmd + + "bin/langstream apps deploy %s -app /tmp/app -i /tmp/instance.yaml -s /tmp/secrets.yaml") + .formatted(applicationId) + .split(" ")); + } + + protected static Map getAppEnvMapFromSystem(List names) { + final Map result = new HashMap<>(); + for (String name : names) { + result.put(name, getAppEnvFromSystem(name)); + } + return result; + } + + protected static String getAppEnvFromSystem(String name) { + + final String fromSystemProperty = System.getProperty("langstream.tests.app.env." + name); + if (fromSystemProperty != null) { + return null; + } + final String fromEnv = System.getenv("LANGSTREAM_TESTS_APP_ENV_" + name); + if (fromEnv != null) { + return fromEnv; + } + + final String fromEnvDirect = System.getenv(name); + if (fromEnvDirect != null) { + return fromEnvDirect; + } + throw new IllegalStateException( + ("failed to get app env variable %s from system or env. Possible env variables: %s, " + + "LANGSTREAM_TESTS_APP_ENV_%s. Possible system properties: langstream.tests.app.env.%s") + .formatted(name, name, name, name)); + } } diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/CassandraSinkIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/CassandraSinkIT.java index 252f1f455..9cf7bf43e 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/CassandraSinkIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/CassandraSinkIT.java @@ -18,7 +18,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import java.nio.file.Paths; +import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import lombok.SneakyThrows; @@ -47,40 +47,16 @@ public void removeCassandra() { public void test() throws Exception { installLangStreamCluster(false); final String tenant = "ten-" + System.currentTimeMillis(); - executeCommandOnClient( - """ - bin/langstream tenants put %s && - bin/langstream configure tenant %s""" - .formatted(tenant, tenant) - .replace(System.lineSeparator(), " ") - .split(" ")); - String testAppsBaseDir = "src/test/resources/apps"; - String testInstanceBaseDir = "src/test/resources/instances"; - String testSecretBaseDir = "src/test/resources/secrets"; + setupTenant(tenant); final String applicationId = "my-test-app"; String cassandraHost = "cassandra-0.cassandra." + namespace; - copyFileToClientContainer( - Paths.get(testAppsBaseDir, "cassandra-sink").toFile(), "/tmp/cassandra-sink"); - copyFileToClientContainer( - Paths.get(testInstanceBaseDir, "kafka-kubernetes.yaml").toFile(), - "/tmp/instance.yaml"); - copyFileToClientContainer( - Paths.get(testSecretBaseDir, "secret1.yaml").toFile(), - "/tmp/secrets.yaml", - file -> - file.replace("CASSANDRA-HOST-INJECTED", cassandraHost) - .replace("CASSANDRA-LOCAL-DC-INJECTED", "datacenter1") - .replace("CASSANDRA-PORT-INJECTED", "9042")); - - executeCommandOnClient( - "bin/langstream apps deploy %s -app /tmp/cassandra-sink -i /tmp/instance.yaml -s /tmp/secrets.yaml" - .formatted(applicationId) - .split(" ")); - client.apps() - .statefulSets() - .inNamespace(TENANT_NAMESPACE_PREFIX + tenant) - .withName(applicationId + "-module-1-pipeline-1-sink-1") - .waitUntilReady(4, TimeUnit.MINUTES); + final Map env = + Map.of( + "CASSANDRA_CONTACT_POINTS", cassandraHost, + "CASSANDRA_LOCAL_DC", "datacenter1", + "CASSANDRA_PORT", "9042"); + deployLocalApplication(applicationId, "cassandra-sink", env); + awaitApplicationReady(applicationId, 1); executeCommandOnClient( "bin/langstream gateway produce %s produce-input -v '{\"id\": 10, \"name\": \"test-from-sink\", \"description\": \"test-from-sink\"}'" diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/ChatCompletionsIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/ChatCompletionsIT.java new file mode 100644 index 000000000..6805d072a --- /dev/null +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/ChatCompletionsIT.java @@ -0,0 +1,75 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.tests; + +import ai.langstream.tests.util.ConsumeGatewayMessage; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@Slf4j +@ExtendWith(BaseEndToEndTest.class) +@Tag(BaseEndToEndTest.CATEGORY_NEEDS_CREDENTIALS) +public class ChatCompletionsIT extends BaseEndToEndTest { + + static Map appEnv; + + @BeforeAll + public static void checkCredentials() { + appEnv = + getAppEnvMapFromSystem( + List.of( + "OPEN_AI_ACCESS_KEY", + "OPEN_AI_URL", + "OPEN_AI_CHAT_COMPLETIONS_MODEL", + "OPEN_AI_PROVIDER")); + } + + @Test + public void test() throws Exception { + installLangStreamCluster(false); + final String tenant = "ten-" + System.currentTimeMillis(); + setupTenant(tenant); + + final String applicationId = "app"; + + deployLocalApplication(applicationId, "chat-completions", appEnv); + awaitApplicationReady(applicationId, 1); + + final String sessionId = UUID.randomUUID().toString(); + + executeCommandOnClient( + "bin/langstream gateway produce %s produce-input -v 'Who was the first president of the United States?' -p sessionId=%s" + .formatted(applicationId, sessionId) + .split(" ")); + + final ConsumeGatewayMessage message = + consumeOneMessageFromGateway( + applicationId, + "consume-history", + "-p sessionId=%s --position earliest --connect-timeout 30" + .formatted(sessionId) + .split(" ")); + log.info("Output: {}", message); + Assertions.assertTrue(message.getAnswerFromChatCompletionsValue().contains("Washington")); + } +} diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java index 3b9778748..875cda64a 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java @@ -18,7 +18,6 @@ import ai.langstream.deployer.k8s.api.crds.agents.AgentCustomResource; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationCustomResource; import io.fabric8.kubernetes.api.model.Secret; -import java.nio.file.Paths; import java.util.List; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -35,35 +34,11 @@ public class PythonFunctionIT extends BaseEndToEndTest { public void test() { installLangStreamCluster(false); final String tenant = "ten-" + System.currentTimeMillis(); - executeCommandOnClient( - """ - bin/langstream tenants put %s && - bin/langstream configure tenant %s""" - .formatted(tenant, tenant) - .replace(System.lineSeparator(), " ") - .split(" ")); - String testAppsBaseDir = "src/test/resources/apps"; - String testInstanceBaseDir = "src/test/resources/instances"; - String testSecretBaseDir = "src/test/resources/secrets"; + setupTenant(tenant); final String applicationId = "my-test-app"; - copyFileToClientContainer( - Paths.get(testAppsBaseDir, "python-processor").toFile(), "/tmp/python-processor"); - copyFileToClientContainer( - Paths.get(testInstanceBaseDir, "kafka-kubernetes.yaml").toFile(), - "/tmp/instance.yaml"); - copyFileToClientContainer( - Paths.get(testSecretBaseDir, "secret1.yaml").toFile(), "/tmp/secrets.yaml"); - - executeCommandOnClient( - "bin/langstream apps deploy %s -app /tmp/python-processor -i /tmp/instance.yaml -s /tmp/secrets.yaml" - .formatted(applicationId) - .split(" ")); + deployLocalApplication(applicationId, "python-processor"); final String tenantNamespace = TENANT_NAMESPACE_PREFIX + tenant; - client.apps() - .statefulSets() - .inNamespace(tenantNamespace) - .withName(applicationId + "-test-python-processor") - .waitUntilReady(4, TimeUnit.MINUTES); + awaitApplicationReady(applicationId, 1); executeCommandOnClient( "bin/langstream gateway produce %s produce-input -v my-value --connect-timeout 30" diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/ConsumeGatewayMessage.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/ConsumeGatewayMessage.java new file mode 100644 index 000000000..b1cec7681 --- /dev/null +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/ConsumeGatewayMessage.java @@ -0,0 +1,55 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.tests.util; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.SneakyThrows; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ConsumeGatewayMessage { + protected static final ObjectMapper JSON_MAPPER = new ObjectMapper(); + + @SneakyThrows + public static ConsumeGatewayMessage readValue(String line) { + return JSON_MAPPER.readValue(line, ConsumeGatewayMessage.class); + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static final class Record { + private Object key; + private Object value; + private Map headers; + } + + private Record record; + private String offset; + + @SneakyThrows + public String getAnswerFromChatCompletionsValue() { + final Map chatHistoryModel = + JSON_MAPPER.readValue((String) record.getValue(), Map.class); + final String answer = chatHistoryModel.get("answer").toString(); + return answer; + } +} diff --git a/langstream-e2e-tests/src/test/resources/apps/chat-completions/configuration.yaml b/langstream-e2e-tests/src/test/resources/apps/chat-completions/configuration.yaml new file mode 100644 index 000000000..502376f20 --- /dev/null +++ b/langstream-e2e-tests/src/test/resources/apps/chat-completions/configuration.yaml @@ -0,0 +1,25 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +configuration: + resources: + - type: "open-ai-configuration" + name: "OpenAI Azure configuration" + configuration: + url: "{{ secrets.open-ai.url }}" + access-key: "{{ secrets.open-ai.access-key }}" + provider: "{{ secrets.open-ai.provider }}" diff --git a/langstream-e2e-tests/src/test/resources/apps/chat-completions/gateways.yaml b/langstream-e2e-tests/src/test/resources/apps/chat-completions/gateways.yaml new file mode 100644 index 000000000..185fd7ac8 --- /dev/null +++ b/langstream-e2e-tests/src/test/resources/apps/chat-completions/gateways.yaml @@ -0,0 +1,39 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +gateways: + - id: produce-input + type: produce + topic: input-topic + parameters: + - sessionId + produce-options: + headers: + - key: langstream-client-session-id + value-from-parameters: sessionId + + - id: consume-history + type: consume + topic: history-topic + parameters: + - sessionId + consume-options: + filters: + headers: + - key: langstream-client-session-id + value-from-parameters: sessionId + diff --git a/langstream-e2e-tests/src/test/resources/apps/chat-completions/pipeline.yaml b/langstream-e2e-tests/src/test/resources/apps/chat-completions/pipeline.yaml new file mode 100644 index 000000000..a3b81ef4d --- /dev/null +++ b/langstream-e2e-tests/src/test/resources/apps/chat-completions/pipeline.yaml @@ -0,0 +1,42 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +topics: + - name: "input-topic" + creation-mode: create-if-not-exists + - name: "output-topic" + creation-mode: create-if-not-exists + - name: "history-topic" + creation-mode: create-if-not-exists +pipeline: + - name: "convert-to-json" + type: "document-to-json" + input: "input-topic" + configuration: + text-field: "question" + - name: "ai-chat-completions" + type: "ai-chat-completions" + output: "history-topic" + configuration: + model: "{{{secrets.open-ai.chat-completions-model}}}" + completion-field: "value.answer" + log-field: "value.prompt" + stream-to-topic: "output-topic" + stream-response-completion-field: "value" + min-chunks-per-message: 20 + messages: + - role: user + content: "You are an helpful assistant. Below you can fine a question from the user. Please try to help them the best way you can.\n\n{{% value.question}}" diff --git a/langstream-e2e-tests/src/test/resources/secrets/secret1.yaml b/langstream-e2e-tests/src/test/resources/secrets/secret1.yaml index 943db1e8e..b9aff840a 100644 --- a/langstream-e2e-tests/src/test/resources/secrets/secret1.yaml +++ b/langstream-e2e-tests/src/test/resources/secrets/secret1.yaml @@ -23,6 +23,13 @@ secrets: - name: cassandra id: cassandra data: - contact-points: CASSANDRA-HOST-INJECTED - local-dc: CASSANDRA-LOCAL-DC-INJECTED - port: CASSANDRA-PORT-INJECTED + contact-points: "${CASSANDRA_CONTACT_POINTS}" + local-dc: "${CASSANDRA_LOCAL_DC}" + port: "${CASSANDRA_PORT}" + - id: open-ai + data: + access-key: "${OPEN_AI_ACCESS_KEY}" + url: "${OPEN_AI_URL}" + provider: "${OPEN_AI_PROVIDER}" + embeddings-model: "${OPEN_AI_EMBEDDINGS_MODEL}" + chat-completions-model: "${OPEN_AI_CHAT_COMPLETIONS_MODEL}" \ No newline at end of file