Skip to content

Commit

Permalink
fix: ksqldb delele artefacts delete order, first tables then streams,…
Browse files Browse the repository at this point in the history
… the reverse of the creation (#392)

* in ksql artefact manager, compile the list of artefacts to be orders in reverse order as sorted before this way tables will go first and then streams

* add sample example test for ksql create and delete with dependencies
  • Loading branch information
purbon committed Dec 9, 2021
1 parent 562e148 commit 374f44b
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 6 deletions.
21 changes: 21 additions & 0 deletions example/descriptor-ksql.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
context: "context"
projects:
- name: "foo"
ksql:
artefacts:
streams:
- path: "ksql/riderlocations.sql"
name: "riderLocations"
- path: "ksql/orders.sql"
name: "ordersStream"
tables:
- path: "ksql/users.sql"
name: "users"
- path: "ksql/moreOrders.sql"
name: "moreOrders"
topics:
- name: "foo"
config:
replication.factor: "1"
num.partitions: "1"
5 changes: 5 additions & 0 deletions example/ksql/moreOrders.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE OR REPLACE TABLE MOREORDERS AS
SELECT STRUCT(TransactionMainId := TransactionId, TransactionMainDate := TransactionMainDate) as TX, COLLECT_LIST(Number)[1] as Number, COLLECT_LIST(Date)[1] as Date, COLLECT_LIST(State)[1] as State
FROM ORDERSSTREAM
GROUP BY STRUCT(TransactionMainId := TransactionId, TransactionMainDate := TransactionMainDate)
EMIT CHANGES;
13 changes: 13 additions & 0 deletions example/ksql/orders.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
CREATE OR REPLACE STREAM ordersStream (
transactionId BIGINT KEY,
transactionMainDate BIGINT KEY,
number BIGINT,
date BIGINT,
state VARCHAR
) WITH (
KAFKA_TOPIC = 'more-orders-avro',
KEY_FORMAT = 'JSON',
VALUE_FORMAT = 'JSON',
PARTITIONS=1,
REPLICAS=1
);
2 changes: 1 addition & 1 deletion example/ksql/riderlocations.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE) WITH (kafka_topic='locations', value_format='json', partitions=1);
CREATE OR REPLACE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE) WITH (kafka_topic='locations', value_format='json', partitions=1);
2 changes: 1 addition & 1 deletion example/ksql/users.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE users ( id BIGINT PRIMARY KEY, usertimestamp BIGINT, gender VARCHAR, region_id VARCHAR)
CREATE OR REPLACE TABLE users ( id BIGINT PRIMARY KEY, usertimestamp BIGINT, gender VARCHAR, region_id VARCHAR)
WITH (
KAFKA_TOPIC = 'my-users-topic',
KEY_FORMAT='KAFKA', PARTITIONS=2, REPLICAS=1,
Expand Down
2 changes: 2 additions & 0 deletions example/topology-builder-plain.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
topology.state.cluster.enabled=false
platform.server.ksql.url=http://localhost:8088
12 changes: 8 additions & 4 deletions src/main/java/com/purbon/kafka/topology/ArtefactManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,7 @@ public void updatePlan(ExecutionPlan plan, Map<String, Topology> topologies) thr
}

if (isAllowDelete()) {
List<? extends Artefact> toBeDeleted =
currentArtefacts.stream()
.filter(a -> !artefacts.contains(a))
.collect(Collectors.toList());
List<? extends Artefact> toBeDeleted = findArtefactsToBeDeleted(currentArtefacts, artefacts);

if (toBeDeleted.size() > 0) {
LOGGER.debug("Artefacts to be deleted: " + StringUtils.join(toBeDeleted, ","));
Expand All @@ -79,6 +76,13 @@ public void updatePlan(ExecutionPlan plan, Map<String, Topology> topologies) thr
}
}

protected List<? extends Artefact> findArtefactsToBeDeleted(
Collection<? extends Artefact> currentArtefacts, Set<Artefact> artefacts) {
return currentArtefacts.stream()
.filter(a -> !artefacts.contains(a))
.collect(Collectors.toList());
}

protected ArtefactClient selectClient(Artefact artefact) {
ArtefactClient defaultClient = clients.containsKey("default") ? clients.get("default") : null;
return clients.getOrDefault(artefact.getServerLabel(), defaultClient);
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/purbon/kafka/topology/KSqlArtefactManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.purbon.kafka.topology.model.Artefact;
import com.purbon.kafka.topology.model.Project;
import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.model.artefact.KsqlArtefact;
import com.purbon.kafka.topology.model.artefact.KsqlArtefacts;
import com.purbon.kafka.topology.model.artefact.KsqlStreamArtefact;
import com.purbon.kafka.topology.model.artefact.KsqlTableArtefact;
Expand All @@ -15,6 +16,7 @@
import java.nio.file.Paths;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -47,6 +49,15 @@ Collection<? extends Artefact> loadActualClusterStateIfAvailable(ExecutionPlan p
return config.fetchStateFromTheCluster() ? getClustersState() : plan.getKSqlArtefacts();
}

@Override
protected List<? extends Artefact> findArtefactsToBeDeleted(
Collection<? extends Artefact> currentArtefacts, Set<Artefact> artefacts) {
return currentArtefacts.stream()
.filter(a -> !artefacts.contains(a))
.sorted((o1, o2) -> -1 * ((KsqlArtefact) o1).compareTo((KsqlArtefact) o2))
.collect(Collectors.toCollection(LinkedList::new));
}

private Collection<? extends Artefact> getClustersState() throws IOException {
List<Either> list =
clients.values().stream()
Expand Down

0 comments on commit 374f44b

Please sign in to comment.