Skip to content

Commit

Permalink
force special topics to be using the name topic naming convention to …
Browse files Browse the repository at this point in the history
…avoid cases when the prefix formats break the flow (#507)

* force special topics to be using the name topic naming convention to avoid cases when the prefix formats break the flow

* clean up warnings
  • Loading branch information
purbon authored Jul 27, 2022
1 parent 378f061 commit 64465a8
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 106 deletions.
4 changes: 2 additions & 2 deletions src/main/java/com/purbon/kafka/topology/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public class Constants {
public static final String CCLOUD_SERVICE_ACCOUNT_TRANSLATION_ENABLED =
"ccloud.service_account.translation.enabled";

public static final String CCLOUD_SA_ACCOUNT_QUERY_PAGE_SIZE = "ccloud.service_account.query.page.size";
public static final String CCLOUD_SA_ACCOUNT_QUERY_PAGE_SIZE =
"ccloud.service_account.query.page.size";

public static final String TOPOLOGY_EXPERIMENTAL_ENABLED_CONFIG =
"topology.features.experimental";
Expand Down Expand Up @@ -177,5 +178,4 @@ public class Constants {

public static final String JULIE_HTTP_RETRY_TIMES = "julie.http.retry.times";
public static final String JULIE_HTTP_BACKOFF_TIME_MS = "julie.http.retry.backoff.time.ms";

}
11 changes: 6 additions & 5 deletions src/main/java/com/purbon/kafka/topology/TopicManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void updatePlan(ExecutionPlan plan, Map<String, Topology> topologies) thr
Set<Action> createTopicActions = new HashSet<>();
Set<Action> updateTopicConfigActions = new HashSet<>();
for (Topology topology : topologies.values()) {
var entryTopics = parseMapOfTopics(topology);
Map<String, Topic> entryTopics = parseMapOfTopics(topology);
entryTopics.forEach(
(topicName, topic) -> {
if (currentTopics.contains(topicName)) {
Expand Down Expand Up @@ -107,12 +107,13 @@ public void updatePlan(ExecutionPlan plan, Map<String, Topology> topologies) thr
}

private Map<String, Topic> parseMapOfTopics(Topology topology) {
var topics =
Stream<Topic> topics =
topology.getProjects().stream()
.flatMap(project -> project.getTopics().stream())
.filter(this::matchesPrefixList);

var specialTopics = topology.getSpecialTopics().stream().filter(this::matchesPrefixList);
Stream<Topic> specialTopics =
topology.getSpecialTopics().stream().filter(this::matchesPrefixList);

return Stream.concat(topics, specialTopics)
.collect(Collectors.toMap(Topic::toString, topic -> topic));
Expand Down Expand Up @@ -150,8 +151,8 @@ private Set<String> loadActualClusterStateIfAvailable(ExecutionPlan plan) throws
}

private void detectDivergencesInTheRemoteCluster(ExecutionPlan plan) throws IOException {
var remoteTopics = adminClient.listApplicationTopics();
var delta =
Set<String> remoteTopics = adminClient.listApplicationTopics();
List<String> delta =
plan.getTopics().stream()
.filter(localTopic -> !remoteTopics.contains(localTopic))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public Map<String, Collection<AclBinding>> fetchAclsList() {

public void createAcls(Collection<AclBinding> acls) {
try {
var aclsDump = acls.stream().map(AclBinding::toString).collect(Collectors.joining(", "));
String aclsDump = acls.stream().map(AclBinding::toString).collect(Collectors.joining(", "));
LOGGER.debug("createAcls: " + aclsDump);
adminClient.createAcls(acls).all().get();
} catch (InvalidConfigurationException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public Set<ServiceAccount> listServiceAccounts() throws IOException {

public Set<ServiceAccountV1> listServiceAccountsV1() throws IOException {
Set<ServiceAccountV1> accounts = new HashSet<>();
var response = getServiceAccountsV1(V1_IAM_SERVICE_ACCOUNTS_URL);
ServiceAccountV1Response response = getServiceAccountsV1(V1_IAM_SERVICE_ACCOUNTS_URL);
if (response.getError() == null) {
accounts = new HashSet<>(response.getUsers());
}
Expand All @@ -155,7 +155,8 @@ private ServiceAccountV1Response getServiceAccountsV1(String url) throws IOExcep
JSON.toObject(r.getResponseAsString(), ServiceAccountV1Response.class);
}

private ListServiceAccountResponse getListServiceAccounts(String url, int page_size) throws IOException {
private ListServiceAccountResponse getListServiceAccounts(String url, int page_size)
throws IOException {
Response r = ccloudApiHttpClient.doGet(String.format("%s?page_size=%d", url, page_size));
return (ListServiceAccountResponse)
JSON.toObject(r.getResponseAsString(), ListServiceAccountResponse.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class KConnectApiClient extends JulieHttpClient implements ArtefactClient {
Expand All @@ -27,7 +28,7 @@ public KConnectApiClient(String server, String label, Configuration config) thro
super(server, Optional.of(config));
this.label = label;
// configure basic authentication if available
var basicAuths = config.getServersBasicAuthMap();
Map<String, String> basicAuths = config.getServersBasicAuthMap();
if (basicAuths.containsKey(label)) {
String[] values = basicAuths.get(label).split(":");
setBasicAuth(new BasicAuth(values[0], values[1]));
Expand Down Expand Up @@ -60,7 +61,7 @@ public Collection<? extends Artefact> getClusterState() throws IOException {
public Map<String, Object> add(String name, String config) throws IOException {
String url = String.format("/connectors/%s/config", name);

var map = JSON.toMap(config);
Map<String, Object> map = JSON.toMap(config);
if (mayBeAConfigRecord(map)) {
var content = map.get("config");
if (!name.equalsIgnoreCase(map.get("name").toString())) {
Expand All @@ -74,7 +75,7 @@ public Map<String, Object> add(String name, String config) throws IOException {
}

private boolean mayBeAConfigRecord(Map<String, Object> map) {
var keySet = map.keySet();
Set<String> keySet = map.keySet();
return keySet.contains("config") && keySet.contains("name") && keySet.size() == 2;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ public JulieHttpClient(String server, Optional<Configuration> configOptional) th
this.server = server;
this.token = "";
this.httpClient = configureHttpOrHttpsClient(configOptional);
configOptional.ifPresentOrElse(e -> {
retryTimes = e.getHttpRetryTimes();
backoffTimesMs = e.getHttpBackoffTimeMs();
}, () -> {
retryTimes = 0;
backoffTimesMs = 0;
});


configOptional.ifPresentOrElse(
e -> {
retryTimes = e.getHttpRetryTimes();
backoffTimesMs = e.getHttpBackoffTimeMs();
},
() -> {
retryTimes = 0;
backoffTimesMs = 0;
});
}

private HttpRequest.Builder setupARequest(String url, long timeoutMs) {
Expand Down Expand Up @@ -267,7 +267,7 @@ private CompletableFuture<HttpResponse<String>> tryResend(
Throwable throwable) {

if (shouldRetry(response, throwable, count)) {
System.out.println("shouldRetry: count="+count);
System.out.println("shouldRetry: count=" + count);
return httpClient
.sendAsync(request, handler)
.handleAsync((r, t) -> tryResend(request, handler, count + 1, r, t))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public List<Topic> getSpecialTopics() {

@Override
public void addSpecialTopic(Topic topic) {
topic.setTopicNamePattern("name");
this.specialTopics.add(topic);
}

Expand Down
17 changes: 17 additions & 0 deletions src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,23 @@ public void shouldParseSpecialTopics() {
var topicNames = topics.stream().map(Topic::getName).collect(Collectors.toList());
assertThat(topicNames).contains("foo");
assertThat(topicNames).contains("bar");

Properties props = new Properties();
props.put(PROJECT_PREFIX_FORMAT_CONFIG, "{{project}}");
props.put(TOPIC_PREFIX_FORMAT_CONFIG, "{{project}}.{{topic}}");
HashMap<String, String> cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, "");

Configuration config = new Configuration(cliOps, props);

TopologySerdes parser = new TopologySerdes(config, FileType.YAML, new PlanMap());
topology = parser.deserialise(TestUtils.getResourceFile("/descriptor.yaml"));
topics = topology.getSpecialTopics();
assertThat(topics).hasSize(2);

var topicFullNames = topics.stream().map(Topic::toString).collect(Collectors.toList());
assertThat(topicFullNames).contains("foo");
assertThat(topicFullNames).contains("bar");
}

private List<Project> buildProjects() {
Expand Down
155 changes: 76 additions & 79 deletions src/test/java/com/purbon/kafka/topology/api/JulieHttpClientTest.java
Original file line number Diff line number Diff line change
@@ -1,93 +1,90 @@
package com.purbon.kafka.topology.api;

import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED;
import static com.purbon.kafka.topology.CommandLineInterface.BROKERS_OPTION;
import static com.purbon.kafka.topology.Constants.JULIE_HTTP_BACKOFF_TIME_MS;
import static com.purbon.kafka.topology.Constants.JULIE_HTTP_RETRY_TIMES;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import com.github.tomakehurst.wiremock.junit.WireMockRule;
import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.utils.PTHttpClient;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED;
import static com.purbon.kafka.topology.CommandLineInterface.BROKERS_OPTION;
import static com.purbon.kafka.topology.Constants.JULIE_HTTP_BACKOFF_TIME_MS;
import static com.purbon.kafka.topology.Constants.JULIE_HTTP_RETRY_TIMES;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class JulieHttpClientTest {

@Rule
public WireMockRule wireMockRule = new WireMockRule(8089);

private Map<String, String> cliOps;
private Properties props;

private PTHttpClient client;

@Before
public void before() throws IOException {
cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, "");
props = new Properties();

props.put(JULIE_HTTP_BACKOFF_TIME_MS, 0);
Configuration config = new Configuration(cliOps, props);

client = new PTHttpClient(wireMockRule.baseUrl(), Optional.of(config));
}

@Test
public void shouldResponseFastToNonRetryErrorCodes() throws IOException {
stubFor(get(urlEqualTo("/some/thing"))
.willReturn(aResponse()
.withHeader("Content-Type", "text/plain")
.withBody("Hello world!")));
stubFor(get(urlEqualTo("/some/thing/else"))
.willReturn(aResponse()
.withStatus(404)));
assertThat(client.doGet("/some/thing").getStatus()).isEqualTo(200);
assertThat(client.doGet("/some/thing/else").getStatus()).isEqualTo(404);
}

@Test
public void shouldRunTheRetryFlowForRetrievableErrorCodes() throws IOException {

cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, "");
props = new Properties();

props.put(JULIE_HTTP_BACKOFF_TIME_MS, 0);
props.put(JULIE_HTTP_RETRY_TIMES, 5);
Configuration config = new Configuration(cliOps, props);

client = new PTHttpClient(wireMockRule.baseUrl(), Optional.of(config));

stubFor(get(urlEqualTo("/some/thing")).inScenario("retrievable")
.whenScenarioStateIs(STARTED)
.willReturn(aResponse()
.withStatus(429))
.willSetStateTo("retry1")
);

stubFor(get(urlEqualTo("/some/thing")).inScenario("retrievable")
.whenScenarioStateIs("retry1")
.willReturn(aResponse()
.withStatus(503))
.willSetStateTo("retry2")
);

stubFor(get(urlEqualTo("/some/thing")).inScenario("retrievable")
.whenScenarioStateIs("retry2")
.willReturn(aResponse()
.withHeader("Content-type", "text/plain")
.withBody("Hello world!")));

assertThat(client.doGet("/some/thing").getStatus()).isEqualTo(200);
}
@Rule public WireMockRule wireMockRule = new WireMockRule(8089);

private Map<String, String> cliOps;
private Properties props;

private PTHttpClient client;

@Before
public void before() throws IOException {
cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, "");
props = new Properties();

props.put(JULIE_HTTP_BACKOFF_TIME_MS, 0);
Configuration config = new Configuration(cliOps, props);

client = new PTHttpClient(wireMockRule.baseUrl(), Optional.of(config));
}

@Test
public void shouldResponseFastToNonRetryErrorCodes() throws IOException {
stubFor(
get(urlEqualTo("/some/thing"))
.willReturn(
aResponse().withHeader("Content-Type", "text/plain").withBody("Hello world!")));
stubFor(get(urlEqualTo("/some/thing/else")).willReturn(aResponse().withStatus(404)));
assertThat(client.doGet("/some/thing").getStatus()).isEqualTo(200);
assertThat(client.doGet("/some/thing/else").getStatus()).isEqualTo(404);
}

@Test
public void shouldRunTheRetryFlowForRetrievableErrorCodes() throws IOException {

cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, "");
props = new Properties();

props.put(JULIE_HTTP_BACKOFF_TIME_MS, 0);
props.put(JULIE_HTTP_RETRY_TIMES, 5);
Configuration config = new Configuration(cliOps, props);

client = new PTHttpClient(wireMockRule.baseUrl(), Optional.of(config));

stubFor(
get(urlEqualTo("/some/thing"))
.inScenario("retrievable")
.whenScenarioStateIs(STARTED)
.willReturn(aResponse().withStatus(429))
.willSetStateTo("retry1"));

stubFor(
get(urlEqualTo("/some/thing"))
.inScenario("retrievable")
.whenScenarioStateIs("retry1")
.willReturn(aResponse().withStatus(503))
.willSetStateTo("retry2"));

stubFor(
get(urlEqualTo("/some/thing"))
.inScenario("retrievable")
.whenScenarioStateIs("retry2")
.willReturn(
aResponse().withHeader("Content-type", "text/plain").withBody("Hello world!")));

assertThat(client.doGet("/some/thing").getStatus()).isEqualTo(200);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.clients.JulieHttpClient;

import java.io.IOException;
import java.util.Optional;

public class PTHttpClient extends JulieHttpClient {

public PTHttpClient(String server, Optional<Configuration> config) throws IOException {
super(server, config);
}
public PTHttpClient(String server, Optional<Configuration> config) throws IOException {
super(server, config);
}
}

0 comments on commit 64465a8

Please sign in to comment.