Skip to content

Commit

Permalink
Adding Dyno lock to enable using redis for distributed locking using …
Browse files Browse the repository at this point in the history
…Redlock
  • Loading branch information
rsrinivasanNetflix authored and smukil committed Aug 26, 2019
1 parent 990da1f commit 4db8e0b
Show file tree
Hide file tree
Showing 60 changed files with 2,097 additions and 796 deletions.
16 changes: 16 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ subprojects {
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'eclipse'
apply plugin: 'jacoco'
sourceCompatibility = 1.8

repositories {
Expand All @@ -38,6 +39,16 @@ subprojects {
options.addStringOption('Xdoclint:none', '-quiet')
}
}

jacocoTestReport {
reports {
xml.enabled false
csv.enabled false
html.enabled true
}
}

test.finalizedBy(project.tasks.jacocoTestReport)
}

project(':dyno-core') {
Expand Down Expand Up @@ -133,5 +144,10 @@ project(':dyno-recipes') {
dependencies {
compile project(':dyno-core')
compile project(':dyno-jedis')
testCompile 'com.netflix.spinnaker.embedded-redis:embedded-redis:0.8.0'
}

test {
testLogging.showStandardStreams = true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class ArchaiusConnectionPoolConfiguration extends ConnectionPoolConfigura
private final ErrorRateMonitorConfig errorRateConfig;
private final RetryPolicyFactory retryPolicyFactory;
private final DynamicBooleanProperty failOnStartupIfNoHosts;
private final DynamicIntProperty lockVotingSize;

private DynamicBooleanProperty isDualWriteEnabled;
private DynamicStringProperty dualWriteClusterName;
Expand All @@ -72,6 +73,7 @@ public ArchaiusConnectionPoolConfiguration(String name) {
configPublisherConfig = DynamicPropertyFactory.getInstance().getStringProperty(propertyPrefix + ".config.publisher.address", super.getConfigurationPublisherConfig());
failOnStartupIfNoHosts = DynamicPropertyFactory.getInstance().getBooleanProperty(propertyPrefix + ".config.startup.failIfNoHosts", super.getFailOnStartupIfNoHosts());
compressionThreshold = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + ".config.compressionThreshold", super.getValueCompressionThreshold());
lockVotingSize = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + ".config.lock.votingSize", super.getLockVotingSize());

loadBalanceStrategy = parseLBStrategy(propertyPrefix);
errorRateConfig = parseErrorRateMonitorConfig(propertyPrefix);
Expand Down Expand Up @@ -171,6 +173,11 @@ public int getDualWritePercentage() {
return dualWritePercentage.get();
}

@Override
public int getLockVotingSize() {
return lockVotingSize.get();
}

public void setIsDualWriteEnabled(DynamicBooleanProperty booleanProperty) {
this.isDualWriteEnabled = booleanProperty;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.List;

import com.netflix.dyno.connectionpool.HostBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -114,7 +115,7 @@ public Host apply(InstanceInfo info) {
if (rack == null) {
Logger.error("Rack wasn't found for host:" + info.getHostName() + " there may be issues matching it up to the token map");
}
Host host = new Host(info.getHostName(), info.getIPAddr(), rack, status);
Host host = new HostBuilder().setHostname(info.getHostName()).setIpAddress(info.getIPAddr()).setRack(rack).setStatus(status).createHost();
return host;
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,6 @@
*/
package com.netflix.dyno.contrib.consul;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
Expand All @@ -34,7 +26,15 @@
import com.google.common.collect.Lists;
import com.netflix.discovery.DiscoveryManager;
import com.netflix.dyno.connectionpool.Host;
import com.netflix.dyno.connectionpool.HostBuilder;
import com.netflix.dyno.connectionpool.HostSupplier;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Simple class that implements {@link Supplier}<{@link List}<{@link Host}>>. It provides a List<{@link Host}>
Expand Down Expand Up @@ -129,8 +129,13 @@ public Host apply(HealthService info) {
Logger.error("Rack wasn't found for host:" + info.getNode()
+ " there may be issues matching it up to the token map");
}
Host host = new Host(hostName, hostName, info.getService().getPort(), rack,
String.valueOf(metaData.get("datacenter")), status);
Host host = new HostBuilder().setHostname(hostName)
.setIpAddress(hostName)
.setPort(info.getService().getPort())
.setRack(rack)
.setDatacenter(String.valueOf(metaData.get("datacenter")))
.setStatus(status)
.createHost();
return host;
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ public interface ConnectionFactory<CL> {
* Create a connection for this {@link HostConnectionPool}
*
* @param pool
* @param observor
* @return
* @throws DynoConnectException
* @throws ThrottledException
*/
public Connection<CL> createConnection(HostConnectionPool<CL> pool, ConnectionObservor observor) throws DynoConnectException, ThrottledException;
Connection<CL> createConnection(HostConnectionPool<CL> pool) throws DynoConnectException;

Connection<CL> createConnectionWithDataStore(HostConnectionPool<CL> pool)
throws DynoConnectException;
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ enum CompressionStrategy {
THRESHOLD
}

/**
* Should connections in this pool connect to the datastore directly?
* @return
*/
boolean isConnectToDatastore();

boolean isFallbackEnabled();

/**
* Returns the voting size for dyno lock
* @return
*/
int getLockVotingSize();

/**
* Returns the unique name assigned to this connection pool.
*/
Expand Down Expand Up @@ -224,4 +238,5 @@ enum CompressionStrategy {

String getHashtag();

ConnectionPoolConfiguration setLocalZoneAffinity(boolean condition);
}
81 changes: 29 additions & 52 deletions dyno-core/src/main/java/com/netflix/dyno/connectionpool/Host.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
******************************************************************************/
package com.netflix.dyno.connectionpool;

import org.apache.commons.lang3.StringUtils;

import java.net.InetSocketAddress;
import java.util.Objects;

import com.netflix.dyno.connectionpool.impl.utils.ConfigUtils;
import org.apache.commons.lang3.StringUtils;

/**
* Class encapsulating information about a host.
* <p>
Expand All @@ -35,12 +34,15 @@
public class Host implements Comparable<Host> {

public static final int DEFAULT_PORT = 8102;
public static final Host NO_HOST = new Host("UNKNOWN", "UNKNOWN", 0, "UNKNOWN");
public static final int DEFAULT_DATASTORE_PORT = 22122;
public static final Host NO_HOST = new HostBuilder().setHostname("UNKNOWN").setIpAddress("UNKNOWN").setPort(0)
.setRack("UNKNOWN").createHost();

private final String hostname;
private final String ipAddress;
private final int port;
private final int securePort;
private final int datastorePort;
private final InetSocketAddress socketAddress;
private final String rack;
private final String datacenter;
Expand All @@ -52,56 +54,12 @@ public enum Status {
Up, Down;
}

public Host(String hostname, int port, String rack) {
this(hostname, null, port, port, rack, ConfigUtils.getDataCenterFromRack(rack), Status.Down, null);
}

public Host(String hostname, String rack, Status status) {
this(hostname, null, DEFAULT_PORT, DEFAULT_PORT, rack, ConfigUtils.getDataCenterFromRack(rack), status, null);
}

public Host(String hostname, int port, String rack, Status status) {
this(hostname, null, port, port, rack, ConfigUtils.getDataCenterFromRack(rack), status, null);
}

public Host(String hostname, int port, String rack, Status status, String hashtag) {
this(hostname, null, port, port, rack, ConfigUtils.getDataCenterFromRack(rack), status, hashtag);
}

public Host(String hostname, int port, String rack, Status status, String hashtag, String password) {
this(hostname, null, port, port, rack, ConfigUtils.getDataCenterFromRack(rack), status, hashtag, password);
}

public Host(String hostname, String ipAddress, int port, String rack) {
this(hostname, ipAddress, port, port, rack, ConfigUtils.getDataCenterFromRack(rack), Status.Down, null);
}

public Host(String hostname, String ipAddress, String rack, Status status) {
this(hostname, ipAddress, DEFAULT_PORT, DEFAULT_PORT, rack, ConfigUtils.getDataCenterFromRack(rack), status, null);
}

public Host(String hostname, String ipAddress, String rack, Status status, String hashtag) {
this(hostname, ipAddress, DEFAULT_PORT, DEFAULT_PORT, rack, ConfigUtils.getDataCenterFromRack(rack), status, hashtag);
}

public Host(String hostname, String ipAddress, int port, String rack, String datacenter, Status status) {
this(hostname, ipAddress, port, port, rack, datacenter, status, null);
}

public Host(String name, String ipAddress, int port, String rack, String datacenter, Status status,
String hashtag) {
this(name, ipAddress, port, port, rack, datacenter, status, hashtag);
}

public Host(String name, String ipAddress, int port, int securePort, String rack, String datacenter, Status status, String hashtag) {
this(name, ipAddress, port, port, rack, datacenter, status, hashtag, null);
}

public Host(String name, String ipAddress, int port, int securePort, String rack, String datacenter, Status status, String hashtag, String password) {
this.hostname = name;
public Host(String hostname, String ipAddress, int port, int securePort, int datastorePort, String rack, String datacenter, Status status, String hashtag, String password) {
this.hostname = hostname;
this.ipAddress = ipAddress;
this.port = port;
this.securePort = securePort;
this.datastorePort = datastorePort;
this.rack = rack;
this.status = status;
this.datacenter = datacenter;
Expand All @@ -110,7 +68,7 @@ public Host(String name, String ipAddress, int port, int securePort, String rack

// Used for the unit tests to prevent host name resolution
if (port != -1) {
this.socketAddress = new InetSocketAddress(name, port);
this.socketAddress = new InetSocketAddress(hostname, port);
} else {
this.socketAddress = null;
}
Expand Down Expand Up @@ -139,6 +97,10 @@ public int getSecurePort() {
return securePort;
}

public int getDatastorePort() {
return datastorePort;
}

public String getDatacenter() {
return datacenter;
}
Expand Down Expand Up @@ -168,6 +130,10 @@ public String getPassword() {
return password;
}

public Status getStatus() {
return status;
}

public InetSocketAddress getSocketAddress() {
return socketAddress;
}
Expand Down Expand Up @@ -223,4 +189,15 @@ public String toString() {
+ rack + ", datacenter: " + datacenter + ", status: " + status.name() + ", hashtag="
+ hashtag + ", password=" + (Objects.nonNull(password) ? "masked" : "null") + "]";
}

public static Host clone(Host host) {
return new HostBuilder().setHostname(host.getHostName())
.setIpAddress(host.getIpAddress()).setPort(host.getPort())
.setSecurePort(host.getSecurePort())
.setRack(host.getRack())
.setDatastorePort(host.getDatastorePort())
.setDatacenter(host.getDatacenter()).setStatus(host.getStatus())
.setHashtag(host.getHashtag())
.setPassword(host.getPassword()).createHost();
}
}
Loading

0 comments on commit 4db8e0b

Please sign in to comment.