Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce deletion-mode for topics #418

Merged
merged 6 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ jobs:
fi
e2e-tests:
name: End to End tests
runs-on: ubuntu-latest
timeout-minutes: 30
runs-on: LangStream-4-cores
timeout-minutes: 45
steps:
- name: 'Setup: checkout project'
uses: actions/checkout@v2
Expand All @@ -139,6 +139,10 @@ jobs:
- name: Start minikube
id: minikube
uses: medyagh/setup-minikube@latest
with:
cpus: 4
memory: 8192
kubernetes-version: 1.26.3
- uses: azure/setup-helm@v3
with:
version: v3.7.0
Expand All @@ -149,9 +153,7 @@ jobs:
run: |
chmod +x mvnw
uname -m
eval $(minikube docker-env)
./docker/build.sh
eval $(minikube docker-env -u)
./dev/prepare-minikube-for-e2e-tests.sh
./mvnw install -pl langstream-e2e-tests -am -DskipTests
./mvnw verify -pl langstream-e2e-tests -De2eTests

Expand Down
21 changes: 21 additions & 0 deletions dev/prepare-minikube-for-e2e-tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

minikube start --memory=8192 --cpus=4 --kubernetes-version=v1.26.3
eval $(minikube docker-env)
./docker/build.sh
eval $(minikube docker-env -u)
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public void deployAsset() throws Exception {
}
}
}

@Override
public void deleteAsset() throws Exception {
throw new UnsupportedOperationException();
}
}

private static class CassandraKeyspaceAssetManager extends BaseCassandraAssetManager {
Expand Down Expand Up @@ -147,6 +152,11 @@ public void deployAsset() throws Exception {
}
}
}

@Override
public void deleteAsset() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can add a "default" method implementation here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code is not actually called, I'll follow up in the next pr

throw new UnsupportedOperationException();
}
}

private abstract static class BaseCassandraAssetManager implements AssetManager {
Expand Down Expand Up @@ -203,6 +213,11 @@ public void deployAsset() throws Exception {
}
}
}

@Override
public void deleteAsset() throws Exception {
throw new UnsupportedOperationException();
}
}

