feat(kafka-connect): add support for config-override annotation (#416)
This commit adds the new metadata annotation "config-override", used to specify the Kafka Connect client configuration.

Fix: #416
fhussonnois committed May 1, 2024
1 parent f0c8242 commit 028e49c
Showing 7 changed files with 153 additions and 71 deletions.
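
For context, the core mechanic of the feature is that the annotation's value is a JSON document describing the Kafka Connect client configuration. A minimal sketch of that parsing step, assuming the full annotation key is "jikkou.io/config-override" and using a hypothetical "kafkaConnectUrl" property — only the "config-override" suffix and the JSON-to-Map parsing appear in the diff below:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class ConfigOverrideSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical value of the "jikkou.io/config-override" annotation.
        String annotationValue = "{\"kafkaConnectUrl\": \"http://localhost:8083\"}";

        // Same shape of parsing the controller performs: JSON string -> Map.
        Map<?, ?> clientConfig = new ObjectMapper().readValue(annotationValue, Map.class);
        System.out.println(clientConfig); // {kafkaConnectUrl=http://localhost:8083}
    }
}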
File: ObjectTypeConverter.java

@@ -21,7 +21,7 @@

 public final class ObjectTypeConverter<T> implements TypeConverter<T> {

-    private static final ObjectMapper DEFAULT_OBJECT_OBJECT = JsonMapper.builder()
+    private static final ObjectMapper DEFAULT_OBJECT_MAPPER = JsonMapper.builder()
             .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
             .configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true)
             .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS, true)
@@ -38,9 +38,9 @@ public final class ObjectTypeConverter<T> implements TypeConverter<T> {
      * @return The converter.
      */
     public static <T> ObjectTypeConverter<T> newForType(final TypeReference<T> objectType) {
-        TypeFactory typeFactory = DEFAULT_OBJECT_OBJECT.getTypeFactory();
+        TypeFactory typeFactory = DEFAULT_OBJECT_MAPPER.getTypeFactory();
         JavaType type = typeFactory.constructType(objectType);
-        return new ObjectTypeConverter<>(DEFAULT_OBJECT_OBJECT, type);
+        return new ObjectTypeConverter<>(DEFAULT_OBJECT_MAPPER, type);
     }

     /**
@@ -51,9 +51,9 @@ public static <T> ObjectTypeConverter<T> newForType(final TypeReference<T> objectType) {
      * @return The converter.
      */
     public static <T> ObjectTypeConverter<T> newForType(final Class<T> objectType) {
-        TypeFactory typeFactory = DEFAULT_OBJECT_OBJECT.getTypeFactory();
+        TypeFactory typeFactory = DEFAULT_OBJECT_MAPPER.getTypeFactory();
         JavaType type = typeFactory.constructType(objectType);
-        return new ObjectTypeConverter<>(DEFAULT_OBJECT_OBJECT, type);
+        return new ObjectTypeConverter<>(DEFAULT_OBJECT_MAPPER, type);
     }

     /**
@@ -64,9 +64,9 @@ public static <T> ObjectTypeConverter<T> newForType(final Class<T> objectType) {
      * @return The converter.
      */
     public static <T> ObjectTypeConverter<List<T>> newForList(Class<T> elementClass) {
-        TypeFactory typeFactory = DEFAULT_OBJECT_OBJECT.getTypeFactory();
+        TypeFactory typeFactory = DEFAULT_OBJECT_MAPPER.getTypeFactory();
         CollectionType type = typeFactory.constructCollectionType(List.class, elementClass);
-        return new ObjectTypeConverter<>(DEFAULT_OBJECT_OBJECT, type);
+        return new ObjectTypeConverter<>(DEFAULT_OBJECT_MAPPER, type);
     }

     /**
@@ -77,9 +77,9 @@ public static <T> ObjectTypeConverter<Set<T>> newForSet(Class<T> elementClass) {
      * @return The converter.
      */
     public static <T> ObjectTypeConverter<Set<T>> newForSet(Class<T> elementClass) {
-        TypeFactory typeFactory = DEFAULT_OBJECT_OBJECT.getTypeFactory();
+        TypeFactory typeFactory = DEFAULT_OBJECT_MAPPER.getTypeFactory();
         CollectionType type = typeFactory.constructCollectionType(Set.class, elementClass);
-        return new ObjectTypeConverter<>(DEFAULT_OBJECT_OBJECT, type);
+        return new ObjectTypeConverter<>(DEFAULT_OBJECT_MAPPER, type);
     }

     /**
File: Jackson.java

@@ -61,6 +61,9 @@ public interface Jackson {
             .enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT)
             .build();

+    static ObjectMapper json() {
+        return JSON_OBJECT_MAPPER;
+    }

     abstract class ContextualJsonDeserializer extends JsonDeserializer<Object> implements ContextualDeserializer {
     }
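
A quick usage sketch for the new accessor — an assumption-level example, not taken from project docs:

import io.streamthoughts.jikkou.core.io.Jackson;
import java.util.Map;

public class JacksonAccessorSketch {
    public static void main(String[] args) throws Exception {
        // Callers can now reuse the module's preconfigured JSON mapper
        // instead of building their own ObjectMapper.
        Map<?, ?> parsed = Jackson.json().readValue("{\"enabled\": true}", Map.class);
        System.out.println(parsed); // {enabled=true}
    }
}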
File: CoreAnnotations.java

@@ -28,6 +28,7 @@ public final class CoreAnnotations {
     public static final String JIKKOU_BYPASS_VALIDATIONS = PREFIX + "bypass-validations";
     public static final String JIKKOU_NO_REPORT = PREFIX + "no-report";
    public static final String JIKKOU_IO_TRANSFORM_PREFIX = "transform.jikkou.io";
+    public static final String JIKKOU_IO_CONFIG_OVERRIDE = PREFIX + "config-override";

     private CoreAnnotations() {}
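
Assuming PREFIX resolves to "jikkou.io/" — consistent with the sibling constants above, though its value sits outside this hunk — the new constant yields the full annotation key jikkou.io/config-override.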
File: ObjectMeta.java (name inferred from the label/annotation accessors)

@@ -117,7 +117,7 @@ public Map<String, Object> getAnnotations() {
     }

     /**
-     * Finds the label value for the specified key.
+     * Get the label value for the specified key.
      *
      * @param key The label key. Must not be {@code null}.
      * @return The optional value.
@@ -148,6 +148,18 @@ public boolean hasLabel(final String key) {
         return findLabelByKey(key).isPresent();
     }

+    /**
+     * Get the annotation value for the specified key.
+     *
+     * @param key The annotation key. Must not be {@code null}.
+     * @return The named value.
+     */
+    public NamedValue getAnnotationByKey(final String key) {
+        return findAnnotationByKey(key)
+            .map(val -> new NamedValue(key, val))
+            .orElseThrow(() -> new NoSuchElementException("Cannot find annotation for key '" + key + "'"));
+    }

     /**
      * Add label if the specified key is not already associated with
      * a value (or is mapped to null) associate it with the given value.
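
The new getAnnotationByKey complements the existing Optional-returning findAnnotationByKey with a fail-fast variant. A standalone sketch of the two contracts, with a plain Map standing in for the resource's annotations and an assumed "jikkou.io/" key prefix:

import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;

public class AnnotationLookupSketch {
    static final Map<String, Object> ANNOTATIONS =
            Map.of("jikkou.io/config-override", "{\"kafkaConnectUrl\":\"http://localhost:8083\"}");

    // "find": absent keys yield an empty Optional.
    static Optional<Object> findAnnotationByKey(String key) {
        return Optional.ofNullable(ANNOTATIONS.get(key));
    }

    // "get": absent keys fail fast, mirroring the method added above.
    static Object getAnnotationByKey(String key) {
        return findAnnotationByKey(key)
                .orElseThrow(() -> new NoSuchElementException("Cannot find annotation for key '" + key + "'"));
    }

    public static void main(String[] args) {
        System.out.println(getAnnotationByKey("jikkou.io/config-override"));
        System.out.println(findAnnotationByKey("missing").isPresent()); // false
    }
}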
File: KafkaConnectorCollector.java

@@ -39,23 +39,24 @@
  */
 @SupportedResource(type = V1KafkaConnector.class)
 @ExtensionSpec(
         options = {
                 @ExtensionOptionSpec(
                         name = KafkaConnectorCollector.EXPAND_STATUS_CONFIG,
                         description = "Retrieves additional information about the status of the connector and its tasks.",
                         type = Boolean.class,
                         defaultValue = "false"
                 ),
                 @ExtensionOptionSpec(
                         name = KafkaConnectorCollector.CONNECT_CLUSTER_CONFIG,
                         description = "List of Kafka Connect cluster from which to list connectors.",
                         type = List.class
                 )
         }
 )
 public final class KafkaConnectorCollector extends ContextualExtension implements Collector<V1KafkaConnector> {

     private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectorCollector.class);

     public static final String EXPAND_STATUS_CONFIG = "expand-status";
     public static final String CONNECT_CLUSTER_CONFIG = "connect-cluster";

@@ -78,51 +78,61 @@ public void init(@NotNull KafkaConnectExtensionConfig configuration) {
      * {@inheritDoc}
      **/
     @Override
-    public ResourceListObject<V1KafkaConnector> listAll(@NotNull Configuration configuration,
-                                                        @NotNull Selector selector) {
+    public ResourceListObject<V1KafkaConnector> listAll(final @NotNull Configuration configuration,
+                                                        final @NotNull Selector selector) {

         Boolean expandStatus = extensionContext()
                 .<Boolean>configProperty(EXPAND_STATUS_CONFIG).get(configuration);

         Set<String> clusters = extensionContext()
                 .<List<String>>configProperty(CONNECT_CLUSTER_CONFIG)
                 .getOptional(configuration)
                 .map(list -> (Set<String>) new HashSet<>(list))
                 .orElseGet(() -> this.configuration.getClusters());

         List<V1KafkaConnector> list = clusters
                 .stream()
                 .flatMap(connectCluster -> listAll(connectCluster, expandStatus).stream())
                 .collect(Collectors.toList());
         return new V1KafkaConnectorList(list);
     }

-    public List<V1KafkaConnector> listAll(final String cluster, final boolean expandStatus) {
-        List<V1KafkaConnector> results = new LinkedList<>();
+    public List<V1KafkaConnector> listAll(final String connectClusterName,
+                                          final boolean expandStatus) {
         KafkaConnectClientConfig connectClientConfig = configuration
-                .getConfigForCluster(cluster)
-                .orElseThrow(() -> new KafkaConnectClusterNotFoundException("No connect cluster configured for name '" + cluster + "'"));
+                .getConfigForCluster(connectClusterName)
+                .orElseThrow(() -> new KafkaConnectClusterNotFoundException("No connect cluster configured for name '" + connectClusterName + "'"));
+
+        return listAll(connectClusterName, connectClientConfig, expandStatus);
+    }
+
+    public List<V1KafkaConnector> listAll(final String connectClusterName,
+                                          final KafkaConnectClientConfig connectClientConfig,
+                                          final boolean expandStatus) {
+        List<V1KafkaConnector> results = new LinkedList<>();
         KafkaConnectApi api = KafkaConnectApiFactory.create(connectClientConfig);
         try {
             final List<String> connectors = api.listConnectors();
             for (String connector : connectors) {
                 try {
-                    KafkaConnectClusterService service = new KafkaConnectClusterService(cluster, api);
+                    KafkaConnectClusterService service = new KafkaConnectClusterService(connectClusterName, api);
                     CompletableFuture<V1KafkaConnector> future = service.getConnectorAsync(connector, expandStatus);
                     V1KafkaConnector result = future.get();
                     results.add(result);
                 } catch (Exception ex) {
                     if (ex instanceof InterruptedException) {
                         Thread.currentThread().interrupt();
                     }
-                    LOG.error("Failed to get connector '{}' from connect cluster {}", connector, cluster, ex);
+                    LOG.error("Failed to get connector '{}' from connect cluster: {}",
+                            connector,
+                            connectClientConfig.getConnectUrl(),
+                            ex
+                    );
                 }
             }
         } finally {
             api.close();
         }
         return results;
     }
 }
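
The refactor above splits listAll into a resolve-then-list pair: the short overload resolves the statically configured client config and delegates to a new overload taking an explicit config, which is what later lets the controller inject an annotation-derived override. A stand-in sketch of that shape (String replaces KafkaConnectClientConfig; no REST calls):

import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;

public class ListAllDelegationSketch {
    static final Map<String, String> STATIC_CONFIGS = Map.of("dev", "http://localhost:8083");

    // Overload 1: resolve the configured client config, then delegate.
    static List<String> listAll(String cluster, boolean expandStatus) {
        String config = Optional.ofNullable(STATIC_CONFIGS.get(cluster))
                .orElseThrow(() -> new NoSuchElementException(
                        "No connect cluster configured for name '" + cluster + "'"));
        return listAll(cluster, config, expandStatus);
    }

    // Overload 2: accept an explicit config (e.g. an annotation override).
    static List<String> listAll(String cluster, String config, boolean expandStatus) {
        return List.of(cluster + " via " + config); // stand-in for the connector listing
    }

    public static void main(String[] args) {
        System.out.println(listAll("dev", false)); // [dev via http://localhost:8083]
    }
}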
File: KafkaConnectorController.java

@@ -11,10 +11,15 @@
 import static io.streamthoughts.jikkou.core.ReconciliationMode.FULL;
 import static io.streamthoughts.jikkou.core.ReconciliationMode.UPDATE;

+import com.fasterxml.jackson.core.JsonProcessingException;
 import io.streamthoughts.jikkou.core.ReconciliationContext;
 import io.streamthoughts.jikkou.core.annotation.SupportedResource;
+import io.streamthoughts.jikkou.core.config.Configuration;
+import io.streamthoughts.jikkou.core.exceptions.JikkouRuntimeException;
 import io.streamthoughts.jikkou.core.extension.ContextualExtension;
 import io.streamthoughts.jikkou.core.extension.ExtensionContext;
+import io.streamthoughts.jikkou.core.io.Jackson;
+import io.streamthoughts.jikkou.core.models.CoreAnnotations;
 import io.streamthoughts.jikkou.core.models.HasMetadata;
 import io.streamthoughts.jikkou.core.models.change.ResourceChange;
 import io.streamthoughts.jikkou.core.reconciler.ChangeExecutor;
@@ -28,22 +33,26 @@
 import io.streamthoughts.jikkou.kafka.connect.KafkaConnectLabels;
 import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApi;
 import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApiFactory;
+import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectClientConfig;
 import io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeComputer;
 import io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeDescription;
 import io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeHandler;
 import io.streamthoughts.jikkou.kafka.connect.exception.KafkaConnectClusterNotFoundException;
 import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnector;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.jetbrains.annotations.NotNull;

 @SupportedResource(type = V1KafkaConnector.class)
 @SupportedResource(apiVersion = ApiVersions.KAFKA_V1BETA, kind = "KafkaConnectorChange")
 @ControllerConfiguration(
         supportedModes = {CREATE, DELETE, UPDATE, FULL}
 )
 public final class KafkaConnectorController extends ContextualExtension implements Controller<V1KafkaConnector, ResourceChange> {

@@ -71,21 +80,22 @@ public List<ChangeResult> execute(@NotNull final ChangeExecutor<ResourceChange>

         List<ResourceChange> changes = executor.changes();
         Map<String, List<ResourceChange>> changesByCluster = groupByKafkaConnectCluster(
                 changes,
                 change -> true
         );

         List<ChangeResult> results = new LinkedList<>();
         for (Map.Entry<String, List<ResourceChange>> entry : changesByCluster.entrySet()) {
             final String cluster = entry.getKey();
-            try (KafkaConnectApi api = KafkaConnectApiFactory.create(configuration.getConfigForCluster(cluster).get())) {
+            KafkaConnectClientConfig connectClientConfig = getConnectClientConfig(cluster, entry.getValue());
+            try (KafkaConnectApi api = KafkaConnectApiFactory.create(connectClientConfig)) {
                 List<ChangeHandler<ResourceChange>> handlers = List.of(
                         new KafkaConnectorChangeHandler(api, cluster),
                         new ChangeHandler.None<>(change -> new KafkaConnectorChangeDescription(cluster, change))
                 );
                 DefaultChangeExecutor<ResourceChange> dedicatedExecutor = new DefaultChangeExecutor<>(
                         context,
                         entry.getValue()
                 );
                 results.addAll(dedicatedExecutor.applyChanges(handlers));
             }
@@ -99,41 +109,82 @@ public List<ChangeResult> execute(@NotNull final ChangeExecutor<ResourceChange>
      **/
     @Override
     public List<ResourceChange> plan(
             @NotNull Collection<V1KafkaConnector> resources,
             @NotNull ReconciliationContext context) {

         Map<String, List<V1KafkaConnector>> resourcesByCluster = groupByKafkaConnectCluster(
                 resources,
                 context.selector()::apply);

         KafkaConnectorChangeComputer computer = new KafkaConnectorChangeComputer();

         List<ResourceChange> allChanges = new LinkedList<>();
         for (Map.Entry<String, List<V1KafkaConnector>> entry : resourcesByCluster.entrySet()) {
-            String clusterName = entry.getKey();
-            List<V1KafkaConnector> expectedStates = entry.getValue();
-
-            List<V1KafkaConnector> actualStates = collector.listAll(clusterName, false)
-                    .stream()
-                    .filter(context.selector()::apply)
-                    .toList();
-            allChanges.addAll(computer.computeChanges(actualStates, expectedStates));
+            KafkaConnectClientConfig connectClientConfig = getConnectClientConfig(entry.getKey(), entry.getValue());
+            List<V1KafkaConnector> actualStates = collector.listAll(entry.getKey(), connectClientConfig, false)
+                    .stream()
+                    .filter(context.selector()::apply)
+                    .toList();
+            allChanges.addAll(computer.computeChanges(actualStates, entry.getValue()));
         }
         return allChanges;
     }

+    private KafkaConnectClientConfig getConnectClientConfig(final String connectClusterName,
+                                                            final List<? extends HasMetadata> connectors) {
+        List<KafkaConnectClientConfig> clientConfigs = connectors
+                .stream()
+                .map(connector -> connector.getMetadata().findAnnotationByKey(CoreAnnotations.JIKKOU_IO_CONFIG_OVERRIDE))
+                .flatMap(Optional::stream)
+                .map(config -> {
+                    try {
+                        return Jackson.json().readValue(config.toString(), Map.class);
+                    } catch (JsonProcessingException e) {
+                        throw new JikkouRuntimeException(String.format(
+                                "Failed to parse JSON from metadata.annotation '%s': %s", CoreAnnotations.JIKKOU_IO_CONFIG_OVERRIDE, e.getMessage()
+                        ), e);
+                    }
+                })
+                .map(Configuration::from)
+                .map(KafkaConnectClientConfig::new)
+                .toList();
+
+        if (clientConfigs.isEmpty()) {
+            return configuration.getConfigForCluster(connectClusterName)
+                    .orElseThrow(() -> new KafkaConnectClusterNotFoundException(String.format(
+                            "No connect cluster configured for name '%s'", connectClusterName)
+                    ));
+        } else {
+            if (clientConfigs.size() != connectors.size()) {
+                throw new JikkouRuntimeException(String.format(
+                        "Not all connector resources for cluster %s define the metadata.annotation '%s'",
+                        connectClusterName, CoreAnnotations.JIKKOU_IO_CONFIG_OVERRIDE
+                ));
+            }
+
+            if (new HashSet<>(clientConfigs).size() > 1) {
+                throw new JikkouRuntimeException(String.format(
+                        "Multiple configs were defined for Kafka Connect cluster '%s' through the metadata.annotation '%s'",
+                        connectClusterName, CoreAnnotations.JIKKOU_IO_CONFIG_OVERRIDE
+                ));
+            }
+            return clientConfigs.getFirst();
+        }
+    }

     @NotNull
     private <T extends HasMetadata> Map<String, List<T>> groupByKafkaConnectCluster(@NotNull Collection<T> changes,
                                                                                     @NotNull Predicate<T> predicate) {
         return changes
                 .stream()
                 .filter(predicate)
                 .collect(Collectors.groupingBy(
                         it -> it.getMetadata()
                                 .getLabelByKey(KafkaConnectLabels.KAFKA_CONNECT_CLUSTER)
                                 .getValue()
                                 .toString(),
                         Collectors.toList())
                 );
     }
 }
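
getConnectClientConfig enforces an all-or-nothing rule per cluster: either no connector carries the override annotation (fall back to the static cluster config), or every connector carries it and all overrides must agree. A reduced sketch of just that decision logic, with Strings standing in for parsed client configs:

import java.util.HashSet;
import java.util.List;

public class OverrideResolutionSketch {
    static String resolve(List<String> overrides, int connectorCount, String staticConfig) {
        if (overrides.isEmpty()) {
            return staticConfig; // no annotation anywhere: use the configured cluster
        }
        if (overrides.size() != connectorCount) {
            throw new IllegalStateException("Not all connector resources define the override annotation");
        }
        if (new HashSet<>(overrides).size() > 1) {
            throw new IllegalStateException("Conflicting override configs for the same cluster");
        }
        return overrides.get(0);
    }

    public static void main(String[] args) {
        System.out.println(resolve(List.of(), 2, "static-config"));             // static-config
        System.out.println(resolve(List.of("cfg", "cfg"), 2, "static-config")); // cfg
    }
}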
