diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4f3301ec3..f167a9f1a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -118,8 +118,8 @@ jobs: fi e2e-tests: name: End to End tests - runs-on: ubuntu-latest - timeout-minutes: 30 + runs-on: LangStream-4-cores + timeout-minutes: 45 steps: - name: 'Setup: checkout project' uses: actions/checkout@v2 @@ -139,6 +139,10 @@ jobs: - name: Start minikube id: minikube uses: medyagh/setup-minikube@latest + with: + cpus: 4 + memory: 8192 + kubernetes-version: 1.26.3 - uses: azure/setup-helm@v3 with: version: v3.7.0 @@ -149,9 +153,7 @@ jobs: run: | chmod +x mvnw uname -m - eval $(minikube docker-env) - ./docker/build.sh - eval $(minikube docker-env -u) + ./dev/prepare-minikube-for-e2e-tests.sh ./mvnw install -pl langstream-e2e-tests -am -DskipTests ./mvnw verify -pl langstream-e2e-tests -De2eTests diff --git a/dev/prepare-minikube-for-e2e-tests.sh b/dev/prepare-minikube-for-e2e-tests.sh new file mode 100755 index 000000000..02dc9a2f0 --- /dev/null +++ b/dev/prepare-minikube-for-e2e-tests.sh @@ -0,0 +1,21 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +minikube start --memory=8192 --cpus=4 --kubernetes-version=v1.26.3 +eval $(minikube docker-env) +./docker/build.sh +eval $(minikube docker-env -u) \ No newline at end of file diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraAssetsManagerProvider.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraAssetsManagerProvider.java index e1fe7223e..dab573af0 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraAssetsManagerProvider.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraAssetsManagerProvider.java @@ -110,6 +110,11 @@ public void deployAsset() throws Exception { } } } + + @Override + public void deleteAsset() throws Exception { + throw new UnsupportedOperationException(); + } } private static class CassandraKeyspaceAssetManager extends BaseCassandraAssetManager { @@ -147,6 +152,11 @@ public void deployAsset() throws Exception { } } } + + @Override + public void deleteAsset() throws Exception { + throw new UnsupportedOperationException(); + } } private abstract static class BaseCassandraAssetManager implements AssetManager { @@ -203,6 +213,11 @@ public void deployAsset() throws Exception { } } } + + @Override + public void deleteAsset() throws Exception { + throw new UnsupportedOperationException(); + } } private static CassandraDataSource buildDataSource(AssetDefinition assetDefinition) { diff --git a/langstream-api/src/main/java/ai/langstream/api/model/TopicDefinition.java b/langstream-api/src/main/java/ai/langstream/api/model/TopicDefinition.java index 1be43bd6b..b6f60ced5 100644 --- a/langstream-api/src/main/java/ai/langstream/api/model/TopicDefinition.java +++ b/langstream-api/src/main/java/ai/langstream/api/model/TopicDefinition.java @@ -32,18 +32,23 @@ public class TopicDefinition { public static 
final String CREATE_MODE_NONE = "none"; public static final String CREATE_MODE_CREATE_IF_NOT_EXISTS = "create-if-not-exists"; + public static final String DELETE_MODE_NONE = "none"; + public static final String DELETE_MODE_DELETE = "delete"; + public TopicDefinition() { creationMode = CREATE_MODE_NONE; + deletionMode = DELETE_MODE_NONE; } public static TopicDefinition fromName(String name) { return new TopicDefinition( - name, CREATE_MODE_NONE, false, 0, null, null, Map.of(), Map.of()); + name, CREATE_MODE_NONE, DELETE_MODE_NONE, false, 0, null, null, Map.of(), Map.of()); } public TopicDefinition( String name, String creationMode, + String deletionMode, boolean implicit, int partitions, SchemaDefinition keySchema, @@ -53,6 +58,7 @@ public TopicDefinition( this(); this.name = name; this.creationMode = Objects.requireNonNullElse(creationMode, CREATE_MODE_NONE); + this.deletionMode = Objects.requireNonNullElse(deletionMode, DELETE_MODE_NONE); this.implicit = implicit; this.partitions = partitions; this.keySchema = keySchema; @@ -67,6 +73,9 @@ public TopicDefinition( @JsonProperty("creation-mode") private String creationMode; + @JsonProperty("deletion-mode") + private String deletionMode; + // Kafka Admin special configuration options private Map config; private Map options; @@ -87,10 +96,21 @@ private void validateCreationMode() { } } + private void validateDeletionMode() { + switch (deletionMode) { + case DELETE_MODE_DELETE: + case DELETE_MODE_NONE: + break; + default: + throw new IllegalArgumentException("Invalid deletion mode: " + deletionMode); + } + } + public TopicDefinition copy() { TopicDefinition copy = new TopicDefinition(); copy.setName(name); copy.setCreationMode(creationMode); + copy.setDeletionMode(deletionMode); copy.setImplicit(implicit); copy.setPartitions(partitions); copy.setKeySchema(keySchema); diff --git a/langstream-api/src/main/java/ai/langstream/api/runner/assets/AssetManager.java 
b/langstream-api/src/main/java/ai/langstream/api/runner/assets/AssetManager.java index 1d4a0ab7c..7ea3da5b0 100644 --- a/langstream-api/src/main/java/ai/langstream/api/runner/assets/AssetManager.java +++ b/langstream-api/src/main/java/ai/langstream/api/runner/assets/AssetManager.java @@ -26,5 +26,7 @@ public interface AssetManager { void deployAsset() throws Exception; + void deleteAsset() throws Exception; + default void close() throws Exception {} } diff --git a/langstream-api/src/main/java/ai/langstream/api/runner/assets/AssetManagerAndLoader.java b/langstream-api/src/main/java/ai/langstream/api/runner/assets/AssetManagerAndLoader.java index 9a95b5e3f..e20c378af 100644 --- a/langstream-api/src/main/java/ai/langstream/api/runner/assets/AssetManagerAndLoader.java +++ b/langstream-api/src/main/java/ai/langstream/api/runner/assets/AssetManagerAndLoader.java @@ -58,6 +58,11 @@ public void deployAsset() throws Exception { executeWithContextClassloader(agentCode -> agentCode.deployAsset()); } + @Override + public void deleteAsset() throws Exception { + executeWithContextClassloader(agentCode -> agentCode.deleteAsset()); + } + @Override public void close() throws Exception { executeWithContextClassloader(agentCode -> agentCode.close()); diff --git a/langstream-core/src/main/java/ai/langstream/impl/common/BasicClusterRuntime.java b/langstream-core/src/main/java/ai/langstream/impl/common/BasicClusterRuntime.java index b71841054..87f1d4eef 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/common/BasicClusterRuntime.java +++ b/langstream-core/src/main/java/ai/langstream/impl/common/BasicClusterRuntime.java @@ -354,6 +354,7 @@ private Topic buildImplicitTopicForDeadletterQueue( new TopicDefinition( name, creationMode, + TopicDefinition.DELETE_MODE_NONE, inputTopicDefinition.isImplicit(), inputTopicDefinition.getPartitions(), inputTopicDefinition.getKeySchema(), @@ -372,18 +373,20 @@ protected Topic buildImplicitTopicForAgent( AgentConfiguration
agentConfiguration, StreamingClusterRuntime streamingClusterRuntime) { // connecting two agents requires an intermediate topic - String name = "agent-" + agentConfiguration.getId() + "-input"; + final String name = "agent-" + agentConfiguration.getId() + "-input"; log.info( "Automatically creating topic {} in order to connect as input for agent {}", name, agentConfiguration.getId()); // short circuit...the Pulsar Runtime works only with Pulsar Topics on the same Pulsar // Cluster - String creationMode = TopicDefinition.CREATE_MODE_CREATE_IF_NOT_EXISTS; + final String creationMode = TopicDefinition.CREATE_MODE_CREATE_IF_NOT_EXISTS; + final String deletionMode = TopicDefinition.DELETE_MODE_NONE; TopicDefinition topicDefinition = new TopicDefinition( name, creationMode, + deletionMode, true, DEFAULT_PARTITIONS_FOR_IMPLICIT_TOPICS, null, diff --git a/langstream-core/src/main/java/ai/langstream/impl/deploy/ApplicationDeployer.java b/langstream-core/src/main/java/ai/langstream/impl/deploy/ApplicationDeployer.java index ba83169bf..1f36b82bd 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/deploy/ApplicationDeployer.java +++ b/langstream-core/src/main/java/ai/langstream/impl/deploy/ApplicationDeployer.java @@ -155,14 +155,14 @@ public Object deploy( } /** - * Delete the application instance and all the resources associated with it. + * Undeploy the application and delete all the agents. 
* - * @param physicalApplicationInstance the application instance + * @param tenant the tenant + * @param executionPlan the application plan * @param codeStorageArchiveId the code storage archive id */ - public void delete( - String tenant, ExecutionPlan physicalApplicationInstance, String codeStorageArchiveId) { - Application applicationInstance = physicalApplicationInstance.getApplication(); + public void delete(String tenant, ExecutionPlan executionPlan, String codeStorageArchiveId) { + Application applicationInstance = executionPlan.getApplication(); ComputeClusterRuntime clusterRuntime = registry.getClusterRuntime(applicationInstance.getInstance().computeCluster()); StreamingClusterRuntime streamingClusterRuntime = @@ -170,12 +170,31 @@ public void delete( applicationInstance.getInstance().streamingCluster()); clusterRuntime.delete( tenant, - physicalApplicationInstance, + executionPlan, streamingClusterRuntime, codeStorageArchiveId, deployContext); } + /** + * Cleanup all the resources associated with an application. 
+ * + * @param tenant + * @param executionPlan the application instance + */ + public void cleanup(String tenant, ExecutionPlan executionPlan) { + cleanupTopics(executionPlan); + } + + private void cleanupTopics(ExecutionPlan executionPlan) { + TopicConnectionsRuntime topicConnectionsRuntime = + topicConnectionsRuntimeRegistry + .getTopicConnectionsRuntime( + executionPlan.getApplication().getInstance().streamingCluster()) + .asTopicConnectionsRuntime(); + topicConnectionsRuntime.delete(executionPlan); + } + @Override public void close() { registry.close(); diff --git a/langstream-core/src/main/java/ai/langstream/impl/parser/ModelBuilder.java b/langstream-core/src/main/java/ai/langstream/impl/parser/ModelBuilder.java index eaeb3619c..905e60167 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/parser/ModelBuilder.java +++ b/langstream-core/src/main/java/ai/langstream/impl/parser/ModelBuilder.java @@ -455,6 +455,7 @@ private static void parsePipelineFile(String filename, String content, Applicati new TopicDefinition( topicDefinition.getName(), topicDefinition.getCreationMode(), + topicDefinition.getDeletionMode(), false, topicDefinition.getPartitions(), topicDefinition.getKeySchema(), @@ -631,6 +632,9 @@ public static final class TopicDefinitionModel { @JsonProperty("creation-mode") private String creationMode; + @JsonProperty("deletion-mode") + private String deletionMode; + private SchemaDefinition schema; private int partitions = 0; diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java index 64f8f17a2..7cd8c90f2 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java @@ -66,6 +66,7 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; 
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtensionContext; @@ -79,6 +80,7 @@ public class BaseEndToEndTest implements TestWatcher { public static final File TEST_LOGS_DIR = new File("target", "e2e-test-logs"); protected static final String TENANT_NAMESPACE_PREFIX = "ls-tenant-"; protected static final ObjectMapper MAPPER = new ObjectMapper(new YAMLFactory()); + protected static final String KAFKA_NAMESPACE = "kafka-ns"; interface KubeServer { void start(); @@ -98,14 +100,7 @@ public void start() {} public void ensureImage(String image) {} @Override - public void stop() { - try (final KubernetesClient client = - new KubernetesClientBuilder() - .withConfig(Config.fromKubeconfig(kubeServer.getKubeConfig())) - .build()) { - client.namespaces().withName(namespace).delete(); - } - } + public void stop() {} @Override @SneakyThrows @@ -299,6 +294,11 @@ private static void runProcess(String[] allArgs, boolean allowFailures) public static CompletableFuture execInPod( String podName, String containerName, String... cmds) { + return execInPodInNamespace(namespace, podName, containerName, cmds); + } + + public static CompletableFuture execInPodInNamespace( + String namespace, String podName, String containerName, String... 
cmds) { final String cmd = String.join(" ", cmds); log.info( @@ -515,6 +515,12 @@ public void setupSingleTest() { .serverSideApply(); } + @AfterEach + public void cleanupAfterEach() { + cleanupAllEndToEndTestsNamespaces(); + execInKafkaPod("rpk topic delete -r \".*\""); + } + private static void cleanupAllEndToEndTestsNamespaces() { client.namespaces().withLabel("app", "ls-test").delete(); client.namespaces().list().getItems().stream() @@ -670,7 +676,7 @@ private static void installKafka() { client.resource( new NamespaceBuilder() .withNewMetadata() - .withName("kafka-ns") + .withName(KAFKA_NAMESPACE) .endMetadata() .build()) .serverSideApply(); @@ -846,12 +852,19 @@ private static List getAllUserNamespaces() { private static void dumpResources( String filePrefix, Class clazz, List namespaces) { - for (String namespace : namespaces) { - client.resources(clazz) - .inNamespace(namespace) - .list() - .getItems() - .forEach(resource -> dumpResource(filePrefix, resource)); + try { + for (String namespace : namespaces) { + client.resources(clazz) + .inNamespace(namespace) + .list() + .getItems() + .forEach(resource -> dumpResource(filePrefix, resource)); + } + } catch (Throwable t) { + log.warn( + "failed to dump resources of type {}: {}", + clazz.getSimpleName(), + t.getMessage()); } } @@ -922,4 +935,30 @@ protected static void dumpEvents(String filePrefix) { log.error("failed to write events logs to file {}", outputFile, e); } } + + @SneakyThrows + protected static List getAllTopicsFromKafka() { + final String result = execInKafkaPod("rpk topic list"); + if (result == null) { + throw new IllegalStateException("failed to get topics from kafka"); + } + + final List topics = new ArrayList<>(); + final List lines = result.lines().collect(Collectors.toList()); + boolean first = true; + for (String line : lines) { + if (first) { + first = false; + continue; + } + topics.add(line.split(" ")[0]); + } + return topics; + } + + @SneakyThrows + private static String 
execInKafkaPod(String cmd) { + return execInPodInNamespace(KAFKA_NAMESPACE, "redpanda-0", "redpanda", cmd.split(" ")) + .get(1, TimeUnit.MINUTES); + } } diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java index 7e19800d0..3b9778748 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java @@ -15,9 +15,14 @@ */ package ai.langstream.tests; +import ai.langstream.deployer.k8s.api.crds.agents.AgentCustomResource; +import ai.langstream.deployer.k8s.api.crds.apps.ApplicationCustomResource; +import io.fabric8.kubernetes.api.model.Secret; import java.nio.file.Paths; +import java.util.List; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -53,9 +58,10 @@ public void test() { "bin/langstream apps deploy %s -app /tmp/python-processor -i /tmp/instance.yaml -s /tmp/secrets.yaml" .formatted(applicationId) .split(" ")); + final String tenantNamespace = TENANT_NAMESPACE_PREFIX + tenant; client.apps() .statefulSets() - .inNamespace(TENANT_NAMESPACE_PREFIX + tenant) + .inNamespace(tenantNamespace) .withName(applicationId + "-test-python-processor") .waitUntilReady(4, TimeUnit.MINUTES); @@ -74,5 +80,46 @@ public void test() { output.contains( "{\"record\":{\"key\":null,\"value\":\"my-value!!super secret value\"," + "\"headers\":{}}")); + + executeCommandOnClient("bin/langstream apps delete %s".formatted(applicationId).split(" ")); + + Awaitility.await() + .atMost(1, TimeUnit.MINUTES) + .untilAsserted( + () -> { + Assertions.assertNull( + client.apps() + .statefulSets() + .inNamespace(tenantNamespace) + .withName(applicationId + "-test-python-processor") + .get()); + + 
Assertions.assertEquals( + 0, + client.resources(AgentCustomResource.class) + .inNamespace(tenantNamespace) + .list() + .getItems() + .size()); + + Assertions.assertEquals( + 0, + client.resources(ApplicationCustomResource.class) + .inNamespace(tenantNamespace) + .list() + .getItems() + .size()); + + Assertions.assertEquals( + 1, + client.resources(Secret.class) + .inNamespace(tenantNamespace) + .list() + .getItems() + .size()); + }); + + final List topics = getAllTopicsFromKafka(); + Assertions.assertEquals(List.of("TEST_TOPIC_0"), topics); } } diff --git a/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml b/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml index b6dd7891c..697812389 100644 --- a/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml @@ -27,6 +27,7 @@ topics: type: string - name: TEST_TOPIC_1 creation-mode: create-if-not-exists + deletion-mode: delete schema: type: string keySchema: diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-core/src/main/java/ai/langstream/deployer/k8s/apps/AppResourcesFactory.java b/langstream-k8s-deployer/langstream-k8s-deployer-core/src/main/java/ai/langstream/deployer/k8s/apps/AppResourcesFactory.java index 7c5104980..51a50a0da 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-core/src/main/java/ai/langstream/deployer/k8s/apps/AppResourcesFactory.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-core/src/main/java/ai/langstream/deployer/k8s/apps/AppResourcesFactory.java @@ -271,7 +271,9 @@ public static Job generateSetupJob(GenerateJobParams params) { .withName(ApplicationSetupConstants.APP_SECRETS_ENV) .withValue("/app-secrets/secrets") .build()); - final List args = List.of("application-setup", "deploy"); + final String cmd = isDeleteJob ? 
"cleanup" : "deploy"; + + final List args = List.of("application-setup", cmd); final String containerName = "setup"; final Container container = createContainer( diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-core/src/test/java/ai/langstream/deployer/k8s/apps/AppResourcesFactoryTest.java b/langstream-k8s-deployer/langstream-k8s-deployer-core/src/test/java/ai/langstream/deployer/k8s/apps/AppResourcesFactoryTest.java index e54a9a0f6..7bc915809 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-core/src/test/java/ai/langstream/deployer/k8s/apps/AppResourcesFactoryTest.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-core/src/test/java/ai/langstream/deployer/k8s/apps/AppResourcesFactoryTest.java @@ -382,7 +382,7 @@ void testSetupJob() { containers: - args: - application-setup - - deploy + - cleanup env: - name: LANGSTREAM_APPLICATION_SETUP_APP_CONFIGURATION value: /app-config/config diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/main/java/ai/langstream/deployer/k8s/controllers/apps/AppController.java b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/main/java/ai/langstream/deployer/k8s/controllers/apps/AppController.java index cff2410c1..11e2c226f 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/main/java/ai/langstream/deployer/k8s/controllers/apps/AppController.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/main/java/ai/langstream/deployer/k8s/controllers/apps/AppController.java @@ -89,7 +89,21 @@ protected UpdateControl patchResources( protected DeleteControl cleanupResources( ApplicationCustomResource resource, Context context) { appResourcesLimiter.onAppBeingDeleted(resource); - final Duration rescheduleDuration = handleJob(resource, false, true); + Duration rescheduleDuration = handleJob(resource, false, true); + if (rescheduleDuration == null) { + log.infof( + "deployer cleanup job for %s is completed, checking setup cleanup", + 
resource.getMetadata().getName()); + rescheduleDuration = handleJob(resource, true, true); + log.infof( + "setup cleanup job for %s is %s", + resource.getMetadata().getName(), + rescheduleDuration != null ? "not completed" : "completed"); + } else { + log.infof( + "deployer cleanup job for %s is not completed yet", + resource.getMetadata().getName()); + } return rescheduleDuration != null ? DeleteControl.noFinalizerRemoval().rescheduleAfter(rescheduleDuration) : DeleteControl.defaultDelete(); diff --git a/langstream-kafka-runtime/src/main/java/ai/langstream/kafka/runner/KafkaTopicConnectionsRuntime.java b/langstream-kafka-runtime/src/main/java/ai/langstream/kafka/runner/KafkaTopicConnectionsRuntime.java index 80edb4cfb..6e04e7db1 100644 --- a/langstream-kafka-runtime/src/main/java/ai/langstream/kafka/runner/KafkaTopicConnectionsRuntime.java +++ b/langstream-kafka-runtime/src/main/java/ai/langstream/kafka/runner/KafkaTopicConnectionsRuntime.java @@ -265,20 +265,30 @@ public void delete(ExecutionPlan applicationInstance) { private void deleteTopic(AdminClient admin, KafkaTopic topic) { switch (topic.createMode()) { case TopicDefinition.CREATE_MODE_CREATE_IF_NOT_EXISTS -> { - log.info("Deleting Kafka topic {}", topic.name()); - try { - admin.deleteTopics(List.of(topic.name()), new DeleteTopicsOptions()) - .all() - .get(); - } catch (ExecutionException e) { - if (e.getCause() instanceof UnknownTopicOrPartitionException) { - log.info("Topic {} does not exist", topic.name()); - } else { - throw e; + if (topic.deleteMode().equals(TopicDefinition.DELETE_MODE_DELETE)) { + log.info("Deleting Kafka topic {}", topic.name()); + try { + admin.deleteTopics(List.of(topic.name()), new DeleteTopicsOptions()) + .all() + .get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof UnknownTopicOrPartitionException) { + log.info("Topic {} does not exist", topic.name()); + } else { + throw e; + } } + } else { + log.info( + "Keeping Kafka topic {} since deletion-mode is {}", + 
topic.name(), + topic.deleteMode()); } } - default -> log.info("Keeping Kafka topic {}", topic.name()); + default -> log.info( + "Keeping Kafka topic {} since creation-mode is {}", + topic.name(), + topic.createMode()); } } diff --git a/langstream-kafka-runtime/src/test/java/ai/langstream/kafka/KafkaClusterRuntimeDockerTest.java b/langstream-kafka-runtime/src/test/java/ai/langstream/kafka/KafkaClusterRuntimeDockerTest.java index a2ecda81b..85e92b32c 100644 --- a/langstream-kafka-runtime/src/test/java/ai/langstream/kafka/KafkaClusterRuntimeDockerTest.java +++ b/langstream-kafka-runtime/src/test/java/ai/langstream/kafka/KafkaClusterRuntimeDockerTest.java @@ -16,6 +16,7 @@ package ai.langstream.kafka; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import ai.langstream.api.model.Application; @@ -60,7 +61,11 @@ public void testMapKafkaTopics() throws Exception { creation-mode: create-if-not-exists - name: "input-topic-2-partitions" creation-mode: create-if-not-exists + deletion-mode: none partitions: 2 + - name: "input-topic-delete" + creation-mode: create-if-not-exists + deletion-mode: delete """), buildInstanceYaml(), null) @@ -90,6 +95,7 @@ public void testMapKafkaTopics() throws Exception { log.info("Topics {}", topics); assertTrue(topics.contains("input-topic")); assertTrue(topics.contains("input-topic-2-partitions")); + assertTrue(topics.contains("input-topic-delete")); Map stats = admin.describeTopics(Set.of("input-topic", "input-topic-2-partitions")).all().get(); @@ -97,11 +103,18 @@ public void testMapKafkaTopics() throws Exception { assertEquals(2, stats.get("input-topic-2-partitions").partitions().size()); deployer.delete("tenant", implementation, null); - // delete should only delete the agents topics = admin.listTopics().names().get(); log.info("Topics {}", topics); assertTrue(topics.contains("input-topic")); 
assertTrue(topics.contains("input-topic-2-partitions")); + assertTrue(topics.contains("input-topic-delete")); + + deployer.cleanup("tenant", implementation); + topics = admin.listTopics().names().get(); + log.info("Topics {}", topics); + assertTrue(topics.contains("input-topic")); + assertTrue(topics.contains("input-topic-2-partitions")); + assertFalse(topics.contains("input-topic-delete")); } private static String buildInstanceYaml() { diff --git a/langstream-kafka-runtime/src/test/java/ai/langstream/kafka/KafkaConsumerTest.java b/langstream-kafka-runtime/src/test/java/ai/langstream/kafka/KafkaConsumerTest.java index 25e1f9119..32f182ff8 100644 --- a/langstream-kafka-runtime/src/test/java/ai/langstream/kafka/KafkaConsumerTest.java +++ b/langstream-kafka-runtime/src/test/java/ai/langstream/kafka/KafkaConsumerTest.java @@ -80,6 +80,7 @@ public void testKafkaConsumerCommitOffsets(int numPartitions) throws Exception { topics: - name: %s creation-mode: create-if-not-exists + deletion-mode: delete partitions: %d """ .formatted(topicName, numPartitions)), @@ -116,14 +117,7 @@ public void testKafkaConsumerCommitOffsets(int numPartitions) throws Exception { assertEquals(numPartitions, stats.get(topicName).partitions().size()); deployer.delete("tenant", implementation, null); - topics = admin.listTopics().names().get(); - log.info("Topics {}", topics); - assertTrue(topics.contains(topicName)); - topicConnectionsRuntimeRegistry - .getTopicConnectionsRuntime( - implementation.getApplication().getInstance().streamingCluster()) - .asTopicConnectionsRuntime() - .delete(implementation); + deployer.cleanup("tenant", implementation); topics = admin.listTopics().names().get(); log.info("Topics {}", topics); assertFalse(topics.contains(topicName)); @@ -215,6 +209,7 @@ public void testKafkaConsumerCommitOffsetsMultiThread() throws Exception { topics: - name: %s creation-mode: create-if-not-exists + deletion-mode: delete partitions: %d """ .formatted(topicName, numPartitions)), @@ 
-250,16 +245,9 @@ public void testKafkaConsumerCommitOffsetsMultiThread() throws Exception { Map stats = admin.describeTopics(Set.of(topicName)).all().get(); assertEquals(numPartitions, stats.get(topicName).partitions().size()); + deployer.cleanup("tenant", implementation); deployer.delete("tenant", implementation, null); - topics = admin.listTopics().names().get(); - log.info("Topics {}", topics); - assertTrue(topics.contains(topicName)); - topicConnectionsRuntimeRegistry - .getTopicConnectionsRuntime( - implementation.getApplication().getInstance().streamingCluster()) - .asTopicConnectionsRuntime() - .delete(implementation); topics = admin.listTopics().names().get(); log.info("Topics {}", topics); assertFalse(topics.contains(topicName)); @@ -322,6 +310,7 @@ public void testRestartConsumer() throws Exception { topics: - name: %s creation-mode: create-if-not-exists + deletion-mode: delete partitions: %d """ .formatted(topicName, numPartitions)), @@ -357,16 +346,8 @@ public void testRestartConsumer() throws Exception { Map stats = admin.describeTopics(Set.of(topicName)).all().get(); assertEquals(numPartitions, stats.get(topicName).partitions().size()); - topics = admin.listTopics().names().get(); - log.info("Topics {}", topics); - assertTrue(topics.contains(topicName)); - - topicConnectionsRuntimeRegistry - .getTopicConnectionsRuntime( - implementation.getApplication().getInstance().streamingCluster()) - .asTopicConnectionsRuntime() - .delete(implementation); deployer.delete("tenant", implementation, null); + deployer.cleanup("tenant", implementation); topics = admin.listTopics().names().get(); log.info("Topics {}", topics); assertFalse(topics.contains(topicName)); @@ -423,6 +404,7 @@ public void testMultipleSchemas() throws Exception { topics: - name: %s creation-mode: create-if-not-exists + deletion-mode: delete partitions: %d """ .formatted(topicName, numPartitions)), @@ -459,15 +441,8 @@ public void testMultipleSchemas() throws Exception { 
assertEquals(numPartitions, stats.get(topicName).partitions().size()); deployer.delete("tenant", implementation, null); - topics = admin.listTopics().names().get(); - log.info("Topics {}", topics); - assertTrue(topics.contains(topicName)); + deployer.cleanup("tenant", implementation); - topicConnectionsRuntimeRegistry - .getTopicConnectionsRuntime( - implementation.getApplication().getInstance().streamingCluster()) - .asTopicConnectionsRuntime() - .delete(implementation); topics = admin.listTopics().names().get(); log.info("Topics {}", topics); assertFalse(topics.contains(topicName)); diff --git a/langstream-kafka/src/main/java/ai/langstream/kafka/runtime/KafkaStreamingClusterRuntime.java b/langstream-kafka/src/main/java/ai/langstream/kafka/runtime/KafkaStreamingClusterRuntime.java index d5d7c0233..b593864e4 100644 --- a/langstream-kafka/src/main/java/ai/langstream/kafka/runtime/KafkaStreamingClusterRuntime.java +++ b/langstream-kafka/src/main/java/ai/langstream/kafka/runtime/KafkaStreamingClusterRuntime.java @@ -57,6 +57,7 @@ public Topic createTopicImplementation( topicDefinition.getKeySchema(), topicDefinition.getValueSchema(), creationMode, + topicDefinition.getDeletionMode(), topicDefinition.isImplicit(), configs, options); diff --git a/langstream-kafka/src/main/java/ai/langstream/kafka/runtime/KafkaTopic.java b/langstream-kafka/src/main/java/ai/langstream/kafka/runtime/KafkaTopic.java index d34707826..9900e6088 100644 --- a/langstream-kafka/src/main/java/ai/langstream/kafka/runtime/KafkaTopic.java +++ b/langstream-kafka/src/main/java/ai/langstream/kafka/runtime/KafkaTopic.java @@ -31,6 +31,7 @@ public record KafkaTopic( SchemaDefinition keySchema, SchemaDefinition valueSchema, String createMode, + String deleteMode, boolean implicit, Map config, Map options) diff --git a/langstream-pulsar-runtime/src/main/java/ai/langstream/pulsar/runner/PulsarTopicConnectionsRuntimeProvider.java 
b/langstream-pulsar-runtime/src/main/java/ai/langstream/pulsar/runner/PulsarTopicConnectionsRuntimeProvider.java index 4579efb32..33fdeaecd 100644 --- a/langstream-pulsar-runtime/src/main/java/ai/langstream/pulsar/runner/PulsarTopicConnectionsRuntimeProvider.java +++ b/langstream-pulsar-runtime/src/main/java/ai/langstream/pulsar/runner/PulsarTopicConnectionsRuntimeProvider.java @@ -278,11 +278,22 @@ private static void deleteTopic(PulsarAdmin admin, PulsarTopic topic) switch (topic.createMode()) { case TopicDefinition.CREATE_MODE_CREATE_IF_NOT_EXISTS -> {} default -> { - log.info("Keeping Pulsar topic {}", topic.name()); + log.info( + "Keeping Pulsar topic {} since creation-mode is {}", + topic.name(), + topic.createMode()); return; } } + if (!topic.deleteMode().equals(TopicDefinition.DELETE_MODE_DELETE)) { + log.info( + "Keeping Pulsar topic {} since deletion-mode is {}", + topic.name(), + topic.deleteMode()); + return; + } + String topicName = topic.name().tenant() + "/" diff --git a/langstream-pulsar/src/main/java/ai/langstream/pulsar/PulsarStreamingClusterRuntime.java b/langstream-pulsar/src/main/java/ai/langstream/pulsar/PulsarStreamingClusterRuntime.java index eef482f26..9f946f714 100644 --- a/langstream-pulsar/src/main/java/ai/langstream/pulsar/PulsarStreamingClusterRuntime.java +++ b/langstream-pulsar/src/main/java/ai/langstream/pulsar/PulsarStreamingClusterRuntime.java @@ -52,6 +52,7 @@ public Topic createTopicImplementation( keySchema, valueSchema, creationMode, + topicDefinition.getDeletionMode(), topicDefinition.isImplicit()); } diff --git a/langstream-pulsar/src/main/java/ai/langstream/pulsar/PulsarTopic.java b/langstream-pulsar/src/main/java/ai/langstream/pulsar/PulsarTopic.java index 5b15687e7..6f3f76939 100644 --- a/langstream-pulsar/src/main/java/ai/langstream/pulsar/PulsarTopic.java +++ b/langstream-pulsar/src/main/java/ai/langstream/pulsar/PulsarTopic.java @@ -27,6 +27,7 @@ public record PulsarTopic( SchemaDefinition keySchema, SchemaDefinition 
valueSchema, String createMode, + String deleteMode, boolean implicit) implements ConnectionImplementation, Topic { diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/application/ApplicationSetupRunner.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/application/ApplicationSetupRunner.java index 59bf502a0..a5055f1be 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/application/ApplicationSetupRunner.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/application/ApplicationSetupRunner.java @@ -27,6 +27,7 @@ import ai.langstream.impl.deploy.ApplicationDeployer; import ai.langstream.impl.nar.NarFileHandler; import ai.langstream.runtime.api.application.ApplicationSetupConfiguration; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.nio.file.Path; import java.util.List; @@ -51,28 +52,59 @@ public void runSetup( final String applicationId = configuration.getApplicationId(); log.info("Setup application {}", applicationId); - final String applicationConfig = configuration.getApplication(); - - final Application appInstance = MAPPER.readValue(applicationConfig, Application.class); - appInstance.setSecrets(secrets); + final Application appInstance = parseApplicationInstance(configuration, secrets); - try (NarFileHandler narFileHandler = - new NarFileHandler( - packagesDirectory, - List.of(), - Thread.currentThread().getContextClassLoader())) { + try (NarFileHandler narFileHandler = getNarFileHandler(packagesDirectory)) { narFileHandler.scan(); try (ApplicationDeployer deployer = buildDeployer(clusterRuntimeConfiguration, narFileHandler)) { final ExecutionPlan executionPlan = deployer.createImplementation(applicationId, appInstance); - deployer.setup(configuration.getTenant(), executionPlan); log.info("Application {} setup done", applicationId); } 
} } + public void runCleanup( + Map> clusterRuntimeConfiguration, + ApplicationSetupConfiguration configuration, + Secrets secrets, + Path packagesDirectory) + throws Exception { + + final String applicationId = configuration.getApplicationId(); + log.info("Cleanup application {}", applicationId); + final Application appInstance = parseApplicationInstance(configuration, secrets); + + try (NarFileHandler narFileHandler = getNarFileHandler(packagesDirectory)) { + narFileHandler.scan(); + try (ApplicationDeployer deployer = + buildDeployer(clusterRuntimeConfiguration, narFileHandler)) { + final ExecutionPlan executionPlan = + deployer.createImplementation(applicationId, appInstance); + + deployer.cleanup(configuration.getTenant(), executionPlan); + log.info("Application {} cleanup done", applicationId); + } + } + } + + private NarFileHandler getNarFileHandler(Path packagesDirectory) throws Exception { + return new NarFileHandler( + packagesDirectory, List.of(), Thread.currentThread().getContextClassLoader()); + } + + private Application parseApplicationInstance( + ApplicationSetupConfiguration configuration, Secrets secrets) + throws JsonProcessingException { + final String applicationConfig = configuration.getApplication(); + + final Application appInstance = MAPPER.readValue(applicationConfig, Application.class); + appInstance.setSecrets(secrets); + return appInstance; + } + private ApplicationDeployer buildDeployer( Map> clusterRuntimeConfiguration, NarFileHandler narFileHandler) { diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/application/ApplicationSetupRunnerStarter.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/application/ApplicationSetupRunnerStarter.java index d71edf83d..181c90fa1 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/application/ApplicationSetupRunnerStarter.java +++ 
b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/application/ApplicationSetupRunnerStarter.java @@ -89,6 +89,8 @@ public void start(String... args) throws Exception { switch (arg0) { case "deploy" -> applicationSetupRunner.runSetup( clusterRuntimeConfiguration, configuration, secrets, packagesDirectory); + case "cleanup" -> applicationSetupRunner.runCleanup( + clusterRuntimeConfiguration, configuration, secrets, packagesDirectory); default -> throw new IllegalArgumentException("Unknown command " + arg0); } } diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/mockagents/MockAssetManagerCodeProvider.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/mockagents/MockAssetManagerCodeProvider.java index ad9b23ac2..54a07b617 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/mockagents/MockAssetManagerCodeProvider.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/mockagents/MockAssetManagerCodeProvider.java @@ -78,5 +78,14 @@ public synchronized void deployAsset() throws Exception { } DEPLOYED_ASSETS.add(assetDefinition); } + + @Override + public synchronized void deleteAsset() throws Exception { + log.info("Deleting asset {}", assetDefinition); + final boolean remove = DEPLOYED_ASSETS.remove(assetDefinition); + if (!remove) { + throw new IllegalStateException("Asset not found: " + assetDefinition); + } + } } } diff --git a/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationServiceValidateUpdateTest.java b/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationServiceValidateUpdateTest.java index b27688ce9..f44c0a638 100644 --- a/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationServiceValidateUpdateTest.java +++ 
b/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationServiceValidateUpdateTest.java @@ -41,41 +41,41 @@ void testTopics() { checkTopics( List.of( new ModelBuilder.TopicDefinitionModel( - "input-topic", null, null, 0, null, null, null)), + "input-topic", null, null, null, 0, null, null, null)), List.of( new ModelBuilder.TopicDefinitionModel( - "input-topic", null, null, 0, null, null, null)), + "input-topic", null, null, null, 0, null, null, null)), true); checkTopics( List.of( new ModelBuilder.TopicDefinitionModel( - "input-topic", null, null, 0, null, null, null)), + "input-topic", null, null, null, 0, null, null, null)), List.of( new ModelBuilder.TopicDefinitionModel( - "input-topic1", null, null, 0, null, null, null)), + "input-topic1", null, null, null, 0, null, null, null)), false); checkTopics( List.of( new ModelBuilder.TopicDefinitionModel( - "input-topic", null, null, 0, null, null, null)), + "input-topic", null, null, null, 0, null, null, null)), List.of( new ModelBuilder.TopicDefinitionModel( - "input-topic", null, null, 0, null, null, null), + "input-topic", null, null, null, 0, null, null, null), new ModelBuilder.TopicDefinitionModel( - "input-topic1", null, null, 0, null, null, null)), + "input-topic1", null, null, null, 0, null, null, null)), false); checkTopics( List.of( new ModelBuilder.TopicDefinitionModel( - "input-topic", null, null, 0, null, null, null), + "input-topic", null, null, null, 0, null, null, null), new ModelBuilder.TopicDefinitionModel( - "input-topic1", null, null, 0, null, null, null)), + "input-topic1", null, null, null, 0, null, null, null)), List.of( new ModelBuilder.TopicDefinitionModel( - "input-topic", null, null, 0, null, null, null)), + "input-topic", null, null, null, 0, null, null, null)), false); checkTopics( @@ -84,6 +84,7 @@ void testTopics() { "input-topic", TopicDefinition.CREATE_MODE_CREATE_IF_NOT_EXISTS, null, + null, 0, null, null, @@ -93,6 +94,7 @@ void testTopics() { 
"input-topic", TopicDefinition.CREATE_MODE_CREATE_IF_NOT_EXISTS, null, + null, 0, null, null, @@ -105,13 +107,14 @@ void testTopics() { "input-topic", TopicDefinition.CREATE_MODE_NONE, null, + null, 0, null, null, null)), List.of( new ModelBuilder.TopicDefinitionModel( - "input-topic", null, null, 0, null, null, null)), + "input-topic", null, null, null, 0, null, null, null)), true); checkTopics( @@ -119,6 +122,7 @@ void testTopics() { new ModelBuilder.TopicDefinitionModel( "input-topic", null, + null, new SchemaDefinition("avro", "{}", null), 0, null, @@ -126,7 +130,7 @@ void testTopics() { null)), List.of( new ModelBuilder.TopicDefinitionModel( - "input-topic", null, null, 0, null, null, null)), + "input-topic", null, null, null, 0, null, null, null)), false); checkTopics( @@ -134,6 +138,7 @@ void testTopics() { new ModelBuilder.TopicDefinitionModel( "input-topic", null, + null, new SchemaDefinition("avro", "{}", null), 0, null, @@ -143,6 +148,7 @@ void testTopics() { new ModelBuilder.TopicDefinitionModel( "input-topic", null, + null, new SchemaDefinition("json", "{}", null), 0, null, @@ -155,6 +161,7 @@ void testTopics() { new ModelBuilder.TopicDefinitionModel( "input-topic", null, + null, new SchemaDefinition("avro", "{}", null), 0, null, @@ -164,6 +171,7 @@ void testTopics() { new ModelBuilder.TopicDefinitionModel( "input-topic", null, + null, new SchemaDefinition("avro", "{schema:true}", null), 0, null, @@ -174,19 +182,19 @@ void testTopics() { checkTopics( List.of( new ModelBuilder.TopicDefinitionModel( - "input-topic", null, null, 1, null, null, null)), + "input-topic", null, null, null, 1, null, null, null)), List.of( new ModelBuilder.TopicDefinitionModel( - "input-topic", null, null, 0, null, null, null)), + "input-topic", null, null, null, 0, null, null, null)), false); checkTopics( List.of( new ModelBuilder.TopicDefinitionModel( - "input-topic", null, null, 1, null, null, null)), + "input-topic", null, null, null, 1, null, null, null)), List.of( new 
ModelBuilder.TopicDefinitionModel( - "input-topic", null, null, 2, null, null, null)), + "input-topic", null, null, null, 2, null, null, null)), false); } @@ -659,9 +667,9 @@ private static ExecutionPlan buildPlanWithModels(List a pipelineFileModel.setTopics( List.of( new ModelBuilder.TopicDefinitionModel( - "input-topic", null, null, 0, null, null, null), + "input-topic", null, null, null, 0, null, null, null), new ModelBuilder.TopicDefinitionModel( - "output-topic", null, null, 0, null, null, null))); + "output-topic", null, null, null, 0, null, null, null))); pipelineFileModel.setPipeline(agents); Application applicationInstance = ModelBuilder.buildApplicationInstance(