Skip to content

Commit

Permalink
Add overridingClientConfig parameter to allow override and fallbacks ot
Browse files Browse the repository at this point in the history
the original configuration. This would be useful when passing parameters
that need to be adapted in some execution scenarios like deployments or
CI/CD.
  • Loading branch information
LeonardoBonacci authored and purbon committed Apr 16, 2021
1 parent 2f46ee0 commit 3d3b211
Show file tree
Hide file tree
Showing 15 changed files with 82 additions and 56 deletions.
21 changes: 11 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,17 @@ This is how you can run the tool directly as a docker image:
docker run purbon/kafka-topology-builder:latest julie-ops-cli.sh --help
Parsing failed cause of Missing required options: topology, brokers, clientConfig
usage: cli
--allowDelete Permits delete operations for topics and
configs.
--brokers <arg> The Apache Kafka server(s) to connect to.
--clientConfig <arg> The AdminClient configuration file.
--dryRun Print the execution plan without altering
anything.
--help Prints usage information.
--quiet Print minimum status update
--topology <arg> Topology config file.
--version Prints useful version information.
--allowDelete Permits delete operations for topics and
configs.
--brokers <arg> The Apache Kafka server(s) to connect to.
--clientConfig <arg> The client configuration file.
--overridingClientConfig <arg> The overriding client configuration file.
--dryRun Print the execution plan without altering
anything.
--help Prints usage information.
--quiet Print minimum status update
--topology <arg> Topology config file.
--version Prints useful version information.
```

If you install the tool as rpm, you will have available in your $PATH the _julie-ops-cli.sh_.
Expand Down
26 changes: 16 additions & 10 deletions docs/how-to-run-it.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,27 @@ If you are using the CLI tool, you an use the *--help* command to list the diffe
$> julie-ops-cli.sh --help
usage: cli
--allowDelete Permits delete operations for topics and
configs. (deprecated, to be removed)
--brokers <arg> The Apache Kafka server(s) to connect to.
--clientConfig <arg> The AdminClient configuration file.
--dryRun Print the execution plan without altering
anything.
--help Prints usage information.
--quiet Print minimum status update
--topology <arg> Topology config file.
--version Prints useful version information.
--allowDelete Permits delete operations for topics
and configs. (deprecated, to be
removed)
--brokers <arg> The Apache Kafka server(s) to connect
to.
--clientConfig <arg> The client configuration file.
--dryRun Print the execution plan without
altering anything.
--help Prints usage information.
--overridingClientConfig <arg> The overriding client configuration
file.
--plans <arg> File describing the predefined plans
--quiet Print minimum status update
--topology <arg> Topology config file.
--version Prints useful version information.
The most important ones are:

* *--brokers*: This is an optional parameter where the user can list the target Kafka cluster urls.
* *--clientConfig*: As other tools, Julie Ops needs it's own configuration. In this parameter users can pass a file listing all different personalisation options.
* *--overridingClientConfig*: The user can pass a second configuration. This configuration takes priority over the default. This mechanism can be used in a CI/CD pipeline, to separate credentials from the main configuration.
* *--dryRun*: When as a user, you don't want to run the tool, but instead see what might happen. This option is very useful to evaluate changes before applying them to the cluster.
* *--topology*: This is where you will pass the topology file. It can be either a single file, or a directory. If a directory is used, all files within are going to be compiled into a single macro topology.
* *--version*: If you wanna know the version you are running.
Expand Down
31 changes: 24 additions & 7 deletions src/main/java/com/purbon/kafka/topology/CommandLineInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ public class CommandLineInterface {
public static final String BROKERS_OPTION = "brokers";
public static final String BROKERS_DESC = "The Apache Kafka server(s) to connect to.";

public static final String ADMIN_CLIENT_CONFIG_OPTION = "clientConfig";
public static final String ADMIN_CLIENT_CONFIG_DESC = "The AdminClient configuration file.";
public static final String CLIENT_CONFIG_OPTION = "clientConfig";
public static final String CLIENT_CONFIG_DESC = "The client configuration file.";

public static final String OVERRIDING_CLIENT_CONFIG_OPTION = "overridingClientConfig";
public static final String OVERRIDING_CLIENT_CONFIG_DESC =
"The overriding AdminClient configuration file.";

public static final String ALLOW_DELETE_OPTION = "allowDelete";
public static final String ALLOW_DELETE_DESC =
Expand Down Expand Up @@ -70,14 +74,22 @@ private Options buildOptions() {
.required(false)
.build();

final Option adminClientConfigFileOption =
final Option clientConfigFileOption =
Option.builder()
.longOpt(ADMIN_CLIENT_CONFIG_OPTION)
.longOpt(CLIENT_CONFIG_OPTION)
.hasArg()
.desc(ADMIN_CLIENT_CONFIG_DESC)
.desc(CLIENT_CONFIG_DESC)
.required()
.build();

final Option overridingAdminClientConfigFileOption =
Option.builder()
.longOpt(OVERRIDING_CLIENT_CONFIG_OPTION)
.hasArg()
.desc(OVERRIDING_CLIENT_CONFIG_DESC)
.required(false)
.build();

final Option allowDeleteOption =
Option.builder()
.longOpt(ALLOW_DELETE_OPTION)
Expand Down Expand Up @@ -118,8 +130,9 @@ private Options buildOptions() {
options.addOption(topologyFileOption);
options.addOption(plansFileOption);
options.addOption(brokersListOption);
options.addOption(adminClientConfigFileOption);
options.addOption(clientConfigFileOption);

options.addOption(overridingAdminClientConfigFileOption);
options.addOption(allowDeleteOption);
options.addOption(dryRunOption);
options.addOption(quietOption);
Expand Down Expand Up @@ -161,7 +174,11 @@ private Map<String, String> parseConfig(CommandLine cmd) {
}
config.put(DRY_RUN_OPTION, String.valueOf(cmd.hasOption(DRY_RUN_OPTION)));
config.put(QUIET_OPTION, String.valueOf(cmd.hasOption(QUIET_OPTION)));
config.put(ADMIN_CLIENT_CONFIG_OPTION, cmd.getOptionValue(ADMIN_CLIENT_CONFIG_OPTION));
config.put(
OVERRIDING_CLIENT_CONFIG_OPTION, cmd.getOptionValue(OVERRIDING_CLIENT_CONFIG_OPTION));
config.put(CLIENT_CONFIG_OPTION, cmd.getOptionValue(CLIENT_CONFIG_OPTION));
config.put(
OVERRIDING_CLIENT_CONFIG_OPTION, cmd.getOptionValue(OVERRIDING_CLIENT_CONFIG_OPTION));
return config;
}

Expand Down
9 changes: 8 additions & 1 deletion src/main/java/com/purbon/kafka/topology/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.purbon.kafka.topology.serdes.TopologySerdes.FileType;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.File;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -26,7 +27,7 @@ public Configuration() {
}

public static Configuration build(Map<String, String> cliParams) {
return build(cliParams, cliParams.get(ADMIN_CLIENT_CONFIG_OPTION));
return build(cliParams, cliParams.get(CLIENT_CONFIG_OPTION));
}

public static Configuration build(Map<String, String> cliParams, String configFile) {
Expand All @@ -35,6 +36,12 @@ public static Configuration build(Map<String, String> cliParams, String configFi
}
ConfigFactory.invalidateCaches();
Config config = ConfigFactory.load();

String overridingConfigFile = cliParams.get(OVERRIDING_CLIENT_CONFIG_OPTION);
if (overridingConfigFile != null) {
Config overridingConfig = ConfigFactory.parseFile(new File(overridingConfigFile));
config = overridingConfig.withFallback(config);
}
return new Configuration(cliParams, config);
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/purbon/kafka/topology/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ public class Constants {
"com.purbon.kafka.topology.backend.RedisBackend";

public static final String S3_STATE_PROCESSOR_CLASS =
"com.purbon.kafka.topology.backend.S3Backend";
"com.purbon.kafka.topology.backend.S3Backend";

public static final String GCP_STATE_PROCESSOR_CLASS =
"com.purbon.kafka.topology.backend.GCPBackend";
"com.purbon.kafka.topology.backend.GCPBackend";

public static final String REDIS_HOST_CONFIG = "topology.builder.redis.host";
public static final String REDIS_PORT_CONFIG = "topology.builder.redis.port";
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/com/purbon/kafka/topology/JulieOps.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ static void verifyRequiredParameters(String topologyFile, Map<String, String> co
throw new IOException("Topology file does not exist");
}

String configFilePath = config.get(CommandLineInterface.ADMIN_CLIENT_CONFIG_OPTION);

String configFilePath = config.get(CommandLineInterface.CLIENT_CONFIG_OPTION);
if (!Files.exists(Paths.get(configFilePath))) {
throw new IOException("AdminClient config file does not exist");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static com.purbon.kafka.topology.BackendController.STATE_FILE_NAME;

import com.purbon.kafka.topology.BackendController;
import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.utils.JSON;
import java.io.IOException;
Expand Down
12 changes: 5 additions & 7 deletions src/test/java/com/purbon/kafka/topology/CLITest.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package com.purbon.kafka.topology;

import static com.purbon.kafka.topology.CommandLineInterface.ADMIN_CLIENT_CONFIG_OPTION;
import static com.purbon.kafka.topology.CommandLineInterface.ALLOW_DELETE_OPTION;
import static com.purbon.kafka.topology.CommandLineInterface.BROKERS_OPTION;
import static com.purbon.kafka.topology.CommandLineInterface.DRY_RUN_OPTION;
import static com.purbon.kafka.topology.CommandLineInterface.QUIET_OPTION;
import static com.purbon.kafka.topology.CommandLineInterface.*;
import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
Expand Down Expand Up @@ -47,7 +43,8 @@ public void testParamPassing() throws Exception {
config.put(ALLOW_DELETE_OPTION, "false");
config.put(DRY_RUN_OPTION, "false");
config.put(QUIET_OPTION, "false");
config.put(ADMIN_CLIENT_CONFIG_OPTION, "topology-builder-sasl-plain.properties");
config.put(CLIENT_CONFIG_OPTION, "topology-builder-sasl-plain.properties");
config.put(OVERRIDING_CLIENT_CONFIG_OPTION, null);
cli.run(args);

verify(cli, times(1)).processTopology(eq("descriptor.yaml"), eq("default"), eq(config));
Expand All @@ -70,7 +67,8 @@ public void testDryRun() throws Exception {
config.put(ALLOW_DELETE_OPTION, "false");
config.put(DRY_RUN_OPTION, "true");
config.put(QUIET_OPTION, "false");
config.put(ADMIN_CLIENT_CONFIG_OPTION, "topology-builder-sasl-plain.properties");
config.put(CLIENT_CONFIG_OPTION, "topology-builder-sasl-plain.properties");
config.put(OVERRIDING_CLIENT_CONFIG_OPTION, null);
cli.run(args);

verify(cli, times(1)).processTopology(eq("descriptor.yaml"), eq("default"), eq(config));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void testIncompatiblePrefixValidConfigFields() throws ConfigurationExcept
public void testKafkaInternalTopicDefaultPrefix() {
String clientConfigFile = TestUtils.getResourceFilename("/client-config.properties");

cliOps.put(ADMIN_CLIENT_CONFIG_OPTION, clientConfigFile);
cliOps.put(CLIENT_CONFIG_OPTION, clientConfigFile);

Configuration config = Configuration.build(cliOps);
assertThat(config.getKafkaInternalTopicPrefixes()).isEqualTo(Collections.singletonList("_"));
Expand All @@ -172,7 +172,7 @@ public void testKafkaInternalTopicExtendedPrefix() {
String clientConfigFile =
TestUtils.getResourceFilename("/config-internals-extended.properties");

cliOps.put(ADMIN_CLIENT_CONFIG_OPTION, clientConfigFile);
cliOps.put(CLIENT_CONFIG_OPTION, clientConfigFile);

Configuration config = Configuration.build(cliOps);
assertThat(config.getKafkaInternalTopicPrefixes())
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/com/purbon/kafka/topology/JulieOpsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class JulieOpsTest {
public void before() throws IOException {
cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, "");
cliOps.put(ADMIN_CLIENT_CONFIG_OPTION, "/fooBar");
cliOps.put(CLIENT_CONFIG_OPTION, "/fooBar");

props = new Properties();
props.put(CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG, "http://foo:8082");
Expand Down Expand Up @@ -112,7 +112,7 @@ public void verifyProblematicParametersTestOK() throws Exception {
String fileOrDirPath = TestUtils.getResourceFilename("/descriptor.yaml");
String clientConfigFile = TestUtils.getResourceFilename("/client-config.properties");

cliOps.put(ADMIN_CLIENT_CONFIG_OPTION, clientConfigFile);
cliOps.put(CLIENT_CONFIG_OPTION, clientConfigFile);

Configuration builderConfig = new Configuration(cliOps, props);
JulieOps builder =
Expand All @@ -136,7 +136,7 @@ public void builderRunTestAsFromCLI() throws Exception {
config.put(ALLOW_DELETE_OPTION, "false");
config.put(DRY_RUN_OPTION, "true");
config.put(QUIET_OPTION, "false");
config.put(ADMIN_CLIENT_CONFIG_OPTION, clientConfigFile);
config.put(CLIENT_CONFIG_OPTION, clientConfigFile);

JulieOps builder = JulieOps.build(fileOrDirPath, config);

Expand Down Expand Up @@ -164,7 +164,7 @@ public void builderRunTestAsFromCLIWithARedisBackend() throws Exception {
config.put(ALLOW_DELETE_OPTION, "false");
config.put(DRY_RUN_OPTION, "true");
config.put(QUIET_OPTION, "false");
config.put(ADMIN_CLIENT_CONFIG_OPTION, clientConfigFile);
config.put(CLIENT_CONFIG_OPTION, clientConfigFile);

JulieOps builder = JulieOps.build(fileOrDirPath, config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ public void testFreshGeneration() throws IOException {
Topology topology = new TopologyImpl();
topology.setContext("context");
Project project = new ProjectImpl("foo");
project.setConsumers(Arrays.asList(
new Consumer("consumer-principal"),
new Consumer("consumer-principal")));
project.setConsumers(
Arrays.asList(new Consumer("consumer-principal"), new Consumer("consumer-principal")));
project.setProducers(Collections.singletonList(new Producer("producer-principal")));
topology.addProject(project);

Expand Down
4 changes: 2 additions & 2 deletions src/test/java/com/purbon/kafka/topology/TopicTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void buildTopicNameFormatWithCustomSeparator() {

Map<String, String> cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, "");
cliOps.put(ADMIN_CLIENT_CONFIG_OPTION, "/fooBar");
cliOps.put(CLIENT_CONFIG_OPTION, "/fooBar");

Properties props = new Properties();
props.put(TOPIC_PREFIX_SEPARATOR_CONFIG, "_");
Expand All @@ -99,7 +99,7 @@ public void buildTopicNameFormatWithCustomPattern() {

Map<String, String> cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, "");
cliOps.put(ADMIN_CLIENT_CONFIG_OPTION, "/fooBar");
cliOps.put(CLIENT_CONFIG_OPTION, "/fooBar");

Properties props = new Properties();
props.put(TOPIC_PREFIX_FORMAT_CONFIG, "{{otherf}}.{{context}}.{{project}}.{{topic}}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ public void testWithRBACDescriptor() {
public void testTopicNameWithCustomSeparator() {
Map<String, String> cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, "");
cliOps.put(ADMIN_CLIENT_CONFIG_OPTION, "/fooBar");
cliOps.put(CLIENT_CONFIG_OPTION, "/fooBar");

Properties props = new Properties();
props.put(TOPIC_PREFIX_SEPARATOR_CONFIG, "_");
Expand All @@ -361,7 +361,7 @@ public void testTopicNameWithCustomSeparator() {
public void testTopicNameWithCustomPattern() {
Map<String, String> cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, "");
cliOps.put(ADMIN_CLIENT_CONFIG_OPTION, "/fooBar");
cliOps.put(CLIENT_CONFIG_OPTION, "/fooBar");

Properties props = new Properties();
props.put(TOPIC_PREFIX_FORMAT_CONFIG, "{{source}}.{{context}}.{{project}}.{{topic}}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testUsingUnknwownClassName() {
private Configuration createTopologyBuilderConfig(String... validations) {
Map<String, String> cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, "");
cliOps.put(ADMIN_CLIENT_CONFIG_OPTION, "/fooBar");
cliOps.put(CLIENT_CONFIG_OPTION, "/fooBar");

Properties props = new Properties();
props.put(TOPOLOGY_VALIDATIONS_CONFIG, Arrays.asList(validations));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void testSetupKafkaTopologyBuilderWithWrongCredentialsHC() throws Excepti
config.put(ALLOW_DELETE_OPTION, "false");
config.put(DRY_RUN_OPTION, "false");
config.put(QUIET_OPTION, "true");
config.put(ADMIN_CLIENT_CONFIG_OPTION, clientConfigFile);
config.put(CLIENT_CONFIG_OPTION, clientConfigFile);

JulieOps.build(fileOrDirPath, config);
}
Expand All @@ -55,7 +55,7 @@ public void testSetupKafkaTopologyBuilderWithGoodCredentialsHC() throws Exceptio
config.put(ALLOW_DELETE_OPTION, "false");
config.put(DRY_RUN_OPTION, "false");
config.put(QUIET_OPTION, "true");
config.put(ADMIN_CLIENT_CONFIG_OPTION, clientConfigFile);
config.put(CLIENT_CONFIG_OPTION, clientConfigFile);

JulieOps.build(fileOrDirPath, config);
}
Expand Down

0 comments on commit 3d3b211

Please sign in to comment.