Skip to content

Commit

Permalink
add client host enviroment variable for all services
Browse files Browse the repository at this point in the history
  • Loading branch information
jetoile committed Oct 31, 2019
1 parent 2d324c3 commit d38c2ac
Show file tree
Hide file tree
Showing 65 changed files with 782 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,30 @@ private void loadConfig() {
tmpDirPath = getTmpDirPath(configuration, BookkeeperConfig.BOOKKEEPER_TEMP_DIR_KEY);

zookeeperPort = configuration.getInt(ZookeeperConfig.ZOOKEEPER_PORT_KEY);
zookeeperHost = configuration.getString(ZookeeperConfig.ZOOKEEPER_HOST_KEY);
zookeeperHost = configuration.getString(ZookeeperConfig.ZOOKEEPER_HOST_CLIENT_KEY);
}

@Override
public void loadConfig(Map<String, String> configs) {
if (StringUtils.isNotEmpty(configs.get(BookkeeperConfig.BOOKKEEPER_PORT_KEY))) {
port = Integer.parseInt(configs.get(BookkeeperConfig.BOOKKEEPER_PORT_KEY));
}
if (StringUtils.isNotEmpty(configs.get(BookkeeperConfig.BOOKKEEPER_HTTP_PORT_KEY))) {
httpPort = Integer.parseInt(configs.get(BookkeeperConfig.BOOKKEEPER_HTTP_PORT_KEY));
}
if (StringUtils.isNotEmpty(configs.get(BookkeeperConfig.BOOKKEEPER_IP_KEY))) {
ip = configs.get(BookkeeperConfig.BOOKKEEPER_IP_KEY);
}
if (StringUtils.isNotEmpty(configs.get(BookkeeperConfig.BOOKKEEPER_TEMP_DIR_KEY))) {
tmpDirPath = getTmpDirPath(configs, BookkeeperConfig.BOOKKEEPER_TEMP_DIR_KEY);
}

if (StringUtils.isNotEmpty(configs.get(ZookeeperConfig.ZOOKEEPER_PORT_KEY))) {
zookeeperHost = configs.get(ZookeeperConfig.ZOOKEEPER_PORT_KEY);
}
if (StringUtils.isNotEmpty(configs.get(ZookeeperConfig.ZOOKEEPER_PORT_KEY))) {
zookeeperPort = Integer.valueOf(configs.get(ZookeeperConfig.ZOOKEEPER_PORT_KEY));
}
}

