Skip to content

Commit

Permalink
Add topic description on Confluent Cloud (#383)
Browse files Browse the repository at this point in the history
* Description feature

* Description feature

* Typo fixes

* Style fix 1

* Style fix 2

* Style fix 3

* Style fix 4

* IsConfluent fix for TU

* Style fix 5

* TU fix

* TU fix 2

* TU fix 3

* TU fix 4

* TU fix 5

* TU fix 5

* TU fix 6

* TU fix 7

* TU style fix

* TU fix

* TU fix 2

* TU fix 3

* TU fix 4

* TU fix 5

* Add new TU

* Fix TU

* Widen coverage, cleanup and status update fix for tag addition

* Remove unsued imports

* Rework: type and null possibility for description variable, iteration in topics, and persist topic status update

* Cleanup (unused imports, variables, code)

* unused imports

* switch createTags arguments & update TU

* unused import

* put log.info and log.error earlier for more coverage & TU updates

* Update response POJO + primitive boolean +  description default null fix + enrich topics with description and tags merged

* enriching topic optimized, topic entity renamed, TU fixed, unused imports removed

---------

Co-authored-by: thcai <thomas.cai_ext@michelin.com>
  • Loading branch information
ThomasCAI-mlv and ThomasCAI-mlv authored Mar 22, 2024
1 parent bd6300e commit 94b76bb
Show file tree
Hide file tree
Showing 15 changed files with 791 additions and 219 deletions.
1 change: 1 addition & 0 deletions src/main/java/com/michelin/ns4kafka/models/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public static class TopicSpec {
@Builder.Default
@JsonSetter(nulls = Nulls.AS_EMPTY)
private List<String> tags = new ArrayList<>();
private String description;
private Map<String, String> configs;
}

Expand Down
18 changes: 18 additions & 0 deletions src/main/java/com/michelin/ns4kafka/services/TopicService.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static com.michelin.ns4kafka.utils.FormatErrorUtils.invalidTopicCleanupPolicy;
import static com.michelin.ns4kafka.utils.FormatErrorUtils.invalidTopicDeleteRecords;
import static com.michelin.ns4kafka.utils.FormatErrorUtils.invalidTopicTags;
import static com.michelin.ns4kafka.utils.FormatErrorUtils.invalidTopicTagsFormat;
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT;
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_DELETE;
Expand Down Expand Up @@ -318,6 +319,18 @@ public Map<TopicPartition, Long> deleteRecords(Topic topic, Map<TopicPartition,
}
}

/**
* Check if all topic tags respect confluent format (starts with letter followed by alphanumerical or underscore).
*
* @param topic The topic which contains tags
* @return true if yes, false otherwise
*/
public boolean isTagsFormatValid(Topic topic) {
return topic.getSpec().getTags()
.stream()
.allMatch(tag -> tag.matches("^[a-zA-Z]\\w*$"));
}

/**
* Validate tags for topic.
*
Expand All @@ -338,6 +351,11 @@ public List<String> validateTags(Namespace namespace, Topic topic) {
return validationErrors;
}

if (!isTagsFormatValid(topic)) {
validationErrors.add(invalidTopicTagsFormat(String.join(",", topic.getSpec().getTags())));
return validationErrors;
}

return validationErrors;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import com.michelin.ns4kafka.services.clients.schema.entities.SchemaCompatibilityResponse;
import com.michelin.ns4kafka.services.clients.schema.entities.SchemaRequest;
import com.michelin.ns4kafka.services.clients.schema.entities.SchemaResponse;
import com.michelin.ns4kafka.services.clients.schema.entities.TagEntities;
import com.michelin.ns4kafka.services.clients.schema.entities.TagInfo;
import com.michelin.ns4kafka.services.clients.schema.entities.TagTopicInfo;
import com.michelin.ns4kafka.services.clients.schema.entities.TopicDescriptionUpdateBody;
import com.michelin.ns4kafka.services.clients.schema.entities.TopicDescriptionUpdateResponse;
import com.michelin.ns4kafka.services.clients.schema.entities.TopicListResponse;
import com.michelin.ns4kafka.utils.exceptions.ResourceValidationException;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.StringUtils;
Expand Down Expand Up @@ -204,37 +206,6 @@ public Mono<SchemaCompatibilityResponse> deleteCurrentCompatibilityBySubject(Str
return Mono.from(httpClient.retrieve(request, SchemaCompatibilityResponse.class));
}

/**
* List tags.
*
* @param kafkaCluster The Kafka cluster
* @return A list of tags
*/
public Mono<List<TagInfo>> getTags(String kafkaCluster) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest
.GET(URI.create(StringUtils.prependUri(
config.getUrl(), "/catalog/v1/types/tagdefs")))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class)));
}


