Skip to content

Commit

Permalink
feat(specs): add support for altering quotas (#24)
Browse files Browse the repository at this point in the history
Resolves: #24
  • Loading branch information
fhussonnois committed Oct 27, 2021
1 parent 7a05e6f commit 38b0c6a
Show file tree
Hide file tree
Showing 38 changed files with 2,323 additions and 132 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ repositories {

dependencies {
implementation 'org.jetbrains:annotations:22.0.0'
implementation group: 'io.vavr', name: 'vavr', version: '1.0.0-alpha-3'
testImplementation "org.junit.jupiter:junit-jupiter-params:$junitVersion"
testImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion"
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/io/streamthoughts/kafka/specs/KafkaSpecs.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import io.streamthoughts.kafka.specs.command.AdminClientMixin;
import io.streamthoughts.kafka.specs.command.acls.AclsCommand;
import io.streamthoughts.kafka.specs.command.broker.BrokerCommand;
import io.streamthoughts.kafka.specs.command.quotas.QuotasCommand;
import io.streamthoughts.kafka.specs.command.topic.TopicsCommand;
import io.streamthoughts.kafka.specs.command.validate.ValidateCommand;
import io.streamthoughts.kafka.specs.error.KafkaSpecsException;
import org.apache.kafka.common.metrics.Quota;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Mixin;
Expand All @@ -44,7 +46,15 @@
synopsisHeading = "%n",
description = "CLI to ease and automate Apache Kafka cluster configuration management.",
mixinStandardHelpOptions = true,
subcommands = {ValidateCommand.class, TopicsCommand.class, AclsCommand.class, BrokerCommand.class, CommandLine.HelpCommand.class })
subcommands = {
ValidateCommand.class,
TopicsCommand.class,
AclsCommand.class,
BrokerCommand.class,
QuotasCommand.class,
CommandLine.HelpCommand.class,
}
)
public class KafkaSpecs {

static LocalDateTime START_TIME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.streamthoughts.kafka.specs.model.V1SpecsObject;
import io.streamthoughts.kafka.specs.transforms.ApplyConfigMapsTransformation;
import io.streamthoughts.kafka.specs.transforms.Transformation;
import io.streamthoughts.kafka.specs.validations.QuotasEntityValidation;
import io.streamthoughts.kafka.specs.validations.TopicMinNumPartitionsValidation;
import io.streamthoughts.kafka.specs.validations.NoDuplicateRolesAllowedValidation;
import io.streamthoughts.kafka.specs.validations.NoDuplicateTopicsAllowedValidation;
Expand All @@ -47,7 +48,8 @@ public static SpecFileValidator getDefault() {
.withValidation(new NoDuplicateUsersAllowedValidation())
.withValidation(new NoDuplicateRolesAllowedValidation())
.withValidation(new TopicMinNumPartitionsValidation())
.withValidation(new TopicMinReplicationFactorValidation());
.withValidation(new TopicMinReplicationFactorValidation())
.withValidation(new QuotasEntityValidation());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import java.util.Objects;

public class ConfigEntryChange extends ValueChange<String> implements Change<ConfigEntryChange>, Named {
public class ConfigEntryChange extends ValueChange<Object> implements Change<ConfigEntryChange>, Named {

private final String name;

Expand All @@ -35,7 +35,7 @@ public class ConfigEntryChange extends ValueChange<String> implements Change<Con
* @param valueChange the {@link ValueChange}.
*/
public ConfigEntryChange(@NotNull final String name,
@NotNull final ValueChange<String> valueChange) {
@NotNull final ValueChange<Object> valueChange) {
super(valueChange);
this.name = Objects.requireNonNull(name, "'name' should not be null");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2021 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.specs.change;

import io.streamthoughts.kafka.specs.resources.ConfigValue;
import io.streamthoughts.kafka.specs.resources.Configs;
import io.streamthoughts.kafka.specs.resources.Named;
import io.vavr.Tuple2;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ConfigEntryChanges {

public static Tuple2<Change.OperationType, List<ConfigEntryChange>> computeChange(@NotNull final Configs beforeConfigs,
@NotNull final Configs afterConfigs) {

final Map<String, ConfigValue> beforeConfigsByName = Named.keyByName(beforeConfigs);
final Map<String, ConfigEntryChange> afterConfigsByName = new HashMap<>();

Change.OperationType op = Change.OperationType.NONE;

for (ConfigValue afterConfigValue : afterConfigs) {
final String configEntryName = afterConfigValue.name();

final ConfigValue beforeConfigValue = beforeConfigsByName.getOrDefault(
configEntryName,
new ConfigValue(configEntryName, null)
);

final ValueChange<Object> change = ValueChange.with(
afterConfigValue.value(),
beforeConfigValue.value()
);

if (change.getOperation() != Change.OperationType.NONE) {
op = Change.OperationType.UPDATE;
}

afterConfigsByName.put(configEntryName, new ConfigEntryChange(configEntryName, change));
}

// Iterate on all configs apply on the topic for
// looking for DYNAMIC_TOPIC_CONFIGS that may be orphan.
List<ConfigEntryChange> orphanChanges = beforeConfigsByName.values()
.stream()
.filter(it -> it.unwrap() == null || it.unwrap().source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
.filter(it -> !afterConfigsByName.containsKey(it.name()))
.map(it -> new ConfigEntryChange(it.name(), ValueChange.withBeforeValue(it.value())))
.collect(Collectors.toList());

if (!orphanChanges.isEmpty()) {
op = Change.OperationType.UPDATE;
}

orphanChanges.forEach(it -> afterConfigsByName.put(it.name(), it));

return new Tuple2<>(op, new ArrayList<>(afterConfigsByName.values()));
}
}
107 changes: 107 additions & 0 deletions src/main/java/io/streamthoughts/kafka/specs/change/QuotaChange.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2021 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.specs.change;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import io.streamthoughts.kafka.specs.model.V1QuotaEntityObject;
import io.streamthoughts.kafka.specs.model.V1QuotaType;
import org.jetbrains.annotations.NotNull;

import java.util.List;
import java.util.Objects;

@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class QuotaChange implements Change<QuotaChange> {

private final V1QuotaEntityObject entity;

private final List<ConfigEntryChange> configs;

private final OperationType operation;

private final V1QuotaType type;

/**
* Creates a new {@link QuotaChange} instance.
*
* @param type the quota type.
* @param entity the quota entity.
* @param configs the quota configuration.
*/
public QuotaChange(@NotNull final OperationType operation,
@NotNull final V1QuotaType type,
@NotNull final V1QuotaEntityObject entity,
@NotNull final List<ConfigEntryChange> configs) {
this.operation = operation;
this.entity = entity;
this.configs = configs;
this.type = type;
}

/**
* {@inheritDoc}
*/
@Override
public OperationType getOperation() {
return operation;
}

@JsonProperty
public V1QuotaEntityObject getEntity() {
return entity;
}

@JsonProperty
public List<ConfigEntryChange> getConfigs() {
return configs;
}

@JsonProperty
public V1QuotaType getType() {
return type;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof QuotaChange)) return false;
QuotaChange that = (QuotaChange) o;
return Objects.equals(entity, that.entity) &&
Objects.equals(configs, that.configs) &&
operation == that.operation &&
type == that.type;
}

@Override
public int hashCode() {
return Objects.hash(entity, configs, operation, type);
}

@Override
public String toString() {
return "QuotaChange{" +
"entity=" + entity +
", configs=" + configs +
", operation=" + operation +
", type=" + type +
'}';
}
}
Loading

0 comments on commit 38b0c6a

Please sign in to comment.