Skip to content

Commit

Permalink
feat(provider/aiven): add support for managing topic
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Apr 22, 2024
1 parent ac08662 commit acd7e69
Show file tree
Hide file tree
Showing 50 changed files with 1,061 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,28 @@ public String getKind() {
return Optional.ofNullable(kind).orElse(Resource.getKind(this.getClass()));
}

/** {@inheritDoc} **/
@Override
public GenericResourceChange withApiVersion(final String apiVersion) {
return new GenericResourceChange(
apiVersion,
kind,
metadata,
spec
);
}

/** {@inheritDoc} **/
@Override
public GenericResourceChange withKind(final String kind) {
return new GenericResourceChange(
apiVersion,
kind,
metadata,
spec
);
}

/**
* {@inheritDoc}
**/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@ public interface ResourceChange extends HasSpec<ResourceChangeSpec>, Change {

String RESOURCE_CHANGE_KIND_SUFFIX = "Change";

/**
* Gets a new objects with the given kind.
*
* @param apiVersion the kind of the resource.
* @return a new {@link Resource}.
*/
ResourceChange withApiVersion(final String apiVersion);

/**
* Gets a new objects with the given kind.
*
* @param kind the kind of the resource.
* @return a new {@link Resource}.
*/
ResourceChange withKind(final String kind);

/** {@inheritDoc} **/
@Override
@JsonIgnore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public List<R> computeChanges(Iterable<V> actualStates,
}

@NotNull
private Map<K, V> groupById(Iterable<V> actualStates) {
private Map<K, V> groupById(final Iterable<V> actualStates) {
if (actualStates == null) {
return Collections.emptyMap();
}
Expand All @@ -77,8 +77,8 @@ private Map<K, V> groupById(Iterable<V> actualStates) {
.collect(Collectors.toMap(keyMapper::apply, Function.identity(), this::duplicateKeyException, LinkedHashMap::new));
}

public List<R> computeChanges(Map<K, V> actualStatesByID,
Map<K, V> expectStatesByID) {
public List<R> computeChanges(final Map<K, V> actualStatesByID,
final Map<K, V> expectStatesByID) {

int maxTotalStates = actualStatesByID.size() + expectStatesByID.size();
List<BeforeAndAfter<K, V>> joined = new ArrayList<>(maxTotalStates);
Expand All @@ -93,13 +93,13 @@ public List<R> computeChanges(Map<K, V> actualStatesByID,
}

@NotNull
private Stream<? extends R> createChange(BeforeAndAfter<K, V> states) {
private Stream<? extends R> createChange(final BeforeAndAfter<K, V> states) {
return changeFactory.createChange(states.key(), states.before(), states.after()).stream();
}

@NotNull
private List<BeforeAndAfter<K, V>> leftJoinWhereRightIsNull(Map<K, V> leftStatesByID,
Map<K, V> rightStatesByID) {
private List<BeforeAndAfter<K, V>> leftJoinWhereRightIsNull(final Map<K, V> leftStatesByID,
final Map<K, V> rightStatesByID) {
return leftStatesByID.entrySet()
.stream()
.map(left -> {
Expand All @@ -112,8 +112,8 @@ private List<BeforeAndAfter<K, V>> leftJoinWhereRightIsNull(Map<K, V> leftStates


@NotNull
private List<BeforeAndAfter<K, V>> rightJoin(Map<K, V> leftStatesByID,
Map<K, V> rightStatesByID) {
private List<BeforeAndAfter<K, V>> rightJoin(final Map<K, V> leftStatesByID,
final Map<K, V> rightStatesByID) {
return rightStatesByID.entrySet()
.stream()
.map(right -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public DefaultChangeComputerBuilder<I, T, R> withDeleteOrphans(boolean isDeleteO
* {@inheritDoc}
**/
@Override
public DefaultChangeComputerBuilder<I, T, R> withKeyMapper(KeyMapper<T, I> keyMapper) {
public DefaultChangeComputerBuilder<I, T, R> withKeyMapper(final KeyMapper<T, I> keyMapper) {
this.keyMapper = keyMapper;
return this;
}
Expand All @@ -45,7 +45,7 @@ public DefaultChangeComputerBuilder<I, T, R> withKeyMapper(KeyMapper<T, I> keyMa
* {@inheritDoc}
**/
@Override
public DefaultChangeComputerBuilder<I, T, R> withChangeFactory(ChangeFactory<I, T, R> changeFactory) {
public DefaultChangeComputerBuilder<I, T, R> withChangeFactory(final ChangeFactory<I, T, R> changeFactory) {
this.changeFactory = changeFactory;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ public ResourceChangeComputer(final KeyMapper<V, K> keyMapper,
final ResourceChangeFactory<K, V, R> changeFactory) {

this(keyMapper, changeFactory, false);

}

/**
Expand Down
5 changes: 5 additions & 0 deletions providers/jikkou-provider-aiven/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@
<artifactId>jikkou-provider-schema-registry</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.streamthoughts</groupId>
<artifactId>jikkou-provider-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.streamthoughts</groupId>
<artifactId>jikkou-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ void shouldCreateSchemaRegistrySubject() {
);
V1SchemaRegistrySubject resource = V1SchemaRegistrySubject
.builder()
.withApiVersion(ApiVersions.KAFKA_REGISTRY_API_VERSION)
.withApiVersion(ApiVersions.KAFKA_AIVEN_V1BETA1)
.withMetadata(ObjectMeta.builder()
.withName(TEST_SUBJECT)
.build()
Expand All @@ -148,7 +148,7 @@ void shouldCreateSchemaRegistrySubject() {
ResourceChange actual = result.change();
ResourceChange expected = GenericResourceChange
.builder(V1SchemaRegistrySubject.class)
.withApiVersion(ApiVersions.KAFKA_REGISTRY_API_VERSION)
.withApiVersion(ApiVersions.KAFKA_AIVEN_V1BETA1)
.withMetadata(ObjectMeta
.builder()
.withName(TEST_SUBJECT)
Expand Down Expand Up @@ -217,7 +217,7 @@ void shouldUpdateSchemaRegistrySubject() {
""")
);
V1SchemaRegistrySubject resource = V1SchemaRegistrySubject.builder()
.withApiVersion(ApiVersions.KAFKA_REGISTRY_API_VERSION)
.withApiVersion(ApiVersions.KAFKA_AIVEN_V1BETA1)
.withMetadata(ObjectMeta.builder()
.withName(TEST_SUBJECT)
.build()
Expand All @@ -243,7 +243,7 @@ void shouldUpdateSchemaRegistrySubject() {
ResourceChange actual = result.change();
ResourceChange expected = GenericResourceChange
.builder(V1SchemaRegistrySubject.class)
.withApiVersion(ApiVersions.KAFKA_REGISTRY_API_VERSION)
.withApiVersion(ApiVersions.KAFKA_AIVEN_V1BETA1)
.withMetadata(ObjectMeta
.builder()
.withName(TEST_SUBJECT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
*/
package io.streamthoughts.jikkou.extension.aiven;

import static io.streamthoughts.jikkou.extension.aiven.ApiVersions.KAFKA_REGISTRY_API_VERSION;
import static io.streamthoughts.jikkou.extension.aiven.ApiVersions.KAFKA_AIVEN_V1BETA1;
import static io.streamthoughts.jikkou.extension.aiven.ApiVersions.KAFKA_AIVEN_V1BETA2;

import io.streamthoughts.jikkou.core.annotation.Named;
import io.streamthoughts.jikkou.core.extension.ExtensionRegistry;
Expand All @@ -25,14 +26,18 @@
import io.streamthoughts.jikkou.extension.aiven.reconciler.AivenKafkaQuotaController;
import io.streamthoughts.jikkou.extension.aiven.reconciler.AivenKafkaTopicAclEntryCollector;
import io.streamthoughts.jikkou.extension.aiven.reconciler.AivenKafkaTopicAclEntryController;
import io.streamthoughts.jikkou.extension.aiven.reconciler.AivenKafkaTopicCollector;
import io.streamthoughts.jikkou.extension.aiven.reconciler.AivenKafkaTopicController;
import io.streamthoughts.jikkou.extension.aiven.reconciler.AivenSchemaRegistryAclEntryCollector;
import io.streamthoughts.jikkou.extension.aiven.reconciler.AivenSchemaRegistryAclEntryController;
import io.streamthoughts.jikkou.extension.aiven.reconciler.AivenSchemaRegistrySubjectCollector;
import io.streamthoughts.jikkou.extension.aiven.reconciler.AivenSchemaRegistrySubjectController;
import io.streamthoughts.jikkou.extension.aiven.validation.AivenSchemaCompatibilityValidation;
import io.streamthoughts.jikkou.extension.aiven.validation.SchemaRegistryAclEntryValidation;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopic;
import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubject;
import io.streamthoughts.jikkou.spi.AbstractExtensionProvider;
import java.util.Set;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;

Expand All @@ -55,6 +60,8 @@ public void registerExtensions(@NotNull ExtensionRegistry registry) {
registry.register(AivenKafkaQuotaCollector.class, AivenKafkaQuotaCollector::new);
registry.register(AivenKafkaQuotaController.class, AivenKafkaQuotaController::new);
registry.register(AivenSchemaCompatibilityValidation.class, AivenSchemaCompatibilityValidation::new);
registry.register(AivenKafkaTopicController.class, AivenKafkaTopicController::new);
registry.register(AivenKafkaTopicCollector.class, AivenKafkaTopicCollector::new);
}

/**
Expand All @@ -70,14 +77,25 @@ public void registerResources(@NotNull ResourceRegistry registry) {
V1KafkaQuotaList.class
).forEach(cls -> registerResource(registry, cls));

registry.register(V1SchemaRegistrySubject.class, KAFKA_REGISTRY_API_VERSION)
registry.register(V1SchemaRegistrySubject.class, KAFKA_AIVEN_V1BETA1)
.setSingularName("avn-schemaregistrysubject")
.setPluralName("avn-schemaregistrysubjects")
.setShortNames(null);

registry.register(GenericResourceChange.class, ResourceType.of(
ResourceChange.getChangeKindFromResource(V1SchemaRegistrySubject.class),
KAFKA_REGISTRY_API_VERSION
KAFKA_AIVEN_V1BETA1
));

registry.register(V1KafkaTopic.class, KAFKA_AIVEN_V1BETA2)
.setSingularName("avn-kafkatopic")
.setPluralName("avn-kafkatopics")
.setShortNames(Set.of("avn-kt"));

registry.register(GenericResourceChange.class, ResourceType.of(
ResourceChange.getChangeKindFromResource(V1KafkaTopic.class),
KAFKA_AIVEN_V1BETA2
));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

public final class ApiVersions {

public static final String KAFKA_REGISTRY_API_VERSION = "kafka.aiven.io/v1beta1";
public static final String KAFKA_AIVEN_V1BETA1 = "kafka.aiven.io/v1beta1";
public static final String KAFKA_AIVEN_V1BETA2 = "kafka.aiven.io/v1beta2";
public static final String SCHEMA_REGISTRY_KIND = "SchemaRegistrySubject";
public static final String SCHEMA_REGISTRY_CHANGE_KIND = "SchemaRegistrySubjectChange";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright (c) The original authors
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.streamthoughts.jikkou.extension.aiven.adapter;

import io.streamthoughts.jikkou.core.models.ConfigValue;
import io.streamthoughts.jikkou.core.models.Configs;
import io.streamthoughts.jikkou.core.models.ObjectMeta;
import io.streamthoughts.jikkou.extension.aiven.ApiVersions;
import io.streamthoughts.jikkou.extension.aiven.api.data.KafkaTopicConfigInfo;
import io.streamthoughts.jikkou.extension.aiven.api.data.KafkaTopicInfo;
import io.streamthoughts.jikkou.extension.aiven.api.data.Tag;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopic;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopicSpec;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;


public final class KafkaTopicAdapter {

public static final String TAG_AIVEN_IO_PREFIX = "tag.aiven.io/";

public static V1KafkaTopic map(@NotNull final KafkaTopicInfo kafka,
@NotNull final Predicate<KafkaTopicConfigInfo> filter) {
Map<String, Object> labels = Optional.ofNullable(kafka.tags())
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toMap(it -> TAG_AIVEN_IO_PREFIX + it.key(), Tag::value));

Configs topicConfigs = new Configs(kafka.config()
.entrySet()
.stream()
.filter(it -> filter.test(it.getValue()))
.map(it -> {
KafkaTopicConfigInfo configInfo = it.getValue();
return new ConfigValue(
configKeyFromAiven(it.getKey()),
configInfo.value(),
configInfo.source().equals(KafkaTopicConfigInfo.Source.DEFAULT_CONFIG),
configInfo.source().equals(KafkaTopicConfigInfo.Source.TOPIC_CONFIG)
);
}).collect(Collectors.toSet()));

return V1KafkaTopic
.builder()
.withApiVersion(ApiVersions.KAFKA_AIVEN_V1BETA2)
.withMetadata(ObjectMeta
.builder()
.withName(kafka.topicName())
.withLabels(labels)
.build()
)
.withSpec(V1KafkaTopicSpec
.builder()
.withPartitions(kafka.partitions().size())
.withReplicas(kafka.replication().shortValue())
.withConfigs(topicConfigs)
.build()
)
.build();
}

public static String configKeyToAiven(final String key) {
return key.replaceAll("\\.", "_");
}

public static String configKeyFromAiven(final String key) {
return key.replaceAll("_", ".");
}


}
Loading

0 comments on commit acd7e69

Please sign in to comment.