Skip to content
This repository has been archived by the owner on Apr 23, 2020. It is now read-only.

Add support for consul shared configuration #305

Merged
merged 2 commits into from
Jul 29, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions exhibitor-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@
<artifactId>jersey-json</artifactId>
</dependency>

<dependency>
<groupId>com.orbitz.consul</groupId>
<artifactId>consul-client</artifactId>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
Expand All @@ -111,5 +116,11 @@
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.pszymczyk.consul</groupId>
<artifactId>embedded-consul</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package com.netflix.exhibitor.core.config.consul;

import com.netflix.exhibitor.core.config.*;
import com.orbitz.consul.Consul;
import com.orbitz.consul.KeyValueClient;

import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class ConsulConfigProvider implements ConfigProvider {
private static final Long DEFAULT_LOCK_TIMEOUT_MS = 5L * 60L * 1000L; // 5 minutes;
private final Consul consul;
private final Properties defaults;
private final String basePath;
private final String versionPath;
private final String propertiesPath;
private final ConsulKvLock lock;
private final String pseudoLockPath;
private final Long lockTimeoutMs;

/**
* @param consul consul client instance for connecting to consul cluster
* @param prefix consul key-value path under which configs are stored
* @param defaults default properties
*/
public ConsulConfigProvider(Consul consul, String prefix, Properties defaults) {
this(consul, prefix, defaults, DEFAULT_LOCK_TIMEOUT_MS);
}

/**
* @param consul consul client instance for connecting to consul cluster
* @param prefix consul key-value path under which configs are stored
* @param defaults default properties
* @param lockTimeoutMs timeout, in milliseconds, for lock acquisition
*/
public ConsulConfigProvider(Consul consul, String prefix, Properties defaults, Long lockTimeoutMs) {
this.consul = consul;
this.defaults = defaults;
this.lockTimeoutMs = lockTimeoutMs;

this.basePath = prefix.endsWith("/") ? prefix : prefix + "/";
this.versionPath = basePath + "version";
this.propertiesPath = basePath + "properties";
this.pseudoLockPath = basePath + "pseudo-locks";

this.lock = new ConsulKvLock(consul, basePath + "lock", "exhibitor");
}

@Override
public void start() throws Exception {
// NOP
}

@Override
public void close() throws IOException {
// NOP
}

@Override
public LoadedInstanceConfig loadConfig() throws Exception {
ConsulVersionedProperties properties;

lock.acquireLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
try {
properties = loadProperties();
}
finally {
lock.releaseLock();
}

PropertyBasedInstanceConfig config = new PropertyBasedInstanceConfig(
properties.getProperties(), defaults);
return new LoadedInstanceConfig(config, properties.getVersion());
}

@Override
public LoadedInstanceConfig storeConfig(ConfigCollection config, long compareVersion) throws Exception {
Long currentVersion = loadProperties().getVersion();
if (currentVersion != compareVersion) {
return null;
}

KeyValueClient kv = consul.keyValueClient();
PropertyBasedInstanceConfig instanceConfig = new PropertyBasedInstanceConfig(config);
StringWriter writer = new StringWriter();
instanceConfig.getProperties().store(writer, "Auto-generated by Exhibitor");

lock.acquireLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
try {
kv.putValue(propertiesPath, writer.toString());
kv.putValue(versionPath, String.valueOf(currentVersion + 1));
}
finally {
lock.releaseLock();
}

return new LoadedInstanceConfig(instanceConfig, currentVersion + 1);
}

@Override
public PseudoLock newPseudoLock() throws Exception {
return new ConsulPseudoLock(consul, pseudoLockPath);
}

private String getString(String path) {
return consul.keyValueClient().getValueAsString(path).orNull();
}

private Long getLong(String path) {
return Long.valueOf(consul.keyValueClient().getValueAsString(path).or("0"));
}

private ConsulVersionedProperties loadProperties() throws Exception {
Long version = getLong(versionPath);

Properties properties = new Properties();
String rawProperties = getString(propertiesPath);
if (rawProperties != null) {
StringReader reader = new StringReader(getString(propertiesPath));
properties.load(reader);
}

return new ConsulVersionedProperties(properties, version);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.netflix.exhibitor.core.config.consul;

import com.google.common.base.Optional;
import com.orbitz.consul.Consul;
import com.orbitz.consul.KeyValueClient;
import com.orbitz.consul.model.kv.Value;
import com.orbitz.consul.model.session.ImmutableSession;
import com.orbitz.consul.option.QueryOptions;

import java.math.BigInteger;
import java.util.concurrent.TimeUnit;

public class ConsulKvLock {
private final Consul consul;
private final String path;
private final String name;
private final String ttl;
private String sessionId;

/**
* @param consul consul client instance for connecting to consul cluster
* @param path consul key-value path to lock
* @param name a descriptive name for the lock
* @param ttl TTL, in seconds, for the consul session underpinning the lock
*/
public ConsulKvLock(Consul consul, String path, String name, Integer ttl) {
this.consul = consul;
this.path = path;
this.name = name;
this.ttl = ttl != null ? String.format("%ds", ttl) : null;
this.sessionId = null;
}

/**
* @param consul consul client instance for connecting to consul cluster
* @param path consul key-value path to lock
* @param name a descriptive name for the lock
*/
public ConsulKvLock(Consul consul, String path, String name) {
this(consul, path, name, 60);
}

private String createSession() {
final ImmutableSession session = ImmutableSession.builder()
.name(name)
.ttl(Optional.fromNullable(ttl))
.build();
return consul.sessionClient().createSession(session).getId();
}

private void destroySession() {
consul.sessionClient().destroySession(sessionId);
sessionId = null;
}

public boolean acquireLock(long maxWait, TimeUnit unit) {
KeyValueClient kv = consul.keyValueClient();
sessionId = createSession();

Optional<Value> value = kv.getValue(path);

if (kv.acquireLock(path, sessionId)) {
return true;
}

BigInteger index = BigInteger.valueOf(value.get().getModifyIndex());
kv.getValue(path, QueryOptions.blockMinutes((int) unit.toMinutes(maxWait), index).build());

if (!kv.acquireLock(path, sessionId)) {
destroySession();
return false;
} else {
return true;
}
}

public void releaseLock() {
try {
KeyValueClient kv = consul.keyValueClient();
Optional<Value> value = kv.getValue(path);

if (value.isPresent()) {
Optional<String> session = value.get().getSession();
if (session.isPresent()) {
kv.releaseLock(path, session.get());
}
}
} finally {
destroySession();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.netflix.exhibitor.core.config.consul;

import com.netflix.exhibitor.core.activity.ActivityLog;
import com.netflix.exhibitor.core.config.PseudoLock;
import com.orbitz.consul.Consul;

import java.util.concurrent.TimeUnit;

public class ConsulPseudoLock implements PseudoLock {

private final ConsulKvLock lock;

public ConsulPseudoLock(Consul consul, String prefix) {
String path = prefix.endsWith("/") ? prefix + "pseudo-lock" : prefix + "/pseudo-lock";
this.lock = new ConsulKvLock(consul, path, "pseudo-lock");
}

@Override
public boolean lock(ActivityLog log, long maxWait, TimeUnit unit) throws Exception {
if (!lock.acquireLock(maxWait, unit)) {
log.add(ActivityLog.Type.ERROR,
String.format("Could not acquire lock within %d ms", unit.toMillis(maxWait)));
return false;
}

return true;
}

@Override
public void unlock() throws Exception {
lock.releaseLock();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.netflix.exhibitor.core.config.consul;

import java.util.Properties;

public class ConsulVersionedProperties {
private final Properties properties;
private final Long version;

public ConsulVersionedProperties(Properties properties, Long version) {
this.properties = properties;
this.version = version;
}

public Properties getProperties() {
return properties;
}

public Long getVersion() {
return version;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.netflix.exhibitor.core.config.consul;

import com.google.common.net.HostAndPort;
import com.netflix.exhibitor.core.config.LoadedInstanceConfig;
import com.netflix.exhibitor.core.config.PropertyBasedInstanceConfig;
import com.netflix.exhibitor.core.config.StringConfigs;
import com.orbitz.consul.Consul;
import com.orbitz.consul.model.session.SessionInfo;
import com.pszymczyk.consul.ConsulProcess;
import com.pszymczyk.consul.ConsulStarterBuilder;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.*;

import java.util.List;
import java.util.Properties;


public class TestConsulConfigProvider {
private Timing timing;
private ConsulProcess consul;
private Consul client;

@BeforeClass
public void setupClass() throws Exception {
consul = ConsulStarterBuilder.consulStarter().build().start();
}

@AfterClass
public void tearDownClass()
{
consul.close();
}

@BeforeMethod
public void setup() throws Exception {
timing = new Timing();

consul.reset();
client = Consul.builder()
.withHostAndPort(HostAndPort.fromParts("localhost", consul.getHttpPort()))
.build();
}

@Test
public void testBasic() throws Exception {
ConsulConfigProvider config = new ConsulConfigProvider(client, "prefix", new Properties());

try {
config.start();
config.loadConfig();

Properties properties = new Properties();
properties.setProperty(PropertyBasedInstanceConfig.toName(StringConfigs.ZOO_CFG_EXTRA, PropertyBasedInstanceConfig.ROOT_PROPERTY_PREFIX), "1,2,3");
config.storeConfig(new PropertyBasedInstanceConfig(properties, new Properties()), 0);

timing.sleepABit();

LoadedInstanceConfig instanceConfig = config.loadConfig();
Assert.assertEquals(instanceConfig.getConfig().getRootConfig().getString(StringConfigs.ZOO_CFG_EXTRA), "1,2,3");

List<SessionInfo> sessions = client.sessionClient().listSessions();
Assert.assertEquals(sessions.size(), 0, "Consul session still exists!");
}
finally {
CloseableUtils.closeQuietly(config);
}
}
}
Loading