private void build() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ public class BookkeeperConfig {
public static final String BOOKKEEPER_HTTP_PORT_KEY = "bookkeeper.http.port";
public static final String BOOKKEEPER_TEMP_DIR_KEY = "bookkeeper.temp.dir";

public static final String BOOKKEEPER_IP_CLIENT_KEY = "bookkeeper.client.ip";

private BookkeeperConfig() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ tmp.dir.path=/tmp
zookeeper.temp.dir=/embedded_zk
zookeeper.host=127.0.0.1
zookeeper.port=22010
zookeeper.client.host=127.0.0.1

# BookKeeper
bookkeeper.ip=127.0.0.1
bookkeeper.port=31810
bookkeeper.http.port=31900
bookkeeper.temp.dir=/bookeeper
bookkeeper.temp.dir=/bookeeper

bookkeeper.client.ip=127.0.0.1
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static void tearDown() throws BootstrapException {
public void bookkeeperShouldStart() throws NotFoundServiceException {

Client client = ClientBuilder.newClient();
String uri = "http://localhost:" + configuration.getInt(BookkeeperConfig.BOOKKEEPER_HTTP_PORT_KEY);
String uri = "http://" + configuration.getString(BookkeeperConfig.BOOKKEEPER_IP_CLIENT_KEY) + ":" + configuration.getInt(BookkeeperConfig.BOOKKEEPER_HTTP_PORT_KEY);

Response hearbeatResponse = client.target(uri + "/heartbeat").request().get();
assertThat(hearbeatResponse.getStatus()).isEqualTo(200);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ public class CassandraBootstrap implements Bootstrap {
private Configuration configuration;
private CassandraShutDownHook shutdownHook;
private int port;
private String ip;
private String listenAddressIp;
private String tmpDirPath;
private String rpcAddressIp;
private String broadcastAddressIp;
private String broadcastRpcAddressIp;

public CassandraBootstrap() {
try {
Expand All @@ -67,13 +70,19 @@ public ComponentMetadata getMetadata() {

@Override
public String getProperties() {
return "\n \t\t\t ip:" + ip +
return "\n \t\t\t listenAddressIp:" + listenAddressIp +
"\n \t\t\t rpcAddressIp:" + rpcAddressIp +
"\n \t\t\t broadcastAddressIp:" + broadcastAddressIp +
"\n \t\t\t broadcastRpcAddressIp:" + broadcastRpcAddressIp +
"\n \t\t\t port:" + port;
}

private void loadConfig() {
port = configuration.getInt(CassandraConfig.CASSANDRA_PORT_KEY);
ip = configuration.getString(CassandraConfig.CASSANDRA_IP_KEY);
listenAddressIp = configuration.getString(CassandraConfig.CASSANDRA_LISTEN_ADDRESS_IP_KEY);
rpcAddressIp = configuration.getString(CassandraConfig.CASSANDRA_RPC_ADDRESS_IP_KEY);
broadcastAddressIp = configuration.getString(CassandraConfig.CASSANDRA_BROADCAST_ADDRESS_IP_KEY);
broadcastRpcAddressIp = configuration.getString(CassandraConfig.CASSANDRA_BROADCAST_RPC_ADDRESS_IP_KEY);
tmpDirPath = getTmpDirPath(configuration, CassandraConfig.CASSANDRA_TEMP_DIR_KEY);
}

Expand All @@ -82,8 +91,17 @@ public void loadConfig(Map<String, String> configs) {
if (StringUtils.isNotEmpty(configs.get(CassandraConfig.CASSANDRA_PORT_KEY))) {
port = Integer.parseInt(configs.get(CassandraConfig.CASSANDRA_PORT_KEY));
}
if (StringUtils.isNotEmpty(configs.get(CassandraConfig.CASSANDRA_IP_KEY))) {
ip = configs.get(CassandraConfig.CASSANDRA_IP_KEY);
if (StringUtils.isNotEmpty(configs.get(CassandraConfig.CASSANDRA_LISTEN_ADDRESS_IP_KEY))) {
listenAddressIp = configs.get(CassandraConfig.CASSANDRA_LISTEN_ADDRESS_IP_KEY);
}
if (StringUtils.isNotEmpty(configs.get(CassandraConfig.CASSANDRA_RPC_ADDRESS_IP_KEY))) {
rpcAddressIp = configs.get(CassandraConfig.CASSANDRA_RPC_ADDRESS_IP_KEY);
}
if (StringUtils.isNotEmpty(configs.get(CassandraConfig.CASSANDRA_BROADCAST_ADDRESS_IP_KEY))) {
broadcastAddressIp = configs.get(CassandraConfig.CASSANDRA_BROADCAST_ADDRESS_IP_KEY);
}
if (StringUtils.isNotEmpty(configs.get(CassandraConfig.CASSANDRA_BROADCAST_RPC_ADDRESS_IP_KEY))) {
broadcastRpcAddressIp = configs.get(CassandraConfig.CASSANDRA_BROADCAST_RPC_ADDRESS_IP_KEY);
}
if (StringUtils.isNotEmpty(configs.get(CassandraConfig.CASSANDRA_TEMP_DIR_KEY))) {
tmpDirPath = getTmpDirPath(configs, CassandraConfig.CASSANDRA_TEMP_DIR_KEY);
Expand All @@ -100,10 +118,10 @@ private void build() throws IOException {
shutdownHook = new CassandraShutDownHook();

session = CassandraEmbeddedServerBuilder.builder()
.withListenAddress(ip)
.withRpcAddress(ip)
.withBroadcastAddress(ip)
.withBroadcastRpcAddress(ip)
.withListenAddress(listenAddressIp)
.withRpcAddress(rpcAddressIp)
.withBroadcastAddress(broadcastAddressIp)
.withBroadcastRpcAddress(broadcastRpcAddressIp)
.withCQLPort(port)
.withDataFolder(tmpDirPath + "/data")
.withCommitLogFolder(tmpDirPath + "/commitlog")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@
public class CassandraConfig {

// Cassandra
public static final String CASSANDRA_IP_KEY = "cassandra.ip";
public static final String CASSANDRA_LISTEN_ADDRESS_IP_KEY = "cassandra.listen.address.ip";
public static final String CASSANDRA_RPC_ADDRESS_IP_KEY = "cassandra.rpc.address.ip";
public static final String CASSANDRA_BROADCAST_ADDRESS_IP_KEY = "cassandra.broadcast.address.ip";
public static final String CASSANDRA_BROADCAST_RPC_ADDRESS_IP_KEY = "cassandra.broadcast.rpc.address.ip";
public static final String CASSANDRA_PORT_KEY = "cassandra.port";
public static final String CASSANDRA_TEMP_DIR_KEY = "cassandra.temp.dir";

public static final String CASSANDRA_LISTEN_ADDRESS_IP_CLIENT_KEY = "cassandra.listen.address.client.ip";

private CassandraConfig() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,15 @@ mongo.database.name=test_database
mongo.collection.name=test_collection

# Cassandra
cassandra.ip=127.0.0.1
cassandra.listen.address.ip=127.0.0.1
cassandra.rpc.address.ip=0.0.0.0
cassandra.broadcast.address.ip=127.0.0.1
cassandra.broadcast.rpc.address.ip=127.0.0.1
cassandra.port=13433
cassandra.temp.dir=/embedded_cassandra

cassandra.listen.address.client.ip=127.0.0.1

# ElasticSearch
elasticsearch.version=5.4.3
elasticsearch.ip=127.0.0.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void cassandraShouldStart() throws NotFoundServiceException {
@Test
public void cassandraShouldStartWithRealDriver() throws NotFoundServiceException {
Cluster cluster = Cluster.builder()
.addContactPoints(configuration.getString(CassandraConfig.CASSANDRA_IP_KEY)).withPort(configuration.getInt(CassandraConfig.CASSANDRA_PORT_KEY)).build();
.addContactPoints(configuration.getString(CassandraConfig.CASSANDRA_LISTEN_ADDRESS_IP_CLIENT_KEY)).withPort(configuration.getInt(CassandraConfig.CASSANDRA_PORT_KEY)).build();
Session session = cluster.connect();

session.execute("insert into test.test(user, value) values('user2', 'value2')");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,15 @@ mongo.database.name=test_database
mongo.collection.name=test_collection

# Cassandra
cassandra.ip=127.0.0.1
cassandra.listen.address.ip=127.0.0.1
cassandra.rpc.address.ip=0.0.0.0
cassandra.broadcast.address.ip=127.0.0.1
cassandra.broadcast.rpc.address.ip=127.0.0.1
cassandra.port=13433
cassandra.temp.dir=/embedded_cassandra

cassandra.listen.address.client.ip=127.0.0.1

# ElasticSearch
elasticsearch.version=5.4.3
elasticsearch.ip=127.0.0.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,10 @@ public class ConfluentConfig {
public static final String CONFLUENT_KSQL_HOST_KEY = "confluent.ksql.host";
public static final String CONFLUENT_KSQL_PORT_KEY = "confluent.ksql.port";

public static final String CONFLUENT_SCHEMAREGISTRY_HOST_CLIENT_KEY = "confluent.schemaregistry.client.host";
public static final String CONFLUENT_KAFKA_HOST_CLIENT_KEY = "confluent.kafka.client.host";
public static final String CONFLUENT_KSQL_HOST_CLIENT_KEY = "confluent.ksql.client.host";
public static final String CONFLUENT_REST_HOST_CLIENT_KEY = "confluent.rest.client.host";

private ConfluentConfig() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,18 @@ public String getProperties() {
}

public void loadConfig() {
restConfig.put("schema.registry.url", configuration.getString(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_PORT_KEY));
restConfig.put("zookeeper.connect", configuration.getString(ZookeeperConfig.ZOOKEEPER_HOST_KEY) + ":" + configuration.getString(ZookeeperConfig.ZOOKEEPER_PORT_KEY));
restConfig.put("schema.registry.url", configuration.getString(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_HOST_CLIENT_KEY) + ":" + configuration.getString(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_PORT_KEY));
restConfig.put("zookeeper.connect", configuration.getString(ZookeeperConfig.ZOOKEEPER_HOST_CLIENT_KEY) + ":" + configuration.getString(ZookeeperConfig.ZOOKEEPER_PORT_KEY));
restConfig.put("listeners", "http://" + configuration.getString(ConfluentConfig.CONFLUENT_REST_HOST_KEY) + ":" + configuration.getString(ConfluentConfig.CONFLUENT_REST_PORT_KEY));
}

@Override
public void loadConfig(Map<String, String> configs) {
if (StringUtils.isNotEmpty(configs.get(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_HOST_KEY)) && StringUtils.isNotEmpty(configs.get(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_PORT_KEY))) {
restConfig.put("schema.registry.url", configs.get(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configs.get(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_PORT_KEY));
if (StringUtils.isNotEmpty(configs.get(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_HOST_CLIENT_KEY)) && StringUtils.isNotEmpty(configs.get(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_PORT_KEY))) {
restConfig.put("schema.registry.url", configs.get(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_HOST_CLIENT_KEY) + ":" + configs.get(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_PORT_KEY));
}
if (StringUtils.isNotEmpty(configs.get(ZookeeperConfig.ZOOKEEPER_HOST_KEY)) && StringUtils.isNotEmpty(configs.get(ZookeeperConfig.ZOOKEEPER_PORT_KEY))) {
restConfig.put("zookeeper.connect", configs.get(ZookeeperConfig.ZOOKEEPER_HOST_KEY) + ":" + configs.get(ZookeeperConfig.ZOOKEEPER_PORT_KEY));
if (StringUtils.isNotEmpty(configs.get(ZookeeperConfig.ZOOKEEPER_HOST_CLIENT_KEY)) && StringUtils.isNotEmpty(configs.get(ZookeeperConfig.ZOOKEEPER_PORT_KEY))) {
restConfig.put("zookeeper.connect", configs.get(ZookeeperConfig.ZOOKEEPER_HOST_CLIENT_KEY) + ":" + configs.get(ZookeeperConfig.ZOOKEEPER_PORT_KEY));
}
if (StringUtils.isNotEmpty(configs.get(ConfluentConfig.CONFLUENT_REST_HOST_KEY)) && StringUtils.isNotEmpty(configs.get(ConfluentConfig.CONFLUENT_REST_PORT_KEY))) {
restConfig.put("listeners", "http://" + configs.get(ConfluentConfig.CONFLUENT_REST_HOST_KEY) + ":" + configs.get(ConfluentConfig.CONFLUENT_REST_PORT_KEY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ zookeeper.temp.dir=/embedded_zk
zookeeper.host=127.0.0.1
zookeeper.port=22010

zookeeper.client.host=127.0.0.1

# Hive
hive.scratch.dir=/hive_scratch_dir
hive.warehouse.dir=/tmp/warehouse_dir
Expand Down Expand Up @@ -192,4 +194,9 @@ confluent.rest.host=127.0.0.1
confluent.rest.port=8082

confluent.ksql.host=127.0.0.1
confluent.ksql.port=8083
confluent.ksql.port=8083

confluent.schemaregistry.client.host=127.0.0.1
confluent.kafka.client.host=127.0.0.1
confluent.rest.client.host=127.0.0.1
confluent.ksql.client.host=127.0.0.1
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,9 @@ public class ConfluentConfig {
public static final String CONFLUENT_KSQL_HOST_KEY = "confluent.ksql.host";
public static final String CONFLUENT_KSQL_PORT_KEY = "confluent.ksql.port";

public static final String CONFLUENT_SCHEMAREGISTRY_HOST_CLIENT_KEY = "confluent.schemaregistry.client.host";
public static final String CONFLUENT_KAFKA_HOST_CLIENT_KEY = "confluent.kafka.client.host";
public static final String CONFLUENT_KSQL_HOST_CLIENT_KEY = "confluent.ksql.client.host";

private ConfluentConfig() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package fr.jetoile.hadoopunit.component;

import fr.jetoile.hadoopunit.ComponentMetadata;
import fr.jetoile.hadoopunit.HadoopUnitConfig;
import fr.jetoile.hadoopunit.HadoopUtils;
import fr.jetoile.hadoopunit.exception.BootstrapException;
import kafka.server.KafkaConfig;
Expand Down Expand Up @@ -78,10 +77,10 @@ public String getProperties() {
}

public void loadConfig() {
kafkaConfig.put("zookeeper.connect", configuration.getString(ZookeeperConfig.ZOOKEEPER_HOST_KEY) + ":" + configuration.getString(ZookeeperConfig.ZOOKEEPER_PORT_KEY));
kafkaConfig.put("zookeeper.connect", configuration.getString(ZookeeperConfig.ZOOKEEPER_HOST_CLIENT_KEY) + ":" + configuration.getString(ZookeeperConfig.ZOOKEEPER_PORT_KEY));
kafkaConfig.put("log.dirs", getTmpDirPath(configuration, ConfluentConfig.CONFLUENT_KAFKA_LOG_DIR_KEY));
kafkaConfig.put("broker.id", configuration.getString(ConfluentConfig.CONFLUENT_KAFKA_BROKER_ID_KEY));
// kafkaConfig.put("advertised.listeners", "PLAINTEXT://localhost:22222");
kafkaConfig.put("advertised.listeners", "PLAINTEXT://" + ":" + configuration.getString(ConfluentConfig.CONFLUENT_KAFKA_PORT_KEY));
kafkaConfig.put("advertised.host.name", configuration.getString(ConfluentConfig.CONFLUENT_KAFKA_HOST_KEY));
kafkaConfig.put("port", configuration.getString(ConfluentConfig.CONFLUENT_KAFKA_PORT_KEY));
kafkaConfig.put("confluent.support.metrics.enable", "false");
Expand All @@ -91,8 +90,8 @@ public void loadConfig() {

@Override
public void loadConfig(Map<String, String> configs) {
if (StringUtils.isNotEmpty(configs.get(ZookeeperConfig.ZOOKEEPER_HOST_KEY)) && StringUtils.isNotEmpty(configs.get(ZookeeperConfig.ZOOKEEPER_PORT_KEY))) {
kafkaConfig.put("zookeeper.connect", configs.get(ZookeeperConfig.ZOOKEEPER_HOST_KEY) + ":" + configs.get(ZookeeperConfig.ZOOKEEPER_PORT_KEY));
if (StringUtils.isNotEmpty(configs.get(ZookeeperConfig.ZOOKEEPER_HOST_CLIENT_KEY)) && StringUtils.isNotEmpty(configs.get(ZookeeperConfig.ZOOKEEPER_PORT_KEY))) {
kafkaConfig.put("zookeeper.connect", configs.get(ZookeeperConfig.ZOOKEEPER_HOST_CLIENT_KEY) + ":" + configs.get(ZookeeperConfig.ZOOKEEPER_PORT_KEY));
}
if (StringUtils.isNotEmpty(configs.get(ConfluentConfig.CONFLUENT_KAFKA_LOG_DIR_KEY))) {
kafkaConfig.put("log.dirs", getTmpDirPath(configs, ConfluentConfig.CONFLUENT_KAFKA_LOG_DIR_KEY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public String getProperties() {
public void loadConfig() {
ksqlConfig.put(KsqlRestConfig.LISTENERS_CONFIG, "http://" + configuration.getString(ConfluentConfig.CONFLUENT_KSQL_HOST_KEY) + ":" + configuration.getString(ConfluentConfig.CONFLUENT_KSQL_PORT_KEY));
// props.put(KsqlRestConfig.PORT_CONFIG, String.valueOf(portNumber));
ksqlConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getString(ConfluentConfig.CONFLUENT_KAFKA_HOST_KEY) + ":" + configuration.getString(ConfluentConfig.CONFLUENT_KAFKA_PORT_KEY));
ksqlConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getString(ConfluentConfig.CONFLUENT_KAFKA_HOST_CLIENT_KEY) + ":" + configuration.getString(ConfluentConfig.CONFLUENT_KAFKA_PORT_KEY));
ksqlConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "ksql_config_test");
// ksqlConfig.put(KsqlRestConfig.COMMAND_TOPIC_SUFFIX, "commands");
}
Expand All @@ -91,7 +91,7 @@ public void loadConfig(Map<String, String> configs) {
ksqlConfig.put(KsqlRestConfig.LISTENERS_CONFIG, "http://" + configs.get(ConfluentConfig.CONFLUENT_KSQL_HOST_KEY) + ":" + configs.get(ConfluentConfig.CONFLUENT_KSQL_PORT_KEY));
}
if (StringUtils.isNotEmpty(configs.get(ConfluentConfig.CONFLUENT_KAFKA_HOST_KEY)) && StringUtils.isNotEmpty(configs.get(ConfluentConfig.CONFLUENT_KAFKA_PORT_KEY))) {
ksqlConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, configs.get(ConfluentConfig.CONFLUENT_KAFKA_HOST_KEY) + ":" + configs.get(ConfluentConfig.CONFLUENT_KAFKA_PORT_KEY));
ksqlConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, configs.get(ConfluentConfig.CONFLUENT_KAFKA_HOST_CLIENT_KEY) + ":" + configs.get(ConfluentConfig.CONFLUENT_KAFKA_PORT_KEY));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void loadConfig() {
schemaRegistryConfig.put("debug", configuration.getString(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_DEBUG_KEY));
schemaRegistryConfig.put("listeners", "http://" + configuration.getString(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_PORT_KEY));
schemaRegistryConfig.put("kafkastore.topic", configuration.getString(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_TOPIC_KEY));
schemaRegistryConfig.put("kafkastore.connection.url", configuration.getString(ZookeeperConfig.ZOOKEEPER_HOST_KEY) + ":" + configuration.getString(ZookeeperConfig.ZOOKEEPER_PORT_KEY));
schemaRegistryConfig.put("kafkastore.connection.url", configuration.getString(ZookeeperConfig.ZOOKEEPER_HOST_CLIENT_KEY) + ":" + configuration.getString(ZookeeperConfig.ZOOKEEPER_PORT_KEY));
}

@Override
Expand All @@ -90,8 +90,8 @@ public void loadConfig(Map<String, String> configs) {
if (StringUtils.isNotEmpty(configs.get(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_TOPIC_KEY))) {
schemaRegistryConfig.put("kafkastore.topic", configs.get(ConfluentConfig.CONFLUENT_SCHEMAREGISTRY_TOPIC_KEY));
}
if (StringUtils.isNotEmpty(configs.get(ZookeeperConfig.ZOOKEEPER_HOST_KEY)) && StringUtils.isNotEmpty(configs.get(ZookeeperConfig.ZOOKEEPER_PORT_KEY))) {
schemaRegistryConfig.put("kafkastore.connection.url", configs.get(ZookeeperConfig.ZOOKEEPER_HOST_KEY) + ":" + configs.get(ZookeeperConfig.ZOOKEEPER_PORT_KEY));
if (StringUtils.isNotEmpty(configs.get(ZookeeperConfig.ZOOKEEPER_HOST_CLIENT_KEY)) && StringUtils.isNotEmpty(configs.get(ZookeeperConfig.ZOOKEEPER_PORT_KEY))) {
schemaRegistryConfig.put("kafkastore.connection.url", configs.get(ZookeeperConfig.ZOOKEEPER_HOST_CLIENT_KEY) + ":" + configs.get(ZookeeperConfig.ZOOKEEPER_PORT_KEY));
}
}

Expand Down
Loading

0 comments on commit d38c2ac

Please sign in to comment.