private static CassandraDataSource buildDataSource(AssetDefinition assetDefinition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,23 @@ public class TopicDefinition {
public static final String CREATE_MODE_NONE = "none";
public static final String CREATE_MODE_CREATE_IF_NOT_EXISTS = "create-if-not-exists";

public static final String DELETE_MODE_NONE = "none";
public static final String DELETE_MODE_DELETE = "delete";

public TopicDefinition() {
creationMode = CREATE_MODE_NONE;
deletionMode = DELETE_MODE_NONE;
}

public static TopicDefinition fromName(String name) {
return new TopicDefinition(
name, CREATE_MODE_NONE, false, 0, null, null, Map.of(), Map.of());
name, CREATE_MODE_NONE, DELETE_MODE_NONE, false, 0, null, null, Map.of(), Map.of());
}

public TopicDefinition(
String name,
String creationMode,
String deletionMode,
boolean implicit,
int partitions,
SchemaDefinition keySchema,
Expand All @@ -53,6 +58,7 @@ public TopicDefinition(
this();
this.name = name;
this.creationMode = Objects.requireNonNullElse(creationMode, CREATE_MODE_NONE);
this.deletionMode = Objects.requireNonNullElse(deletionMode, DELETE_MODE_NONE);
this.implicit = implicit;
this.partitions = partitions;
this.keySchema = keySchema;
Expand All @@ -67,6 +73,9 @@ public TopicDefinition(
@JsonProperty("creation-mode")
private String creationMode;

@JsonProperty("deletion-mode")
private String deletionMode;

// Kafka Admin special configuration options
private Map<String, Object> config;
private Map<String, Object> options;
Expand All @@ -87,10 +96,21 @@ private void validateCreationMode() {
}
}

private void validateDeletionMode() {
switch (deletionMode) {
case DELETE_MODE_DELETE:
case DELETE_MODE_NONE:
break;
default:
throw new IllegalArgumentException("Invalid deletion mode: " + deletionMode);
}
}

public TopicDefinition copy() {
TopicDefinition copy = new TopicDefinition();
copy.setName(name);
copy.setCreationMode(creationMode);
copy.setDeletionMode(deletionMode);
copy.setImplicit(implicit);
copy.setPartitions(partitions);
copy.setKeySchema(keySchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,7 @@ public interface AssetManager {

void deployAsset() throws Exception;

void deleteAsset() throws Exception;

default void close() throws Exception {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public void deployAsset() throws Exception {
executeWithContextClassloader(agentCode -> agentCode.deployAsset());
}

@Override
public void deleteAsset() throws Exception {
executeWithContextClassloader(agentCode -> agentCode.deleteAsset());
}

@Override
public void close() throws Exception {
executeWithContextClassloader(agentCode -> agentCode.close());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ private Topic buildImplicitTopicForDeadletterQueue(
new TopicDefinition(
name,
creationMode,
TopicDefinition.CREATE_MODE_NONE,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be the same as the original topic, the same way we do for "creationMode"

inputTopicDefinition.isImplicit(),
inputTopicDefinition.getPartitions(),
inputTopicDefinition.getKeySchema(),
Expand All @@ -372,18 +373,20 @@ protected Topic buildImplicitTopicForAgent(
AgentConfiguration agentConfiguration,
StreamingClusterRuntime streamingClusterRuntime) {
// connecting two agents requires an intermediate topic
String name = "agent-" + agentConfiguration.getId() + "-input";
final String name = "agent-" + agentConfiguration.getId() + "-input";
log.info(
"Automatically creating topic {} in order to connect as input for agent {}",
name,
agentConfiguration.getId());
// short circuit...the Pulsar Runtime works only with Pulsar Topics on the same Pulsar
// Cluster
String creationMode = TopicDefinition.CREATE_MODE_CREATE_IF_NOT_EXISTS;
final String creationMode = TopicDefinition.CREATE_MODE_CREATE_IF_NOT_EXISTS;
final String deletionMode = TopicDefinition.DELETE_MODE_NONE;
TopicDefinition topicDefinition =
new TopicDefinition(
name,
creationMode,
deletionMode,
true,
DEFAULT_PARTITIONS_FOR_IMPLICIT_TOPICS,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,27 +155,46 @@ public Object deploy(
}

/**
* Delete the application instance and all the resources associated with it.
* Undeploy the application and delete all the agents.
*
* @param physicalApplicationInstance the application instance
* @param tenant
* @param executionPlan the application plan
* @param codeStorageArchiveId the code storage archive id
*/
public void delete(
String tenant, ExecutionPlan physicalApplicationInstance, String codeStorageArchiveId) {
Application applicationInstance = physicalApplicationInstance.getApplication();
public void delete(String tenant, ExecutionPlan executionPlan, String codeStorageArchiveId) {
Application applicationInstance = executionPlan.getApplication();
ComputeClusterRuntime clusterRuntime =
registry.getClusterRuntime(applicationInstance.getInstance().computeCluster());
StreamingClusterRuntime streamingClusterRuntime =
registry.getStreamingClusterRuntime(
applicationInstance.getInstance().streamingCluster());
clusterRuntime.delete(
tenant,
physicalApplicationInstance,
executionPlan,
streamingClusterRuntime,
codeStorageArchiveId,
deployContext);
}

/**
* Cleanup all the resources associated with an application.
*
* @param tenant
* @param executionPlan the application instance
*/
public void cleanup(String tenant, ExecutionPlan executionPlan) {
cleanupTopics(executionPlan);
}

private void cleanupTopics(ExecutionPlan executionPlan) {
TopicConnectionsRuntime topicConnectionsRuntime =
topicConnectionsRuntimeRegistry
.getTopicConnectionsRuntime(
executionPlan.getApplication().getInstance().streamingCluster())
.asTopicConnectionsRuntime();
topicConnectionsRuntime.delete(executionPlan);
}

@Override
public void close() {
registry.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ private static void parsePipelineFile(String filename, String content, Applicati
new TopicDefinition(
topicDefinition.getName(),
topicDefinition.getCreationMode(),
topicDefinition.getDeletionMode(),
false,
topicDefinition.getPartitions(),
topicDefinition.getKeySchema(),
Expand Down Expand Up @@ -631,6 +632,9 @@ public static final class TopicDefinitionModel {
@JsonProperty("creation-mode")
private String creationMode;

@JsonProperty("deletion-mode")
private String deletionMode;

private SchemaDefinition schema;

private int partitions = 0;
Expand Down
Loading
Loading