/**
* List tags of a topic.
*
* @param kafkaCluster The Kafka cluster
* @return A list of tags
*/
public Mono<TagEntities> getTopicWithTags(String kafkaCluster) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest
.GET(URI.create(StringUtils.prependUri(
config.getUrl(), "/catalog/v1/search/basic?type=kafka_topic&tag=*")))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.retrieve(request, TagEntities.class));
}

/**
* Add a tag to a topic.
*
Expand All @@ -259,7 +230,7 @@ public Mono<List<TagTopicInfo>> associateTags(String kafkaCluster, List<TagTopic
* @param kafkaCluster The Kafka cluster
* @return Information about created tags
*/
public Mono<List<TagInfo>> createTags(List<TagInfo> tags, String kafkaCluster) {
public Mono<List<TagInfo>> createTags(String kafkaCluster, List<TagInfo> tags) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.POST(URI.create(StringUtils.prependUri(
config.getUrl(), "/catalog/v1/types/tagdefs")), tags)
Expand All @@ -285,6 +256,39 @@ public Mono<HttpResponse<Void>> dissociateTag(String kafkaCluster, String entity
return Mono.from(httpClient.exchange(request, Void.class));
}

/**
* List topics with catalog info, including tag & description.
*
* @param kafkaCluster The Kafka cluster
* @return A list of description
*/
public Mono<TopicListResponse> getTopicWithCatalogInfo(String kafkaCluster, int limit, int offset) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest
.GET(URI.create(StringUtils.prependUri(
config.getUrl(), "/catalog/v1/search/basic?type=kafka_topic&limit="
+ limit + "&offset=" + offset)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.retrieve(request, TopicListResponse.class));
}

/**
* Update a topic description.
*
* @param kafkaCluster The Kafka cluster
* @param body The body passed to the request
* @return Information about description
*/
public Mono<HttpResponse<TopicDescriptionUpdateResponse>> updateDescription(String kafkaCluster,
TopicDescriptionUpdateBody body) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest
.PUT(URI.create(StringUtils.prependUri(
config.getUrl(), "/catalog/v1/entity")), body)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.exchange(request, TopicDescriptionUpdateResponse.class));
}

/**
* Get the schema registry of the given Kafka cluster.
*
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.michelin.ns4kafka.services.clients.schema.entities;

import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Builder;

/**
* Topic description update body's entity's information.
*
* @param qualifiedName topic entity name
* @param description topic description
*/
@Builder
@JsonInclude
public record TopicDescriptionUpdateAttributes(String qualifiedName, String description) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.michelin.ns4kafka.services.clients.schema.entities;

import lombok.Builder;

/**
* Topic description update body.
*
* @param entity entity
*/
@Builder
public record TopicDescriptionUpdateBody(TopicDescriptionUpdateEntity entity) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.michelin.ns4kafka.services.clients.schema.entities;

import lombok.Builder;

/**
* Topic description update body's entity.
*
* @param attributes attributes of the topic
* @param typeName topic type name
*/
@Builder
public record TopicDescriptionUpdateEntity(TopicDescriptionUpdateAttributes attributes, String typeName) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.michelin.ns4kafka.services.clients.schema.entities;

import lombok.Builder;

/**
* Update topic description response.
*
*/
@Builder
public record TopicDescriptionUpdateResponse() {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.michelin.ns4kafka.services.clients.schema.entities;

import java.util.List;
import lombok.Builder;

/**
* Topics list response's entity.
*
* @param attributes attributes of the topic
*/
@Builder
public record TopicEntity(TopicEntityAttributes attributes, List<String> classificationNames) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.michelin.ns4kafka.services.clients.schema.entities;

import lombok.Builder;

/**
* Topics list response's entity's attributes.
*
* @param qualifiedName topic entity name
* @param description topic description if any
* @param name topic name
*/
@Builder
public record TopicEntityAttributes(String qualifiedName, String description, String name) {

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import lombok.Builder;

/**
* Tag entities.
* Topics list response.
*
* @param entities List of entities
*
* @param entities List of Tag entity
*/
@Builder
public record TagEntities(List<TagEntity> entities) {
public record TopicListResponse(List<TopicEntity> entities) {

}
Loading

0 comments on commit 94b76bb

Please sign in to comment.