Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Computes replication factor for complete topology (active and inactive hosts) #203

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netflix.dyno.connectionpool.HashPartitioner;
import com.netflix.dyno.connectionpool.Host;
import com.netflix.dyno.connectionpool.HostConnectionPool;
import com.netflix.dyno.connectionpool.HostSupplier;
import com.netflix.dyno.connectionpool.RetryPolicy;
import com.netflix.dyno.connectionpool.TokenMapSupplier;
import com.netflix.dyno.connectionpool.TokenPoolTopology;
Expand Down Expand Up @@ -327,6 +328,19 @@ public boolean apply(HostToken x) {
*/
public void initWithHosts(Map<Host, HostConnectionPool<CL>> hPools) {

// Check replication factor for complete topology (active and inactive hosts)
if (localSelector.isTokenAware() && localRack != null) {
HostSupplier hostSupplier = cpConfig.getHostSupplier();
if (hostSupplier == null) {
throw new DynoException("Host supplier not configured!");
}

Collection<Host> hosts = hostSupplier.getHosts();
List<HostToken> hostsTokens = tokenSupplier.getTokens(new HashSet<Host>(hosts));

replicationFactor.set(calculateReplicationFactor(hostsTokens));
}

// Get the list of tokens for these hosts
//tokenSupplier.initWithHosts(hPools.keySet());
List<HostToken> allHostTokens = tokenSupplier.getTokens(hPools.keySet());
Expand All @@ -349,10 +363,6 @@ public void initWithHosts(Map<Host, HostConnectionPool<CL>> hPools) {
Map<HostToken, HostConnectionPool<CL>> localPools = getHostPoolsForDC(tokenPoolMap, localRack);
localSelector.initWithHosts(localPools);

if (localSelector.isTokenAware() && localRack != null) {
replicationFactor.set(calculateReplicationFactor(allHostTokens));
}

for (String rack : remoteRacks) {
Map<HostToken, HostConnectionPool<CL>> dcPools = getHostPoolsForDC(tokenPoolMap, rack);
HostSelectionStrategy<CL> remoteSelector = selectorFactory.vendPoolSelectionStrategy();
Expand All @@ -362,7 +372,7 @@ public void initWithHosts(Map<Host, HostConnectionPool<CL>> hPools) {

remoteDCNames.swapWithList(remoteRackSelectors.keySet());

topology.set(getTokenPoolTopology());
topology.set(getTokenPoolTopology());
}

/*package private*/ int calculateReplicationFactor(List<HostToken> allHostTokens) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,56 +228,60 @@ private void checkConnectionPoolMonitorStats(int numHosts, boolean fallback) {

private TokenMapSupplier getTokenMapSupplier() {

/**
cqlsh:dyno_bootstrap> select "availabilityZone","hostname","token" from tokens where "appId" = 'dynomite_redis_puneet';

availabilityZone | hostname | token
------------------+--------------------------------------------+------------
us-east-1c | ec2-54-83-179-213.compute-1.amazonaws.com | 1383429731
us-east-1c | ec2-54-224-184-99.compute-1.amazonaws.com | 309687905
us-east-1c | ec2-54-91-190-159.compute-1.amazonaws.com | 3530913377
us-east-1c | ec2-54-81-31-218.compute-1.amazonaws.com | 2457171554
us-east-1e | ec2-54-198-222-153.compute-1.amazonaws.com | 309687905
us-east-1e | ec2-54-198-239-231.compute-1.amazonaws.com | 2457171554
us-east-1e | ec2-54-226-212-40.compute-1.amazonaws.com | 1383429731
us-east-1e | ec2-54-197-178-229.compute-1.amazonaws.com | 3530913377

cqlsh:dyno_bootstrap>
*/
final Map<Host, HostToken> tokenMap = new HashMap<Host, HostToken>();

tokenMap.put(host1, new HostToken(309687905L, host1));
tokenMap.put(host2, new HostToken(1383429731L, host2));
tokenMap.put(host3, new HostToken(2457171554L, host3));
tokenMap.put(host4, new HostToken(309687905L, host4));
tokenMap.put(host5, new HostToken(1383429731L, host5));
tokenMap.put(host6, new HostToken(2457171554L, host6));

return new TokenMapSupplier() {

@Override
public List<HostToken> getTokens(Set<Host> activeHosts) {
if (activeHosts.size() < tokenMap.size()) {
List<HostToken> hostTokens = new ArrayList<HostToken>(activeHosts.size());
Iterator<Host> iterator = activeHosts.iterator();
while (iterator.hasNext()) {
Host activeHost = (Host) iterator.next();
hostTokens.add(tokenMap.get(activeHost));
/**
cqlsh:dyno_bootstrap> select "availabilityZone","hostname","token" from tokens where "appId" = 'dynomite_redis_puneet';

availabilityZone | hostname | token
------------------+--------------------------------------------+------------
us-east-1c | ec2-54-83-179-213.compute-1.amazonaws.com | 1383429731
us-east-1c | ec2-54-224-184-99.compute-1.amazonaws.com | 309687905
us-east-1c | ec2-54-91-190-159.compute-1.amazonaws.com | 3530913377
us-east-1c | ec2-54-81-31-218.compute-1.amazonaws.com | 2457171554
us-east-1e | ec2-54-198-222-153.compute-1.amazonaws.com | 309687905
us-east-1e | ec2-54-198-239-231.compute-1.amazonaws.com | 2457171554
us-east-1e | ec2-54-226-212-40.compute-1.amazonaws.com | 1383429731
us-east-1e | ec2-54-197-178-229.compute-1.amazonaws.com | 3530913377

cqlsh:dyno_bootstrap>
*/
final Map<Host, HostToken> tokenMap = new HashMap<Host, HostToken>();

tokenMap.put(host1, new HostToken(309687905L, host1));
tokenMap.put(host2, new HostToken(1383429731L, host2));
tokenMap.put(host3, new HostToken(2457171554L, host3));
tokenMap.put(host4, new HostToken(309687905L, host4));
tokenMap.put(host5, new HostToken(1383429731L, host5));
tokenMap.put(host6, new HostToken(2457171554L, host6));

return getTokenMapSupplier(tokenMap);
}

private TokenMapSupplier getTokenMapSupplier(final Map<Host, HostToken> tokenMap) {
return new TokenMapSupplier() {

@Override
public List<HostToken> getTokens(Set<Host> activeHosts) {
if (activeHosts.size() < tokenMap.size()) {
List<HostToken> hostTokens = new ArrayList<HostToken>(activeHosts.size());
Iterator<Host> iterator = activeHosts.iterator();
while (iterator.hasNext()) {
Host activeHost = (Host) iterator.next();
hostTokens.add(tokenMap.get(activeHost));
}
return hostTokens;
} else {
return new ArrayList<HostToken>(tokenMap.values());
}
return hostTokens;
} else {
return new ArrayList<HostToken>(tokenMap.values());
}
}

@Override
public HostToken getTokenForHost(Host host, Set<Host> activeHosts) {
return tokenMap.get(host);
}
@Override
public HostToken getTokenForHost(Host host, Set<Host> activeHosts) {
return tokenMap.get(host);
}

};
}
};
}
@Test
public void testAddingNewHosts() throws Exception {

Expand Down Expand Up @@ -639,22 +643,60 @@ public RetryPolicy getRetryPolicy() {

@Test(expected = NoAvailableHostsException.class)
public void testHostsDownDuringStartup() {

final ConnectionPoolImpl<TestClient> pool = new ConnectionPoolImpl<TestClient>(connFactory, cpConfig, cpMonitor);

hostSupplierHosts.add(new Host("host1_down", 8080, "localRack", Status.Down));
hostSupplierHosts.add(new Host("host2_down", 8080, "localRack",Status.Down));
hostSupplierHosts.add(new Host("host3_down", 8080, "localRack", Status.Down));

pool.start();

}

@Test
public void testNoReplicationFailureWhenHostDownDuringStartup() {
// Topology with symmetric replication - One node is inactive
final Host h1 = new Host("h1", "192.168.2.1", 8080, "dc-r0", "dc", Status.Up);
final Host h2 = new Host("h2", "192.168.2.2", 8080, "dc-r0", "dc", Status.Up);
final Host h3 = new Host("h3", "192.168.2.3", 8080, "dc-r0", "dc", Status.Up);

final Host h4 = new Host("h4", "192.168.2.4", 8080, "dc-r1", "dc", Status.Down);
final Host h5 = new Host("h5", "192.168.2.5", 8080, "dc-r1", "dc", Status.Up);
final Host h6 = new Host("h6", "192.168.2.6", 8080, "dc-r1", "dc", Status.Up);

hostSupplierHosts.add(h1);
hostSupplierHosts.add(h2);
hostSupplierHosts.add(h3);

hostSupplierHosts.add(h4);
hostSupplierHosts.add(h5);
hostSupplierHosts.add(h6);

final Map<Host, HostToken> tokenMap = new HashMap<Host, HostToken>();

tokenMap.put(h1, new HostToken(309687905L, h1));
tokenMap.put(h2, new HostToken(1383429731L, h2));
tokenMap.put(h3, new HostToken(2457171554L, h3));
tokenMap.put(h4, new HostToken(309687905L, h4));
tokenMap.put(h5, new HostToken(1383429731L, h5));
tokenMap.put(h6, new HostToken(2457171554L, h6));

// Token Aware load balancing
cpConfig.setLoadBalancingStrategy(LoadBalancingStrategy.TokenAware);
cpConfig.setLocalDataCenter("dc");
cpConfig.setLocalRack("dc-r0");
cpConfig.withTokenSupplier(getTokenMapSupplier(tokenMap));

//
final ConnectionPoolImpl<TestClient> pool = new ConnectionPoolImpl<TestClient>(connFactory, cpConfig, cpMonitor);
pool.start();
}

private void executeTestClientOperation(final ConnectionPoolImpl<TestClient> pool) {
executeTestClientOperation(pool, null);
}
private void executeTestClientOperation(final ConnectionPoolImpl<TestClient> pool, final Callable<Void> customLogic) {
private void executeTestClientOperation(final ConnectionPoolImpl<TestClient> pool) {
executeTestClientOperation(pool, null);
}

private void executeTestClientOperation(final ConnectionPoolImpl<TestClient> pool, final Callable<Void> customLogic) {
pool.executeWithFailover(new Operation<TestClient, Integer>() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.netflix.dyno.connectionpool.Host;
import com.netflix.dyno.connectionpool.Host.Status;
import com.netflix.dyno.connectionpool.HostConnectionPool;
import com.netflix.dyno.connectionpool.HostSupplier;
import com.netflix.dyno.connectionpool.TokenMapSupplier;
import com.netflix.dyno.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.dyno.connectionpool.impl.CountingConnectionPoolMonitor;
Expand Down Expand Up @@ -530,13 +531,13 @@ public void testReplicationFactorForMultiRegionCluster() {
);

int rf = selection.calculateReplicationFactor(hostTokens);

Assert.assertEquals(3, rf);
}
}

@Test
public void testChangingHashPartitioner() {
cpConfig.setLoadBalancingStrategy(LoadBalancingStrategy.TokenAware);
cpConfig.withHostSupplier(getHostSupplier());
cpConfig.withTokenSupplier(getTokenMapSupplier());
cpConfig.withHashPartitioner(getMockHashPartitioner(1000000000L));

Expand Down Expand Up @@ -675,6 +676,15 @@ public HostToken getTokenForHost(Host host, Set<Host> activeHosts) {
};
}

private HostSupplier getHostSupplier() {
return new HostSupplier () {
@Override
public List<Host> getHosts() {
return Arrays.asList(h1, h2, h3, h4);
}
};
}

private HashPartitioner getMockHashPartitioner(final Long hash) {
return new HashPartitioner() {
@Override
Expand Down