From cbce5fe5cd41ba28d66c85c41ba60fe39596d4a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Greffier?= Date: Fri, 27 Sep 2024 22:28:39 +0200 Subject: [PATCH] Fix restart operation not restarting all connector tasks --- .../ns4kafka/service/ConnectorService.java | 22 +++---- .../client/connect/KafkaConnectClient.java | 7 +- .../connect/entities/ConnectorStateInfo.java | 3 +- .../service/ConnectorServiceTest.java | 65 ++++++++++++++++--- 4 files changed, 71 insertions(+), 26 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java b/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java index cb307201..575c50ef 100644 --- a/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java +++ b/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java @@ -256,20 +256,14 @@ public Flux listUnsynchronizedConnectors(Namespace namespace) { public Mono> restart(Namespace namespace, Connector connector) { return kafkaConnectClient.status(namespace.getMetadata().getCluster(), connector.getSpec().getConnectCluster(), connector.getMetadata().getName()) - .flatMap(status -> { - Flux> responses = Flux.fromIterable(status.tasks()) - .flatMap(task -> kafkaConnectClient.restart(namespace.getMetadata().getCluster(), - connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), task.getId())) - .map(response -> { - log.info("Success restarting connector [{}] on namespace [{}] connect [{}]", - connector.getMetadata().getName(), - namespace.getMetadata().getName(), - connector.getSpec().getConnectCluster()); - return HttpResponse.ok(); - }); - - return Mono.from(responses); - }); + .flatMap(status -> Flux.fromIterable(status.tasks()) + .flatMap(task -> kafkaConnectClient.restart(namespace.getMetadata().getCluster(), + connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), task.getId())) + .doOnNext(restart -> log.info("Success restarting connector [{}] on namespace [{}] connect [{}]", + connector.getMetadata().getName(), + namespace.getMetadata().getName(), + connector.getSpec().getConnectCluster())) + .then(Mono.just(HttpResponse.ok()))); } /** diff --git a/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java b/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java index dca32ed1..3f2a9818 100644 --- a/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java +++ b/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java @@ -37,13 +37,17 @@ @Singleton public class KafkaConnectClient { private static final String CONNECTORS = "/connectors/"; + @Inject ConnectClusterRepository connectClusterRepository; + @Inject @Client(id = "kafka-connect") private HttpClient httpClient; + @Inject private List managedClusterProperties; + @Inject private SecurityProperties securityProperties; @@ -222,7 +226,8 @@ public Mono> resume(String kafkaCluster, String connectCluste * @return The Kafka Connect configuration */ public KafkaConnectClient.KafkaConnectHttpConfig getKafkaConnectConfig(String kafkaCluster, String connectCluster) { - Optional config = managedClusterProperties.stream() + Optional config = managedClusterProperties + .stream() .filter(kafkaAsyncExecutorConfig -> kafkaAsyncExecutorConfig.getName().equals(kafkaCluster)) .findFirst(); diff --git a/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorStateInfo.java b/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorStateInfo.java index 32a4bbc6..d639aa45 100644 --- a/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorStateInfo.java +++ b/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorStateInfo.java @@ -67,7 +67,8 @@ public ConnectorState(@JsonProperty("state") String state, @JsonProperty("worker public static class TaskState extends AbstractState implements Comparable { private final int id; - public TaskState(@JsonProperty("id") int id, @JsonProperty("state") String state, + public TaskState(@JsonProperty("id") int id, + @JsonProperty("state") String state, @JsonProperty("worker_id") String worker, @JsonProperty("msg") String msg) { super(state, worker, msg); diff --git a/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java index db5dd59d..447f84c1 100644 --- a/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java @@ -1,7 +1,11 @@ package com.michelin.ns4kafka.service; +import static com.michelin.ns4kafka.service.client.connect.entities.ConnectorType.SOURCE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -21,6 +25,7 @@ import com.michelin.ns4kafka.service.client.connect.entities.ConfigKeyInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConfigValueInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorPluginInfo; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorStateInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorType; import com.michelin.ns4kafka.service.executor.ConnectorAsyncExecutor; import com.michelin.ns4kafka.validation.ConnectValidator; @@ -35,7 +40,6 @@ import java.util.Optional; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentMatchers; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -868,11 +872,7 @@ void shouldNotValidateRemotelyWhenErrorHappens() { List.of(new ConfigInfo(new ConfigKeyInfo(null, null, false, null, null, null, null, 0, null, null, null), new ConfigValueInfo(null, null, null, List.of("error_message"), true)))); - when(kafkaConnectClient.validate( - ArgumentMatchers.eq("local"), - ArgumentMatchers.eq("local-name"), - ArgumentMatchers.any(), - ArgumentMatchers.any())) + when(kafkaConnectClient.validate(eq("local"), eq("local-name"), any(), any())) .thenReturn(Mono.just(configInfos)); StepVerifier.create(connectorService.validateRemotely(ns, connector)) @@ -908,10 +908,10 @@ void shouldValidateRemotely() { ConfigInfos configInfos = new ConfigInfos("name", 1, List.of(), List.of()); when(kafkaConnectClient.validate( - ArgumentMatchers.eq("local"), - ArgumentMatchers.eq("local-name"), - ArgumentMatchers.any(), - ArgumentMatchers.any())) + eq("local"), + eq("local-name"), + any(), + any())) .thenReturn(Mono.just(configInfos)); StepVerifier.create(connectorService.validateRemotely(ns, connector)) @@ -1305,4 +1305,49 @@ void shouldNotDeleteConnectorWhenConnectClusterReturnsError() { verify(connectorRepository, never()).delete(connector); } + + @Test + void shouldRestartAllTasksOfConnector() { + Namespace namespace = Namespace.builder() + .metadata(Metadata.builder() + .name("namespace") + .cluster("local") + .build()) + .build(); + + Connector connector = Connector.builder() + .metadata(Metadata.builder() + .name("ns-connect1") + .build()) + .spec(Connector.ConnectorSpec.builder() + .connectCluster("local-name") + .build()) + .build(); + + when(kafkaConnectClient.status(namespace.getMetadata().getCluster(), connector.getSpec().getConnectCluster(), + connector.getMetadata().getName())) + .thenReturn(Mono.just(new ConnectorStateInfo( + "connector", + new ConnectorStateInfo.ConnectorState("RUNNING", "worker", "message"), + List.of( + new ConnectorStateInfo.TaskState(0, "RUNNING", "worker", "message"), + new ConnectorStateInfo.TaskState(1, "RUNNING", "worker", "message"), + new ConnectorStateInfo.TaskState(2, "RUNNING", "worker", "message") + ), + SOURCE))); + + when(kafkaConnectClient.restart(any(), any(), any(), anyInt())) + .thenReturn(Mono.just(HttpResponse.ok())); + + StepVerifier.create(connectorService.restart(namespace, connector)) + .consumeNextWith(response -> assertEquals(HttpStatus.OK, response.getStatus())) + .verifyComplete(); + + verify(kafkaConnectClient).restart(namespace.getMetadata().getCluster(), + connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), 0); + verify(kafkaConnectClient).restart(namespace.getMetadata().getCluster(), + connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), 1); + verify(kafkaConnectClient).restart(namespace.getMetadata().getCluster(), + connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), 2); + } }