Skip to content

Commit

Permalink
refactor(jikkou): refactor the internal change api
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Nov 24, 2021
1 parent 9cc7f80 commit c261a04
Show file tree
Hide file tree
Showing 26 changed files with 503 additions and 491 deletions.
21 changes: 11 additions & 10 deletions src/main/java/io/streamthoughts/kafka/specs/Printer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package io.streamthoughts.kafka.specs;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.streamthoughts.kafka.specs.change.Change;
import io.streamthoughts.kafka.specs.change.ChangeResult;

import java.io.PrintStream;
import java.util.Collection;
Expand All @@ -43,21 +45,20 @@ public class Printer {
/**
* Print the specified execution results to stdout and terminate the application with the appropriate exit code.
*
* @param results the execution results to print.
* @param verbose print details.
* @param dryRun is dry-run enabled.
* @param <T> the result-type.
* @return the exit code.
* @param results the execution results to print.
* @param verbose print details.
* @param dryRun is dry-run enabled.
* @return the exit code.
*/
public static <T> int print(final Collection<OperationResult<T>> results,
final boolean verbose,
final boolean dryRun) {
public static <T extends Change<?>> int print(final Collection<ChangeResult<T>> results,
final boolean verbose,
final boolean dryRun) {
int ok = 0;
int created = 0;
int changed = 0;
int deleted = 0;
int failed = 0;
for (OperationResult<?> r : results) {
for (ChangeResult<?> r : results) {
final String json;
try {
json = Jackson.JSON_OBJECT_MAPPER
Expand Down Expand Up @@ -104,7 +105,7 @@ public static <T> int print(final Collection<OperationResult<T>> results,

private static void printTask(final Description description, final String status) {
String text = description.textDescription();
String padding = (text.length() < PADDING.length()) ? PADDING.substring(text.length()) : "";
String padding = (text.length() < PADDING.length()) ? PADDING.substring(text.length()) : "";
PS.printf("%sTASK [%s] %s - %s %s\n", isColor() ? ANSI_WHITE : "", description.operation(), text, status, padding);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.Objects;

@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class AclChange implements Change<AclChange> {
public class AclChange implements Change<AccessControlPolicy> {

private final OperationType operation;

Expand All @@ -45,17 +45,34 @@ static AclChange none(final AccessControlPolicy policy) {
return new AclChange(OperationType.NONE, policy);
}

/**
* Creates a new {@link AclChange} instance.
*
* @param operation the {@link OperationType}.
* @param policy the {@link AccessControlPolicy}.
*/
private AclChange(final OperationType operation,
final AccessControlPolicy policy) {
this.operation = Objects.requireNonNull(operation, "'operation' should not be null");
this.policy = Objects.requireNonNull(policy, "'policy' should not be null");
}

/**
* {@inheritDoc}
*/
@Override
public OperationType getOperation() {
return operation;
}

/**
* {@inheritDoc}
*/
@Override
public AccessControlPolicy getKey() {
return policy;
}

@JsonProperty
@JsonUnwrapped
public AccessControlPolicy getAccessControlPolicy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package io.streamthoughts.kafka.specs.change;

import io.streamthoughts.kafka.specs.operation.acls.AclOperation;
import io.streamthoughts.kafka.specs.resources.Named;
import io.streamthoughts.kafka.specs.resources.acl.AccessControlPolicy;
import org.jetbrains.annotations.NotNull;
Expand All @@ -31,14 +30,27 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class AclChanges extends AbstractChanges<AclChange, AccessControlPolicy, Void, AclOperation> {
public class AclChangeComputer implements ChangeComputer<AccessControlPolicy, AccessControlPolicy, AclChange, AclChangeComputer.AclChangeOptions> {

public static AclChanges computeChanges(@NotNull final Iterable<AccessControlPolicy> beforeAccessControlPolicies,
@NotNull final Iterable<AccessControlPolicy> afterAccessControlPolicies,
final boolean deleteOrphans) {

Map<String, List<AccessControlPolicy>> beforePoliciesGroupedByPrincipal = Named.groupByName(beforeAccessControlPolicies);
Map<String, List<AccessControlPolicy>> afterPoliciesGroupedByPrincipal = Named.groupByName(afterAccessControlPolicies);
public static class AclChangeOptions extends Options {
final boolean deleteOrphans;

public AclChangeOptions(boolean deleteOrphans) {
this.deleteOrphans = deleteOrphans;
}
}

/**
* {@inheritDoc}
*/
@Override
public List<AclChange> computeChanges(@NotNull final Iterable<AccessControlPolicy> actualStates,
@NotNull final Iterable<AccessControlPolicy> expectedStates,
@NotNull final AclChangeOptions options) {

Map<String, List<AccessControlPolicy>> beforePoliciesGroupedByPrincipal = Named.groupByName(actualStates);
Map<String, List<AccessControlPolicy>> afterPoliciesGroupedByPrincipal = Named.groupByName(expectedStates);

final Map<String, List<AclChange>> changes = new HashMap<>();
afterPoliciesGroupedByPrincipal.forEach((principal, afterPrincipalPolicies) -> {
Expand Down Expand Up @@ -70,7 +82,7 @@ public static AclChanges computeChanges(@NotNull final Iterable<AccessControlPol

});

if (deleteOrphans) {
if (options.deleteOrphans) {
beforePoliciesGroupedByPrincipal.keySet()
.stream()
.filter(Predicate.not(changes::containsKey))
Expand All @@ -82,17 +94,6 @@ public static AclChanges computeChanges(@NotNull final Iterable<AccessControlPol
.collect(Collectors.toList())
));
}
return new AclChanges(changes.values().stream().flatMap(Collection::stream).collect(Collectors.toList()));
}

/**
* Creates a new {@link AclChanges} instance.
*
* @param changes the list of {@link Change}.
*/
public AclChanges(@NotNull final Collection<AclChange> changes) {
super(changes
.stream()
.collect(Collectors.toMap(AclChange::getAccessControlPolicy, it -> it)));
return changes.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
}
}
}
11 changes: 9 additions & 2 deletions src/main/java/io/streamthoughts/kafka/specs/change/Change.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,25 @@
*/
package io.streamthoughts.kafka.specs.change;

import com.fasterxml.jackson.annotation.JsonIgnore;

/**
* Represents a change operation on a resource.
*
* @param <T> the change-type.
*/
public interface Change<T extends Change<T>> {
public interface Change<K> {

/**
* @return the operation associated to this change.
*/
OperationType getOperation();

/**
* @return the key-identifier of the resource to be changed.
*/
@JsonIgnore
K getKey();

/**
* Supported operations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,11 @@
*/
package io.streamthoughts.kafka.specs.change;

import io.streamthoughts.kafka.specs.OperationResult;
import io.streamthoughts.kafka.specs.operation.Operation;
import org.jetbrains.annotations.NotNull;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;

public interface Changes<T extends Change<T>, K, V, O extends Operation<T, K, V>> extends Iterable<T> {

/**
* @return all {@link Change} object.
*/
Collection<T> all();

T get(@NotNull final K resource);
public interface ChangeComputer<S, K, C extends Change<K>, O extends ChangeComputer.Options> {

/**
* Apply the given operation on changes.
*
* @param operation the operation to apply.
* @return the operation results.
*/
List<OperationResult<T>> apply(@NotNull final O operation);
class Options { }

@NotNull
@Override
default Iterator<T> iterator() {
return all().iterator();
}
List<C> computeChanges(Iterable<S> actualStates, Iterable<S> expectedStates, O options);
}
Loading

0 comments on commit c261a04

Please sign in to comment.