Skip to content
This repository has been archived by the owner on Apr 23, 2020. It is now read-only.

Commit

Permalink
Add support for consul shared configuration (#305)
Browse files Browse the repository at this point in the history
* Add support for consul shared configuration

Add consul as a shared config provider.

Fixes #280

* Reap consul sessions when releasing lock; add session TTL
  • Loading branch information
thesquelched authored and xiaochuanyu committed Jul 29, 2017
1 parent 5fcdb41 commit c919b3c
Show file tree
Hide file tree
Showing 10 changed files with 463 additions and 1 deletion.
11 changes: 11 additions & 0 deletions exhibitor-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@
<artifactId>jersey-json</artifactId>
</dependency>

<dependency>
<groupId>com.orbitz.consul</groupId>
<artifactId>consul-client</artifactId>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
Expand All @@ -111,5 +116,11 @@
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.pszymczyk.consul</groupId>
<artifactId>embedded-consul</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package com.netflix.exhibitor.core.config.consul;

import com.netflix.exhibitor.core.config.*;
import com.orbitz.consul.Consul;
import com.orbitz.consul.KeyValueClient;

import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class ConsulConfigProvider implements ConfigProvider {
private static final Long DEFAULT_LOCK_TIMEOUT_MS = 5L * 60L * 1000L; // 5 minutes;
private final Consul consul;
private final Properties defaults;
private final String basePath;
private final String versionPath;
private final String propertiesPath;
private final ConsulKvLock lock;
private final String pseudoLockPath;
private final Long lockTimeoutMs;

/**
* @param consul consul client instance for connecting to consul cluster
* @param prefix consul key-value path under which configs are stored
* @param defaults default properties
*/
public ConsulConfigProvider(Consul consul, String prefix, Properties defaults) {
this(consul, prefix, defaults, DEFAULT_LOCK_TIMEOUT_MS);
}

/**
* @param consul consul client instance for connecting to consul cluster
* @param prefix consul key-value path under which configs are stored
* @param defaults default properties
* @param lockTimeoutMs timeout, in milliseconds, for lock acquisition
*/
public ConsulConfigProvider(Consul consul, String prefix, Properties defaults, Long lockTimeoutMs) {
this.consul = consul;
this.defaults = defaults;
this.lockTimeoutMs = lockTimeoutMs;

this.basePath = prefix.endsWith("/") ? prefix : prefix + "/";
this.versionPath = basePath + "version";
this.propertiesPath = basePath + "properties";
this.pseudoLockPath = basePath + "pseudo-locks";

this.lock = new ConsulKvLock(consul, basePath + "lock", "exhibitor");
}

@Override
public void start() throws Exception {
// NOP
}

@Override
public void close() throws IOException {
// NOP
}

@Override
public LoadedInstanceConfig loadConfig() throws Exception {
ConsulVersionedProperties properties;

lock.acquireLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
try {
properties = loadProperties();
}
finally {
lock.releaseLock();
}

PropertyBasedInstanceConfig config = new PropertyBasedInstanceConfig(
properties.getProperties(), defaults);
return new LoadedInstanceConfig(config, properties.getVersion());
}

@Override
public LoadedInstanceConfig storeConfig(ConfigCollection config, long compareVersion) throws Exception {
Long currentVersion = loadProperties().getVersion();
if (currentVersion != compareVersion) {
return null;
}

KeyValueClient kv = consul.keyValueClient();
PropertyBasedInstanceConfig instanceConfig = new PropertyBasedInstanceConfig(config);
StringWriter writer = new StringWriter();
instanceConfig.getProperties().store(writer, "Auto-generated by Exhibitor");

lock.acquireLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
try {
kv.putValue(propertiesPath, writer.toString());
kv.putValue(versionPath, String.valueOf(currentVersion + 1));
}
finally {
lock.releaseLock();
}

return new LoadedInstanceConfig(instanceConfig, currentVersion + 1);
}

@Override
public PseudoLock newPseudoLock() throws Exception {
return new ConsulPseudoLock(consul, pseudoLockPath);
}

private String getString(String path) {
return consul.keyValueClient().getValueAsString(path).orNull();
}

private Long getLong(String path) {
return Long.valueOf(consul.keyValueClient().getValueAsString(path).or("0"));
}

private ConsulVersionedProperties loadProperties() throws Exception {
Long version = getLong(versionPath);

Properties properties = new Properties();
String rawProperties = getString(propertiesPath);
if (rawProperties != null) {
StringReader reader = new StringReader(getString(propertiesPath));
properties.load(reader);
}

return new ConsulVersionedProperties(properties, version);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.netflix.exhibitor.core.config.consul;

import com.google.common.base.Optional;
import com.orbitz.consul.Consul;
import com.orbitz.consul.KeyValueClient;
import com.orbitz.consul.model.kv.Value;
import com.orbitz.consul.model.session.ImmutableSession;
import com.orbitz.consul.option.QueryOptions;

import java.math.BigInteger;
import java.util.concurrent.TimeUnit;

public class ConsulKvLock {
private final Consul consul;
private final String path;
private final String name;
private final String ttl;
private String sessionId;

/**
* @param consul consul client instance for connecting to consul cluster
* @param path consul key-value path to lock
* @param name a descriptive name for the lock
* @param ttl TTL, in seconds, for the consul session underpinning the lock
*/
public ConsulKvLock(Consul consul, String path, String name, Integer ttl) {
this.consul = consul;
this.path = path;
this.name = name;
this.ttl = ttl != null ? String.format("%ds", ttl) : null;
this.sessionId = null;
}

/**
* @param consul consul client instance for connecting to consul cluster
* @param path consul key-value path to lock
* @param name a descriptive name for the lock
*/
public ConsulKvLock(Consul consul, String path, String name) {
this(consul, path, name, 60);
}

private String createSession() {
final ImmutableSession session = ImmutableSession.builder()
.name(name)
.ttl(Optional.fromNullable(ttl))
.build();
return consul.sessionClient().createSession(session).getId();
}

private void destroySession() {
consul.sessionClient().destroySession(sessionId);
sessionId = null;
}

public boolean acquireLock(long maxWait, TimeUnit unit) {
KeyValueClient kv = consul.keyValueClient();
sessionId = createSession();

Optional<Value> value = kv.getValue(path);

if (kv.acquireLock(path, sessionId)) {
return true;
}

BigInteger index = BigInteger.valueOf(value.get().getModifyIndex());
kv.getValue(path, QueryOptions.blockMinutes((int) unit.toMinutes(maxWait), index).build());

if (!kv.acquireLock(path, sessionId)) {
destroySession();
return false;
} else {
return true;
}
}

public void releaseLock() {
try {
KeyValueClient kv = consul.keyValueClient();
Optional<Value> value = kv.getValue(path);

if (value.isPresent()) {
Optional<String> session = value.get().getSession();
if (session.isPresent()) {
kv.releaseLock(path, session.get());
}
}
} finally {
destroySession();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.netflix.exhibitor.core.config.consul;

import com.netflix.exhibitor.core.activity.ActivityLog;
import com.netflix.exhibitor.core.config.PseudoLock;
import com.orbitz.consul.Consul;

import java.util.concurrent.TimeUnit;

public class ConsulPseudoLock implements PseudoLock {

private final ConsulKvLock lock;

public ConsulPseudoLock(Consul consul, String prefix) {
String path = prefix.endsWith("/") ? prefix + "pseudo-lock" : prefix + "/pseudo-lock";
this.lock = new ConsulKvLock(consul, path, "pseudo-lock");
}

@Override
public boolean lock(ActivityLog log, long maxWait, TimeUnit unit) throws Exception {
if (!lock.acquireLock(maxWait, unit)) {
log.add(ActivityLog.Type.ERROR,
String.format("Could not acquire lock within %d ms", unit.toMillis(maxWait)));
return false;
}

return true;
}

@Override
public void unlock() throws Exception {
lock.releaseLock();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.netflix.exhibitor.core.config.consul;

import java.util.Properties;

public class ConsulVersionedProperties {
private final Properties properties;
private final Long version;

public ConsulVersionedProperties(Properties properties, Long version) {
this.properties = properties;
this.version = version;
}

public Properties getProperties() {
return properties;
}

public Long getVersion() {
return version;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.netflix.exhibitor.core.config.consul;

import com.google.common.net.HostAndPort;
import com.netflix.exhibitor.core.config.LoadedInstanceConfig;
import com.netflix.exhibitor.core.config.PropertyBasedInstanceConfig;
import com.netflix.exhibitor.core.config.StringConfigs;
import com.orbitz.consul.Consul;
import com.orbitz.consul.model.session.SessionInfo;
import com.pszymczyk.consul.ConsulProcess;
import com.pszymczyk.consul.ConsulStarterBuilder;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.*;

import java.util.List;
import java.util.Properties;


public class TestConsulConfigProvider {
private Timing timing;
private ConsulProcess consul;
private Consul client;

@BeforeClass
public void setupClass() throws Exception {
consul = ConsulStarterBuilder.consulStarter().build().start();
}

@AfterClass
public void tearDownClass()
{
consul.close();
}

@BeforeMethod
public void setup() throws Exception {
timing = new Timing();

consul.reset();
client = Consul.builder()
.withHostAndPort(HostAndPort.fromParts("localhost", consul.getHttpPort()))
.build();
}

@Test
public void testBasic() throws Exception {
ConsulConfigProvider config = new ConsulConfigProvider(client, "prefix", new Properties());

try {
config.start();
config.loadConfig();

Properties properties = new Properties();
properties.setProperty(PropertyBasedInstanceConfig.toName(StringConfigs.ZOO_CFG_EXTRA, PropertyBasedInstanceConfig.ROOT_PROPERTY_PREFIX), "1,2,3");
config.storeConfig(new PropertyBasedInstanceConfig(properties, new Properties()), 0);

timing.sleepABit();

LoadedInstanceConfig instanceConfig = config.loadConfig();
Assert.assertEquals(instanceConfig.getConfig().getRootConfig().getString(StringConfigs.ZOO_CFG_EXTRA), "1,2,3");

List<SessionInfo> sessions = client.sessionClient().listSessions();
Assert.assertEquals(sessions.size(), 0, "Consul session still exists!");
}
finally {
CloseableUtils.closeQuietly(config);
}
}
}
Loading

0 comments on commit c919b3c

Please sign in to comment.