diff --git a/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/lb/HostSelectionWithFallback.java b/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/lb/HostSelectionWithFallback.java index a17723a4..eecd7d52 100644 --- a/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/lb/HostSelectionWithFallback.java +++ b/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/lb/HostSelectionWithFallback.java @@ -23,6 +23,7 @@ import com.netflix.dyno.connectionpool.HashPartitioner; import com.netflix.dyno.connectionpool.Host; import com.netflix.dyno.connectionpool.HostConnectionPool; +import com.netflix.dyno.connectionpool.HostSupplier; import com.netflix.dyno.connectionpool.RetryPolicy; import com.netflix.dyno.connectionpool.TokenMapSupplier; import com.netflix.dyno.connectionpool.TokenPoolTopology; @@ -327,6 +328,19 @@ public boolean apply(HostToken x) { */ public void initWithHosts(Map> hPools) { + // Check replication factor for complete topology (active and inactive hosts) + if (localSelector.isTokenAware() && localRack != null) { + HostSupplier hostSupplier = cpConfig.getHostSupplier(); + if (hostSupplier == null) { + throw new DynoException("Host supplier not configured!"); + } + + Collection hosts = hostSupplier.getHosts(); + List hostsTokens = tokenSupplier.getTokens(new HashSet(hosts)); + + replicationFactor.set(calculateReplicationFactor(hostsTokens)); + } + // Get the list of tokens for these hosts //tokenSupplier.initWithHosts(hPools.keySet()); List allHostTokens = tokenSupplier.getTokens(hPools.keySet()); @@ -349,10 +363,6 @@ public void initWithHosts(Map> hPools) { Map> localPools = getHostPoolsForDC(tokenPoolMap, localRack); localSelector.initWithHosts(localPools); - if (localSelector.isTokenAware() && localRack != null) { - replicationFactor.set(calculateReplicationFactor(allHostTokens)); - } - for (String rack : remoteRacks) { Map> dcPools = getHostPoolsForDC(tokenPoolMap, rack); HostSelectionStrategy remoteSelector = selectorFactory.vendPoolSelectionStrategy(); @@ -362,7 +372,7 @@ public void initWithHosts(Map> hPools) { remoteDCNames.swapWithList(remoteRackSelectors.keySet()); - topology.set(getTokenPoolTopology()); + topology.set(getTokenPoolTopology()); } /*package private*/ int calculateReplicationFactor(List allHostTokens) { diff --git a/dyno-core/src/test/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolImplTest.java b/dyno-core/src/test/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolImplTest.java index 25c25344..f6a4f180 100644 --- a/dyno-core/src/test/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolImplTest.java +++ b/dyno-core/src/test/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolImplTest.java @@ -228,56 +228,60 @@ private void checkConnectionPoolMonitorStats(int numHosts, boolean fallback) { private TokenMapSupplier getTokenMapSupplier() { - /** - cqlsh:dyno_bootstrap> select "availabilityZone","hostname","token" from tokens where "appId" = 'dynomite_redis_puneet'; - - availabilityZone | hostname | token - ------------------+--------------------------------------------+------------ - us-east-1c | ec2-54-83-179-213.compute-1.amazonaws.com | 1383429731 - us-east-1c | ec2-54-224-184-99.compute-1.amazonaws.com | 309687905 - us-east-1c | ec2-54-91-190-159.compute-1.amazonaws.com | 3530913377 - us-east-1c | ec2-54-81-31-218.compute-1.amazonaws.com | 2457171554 - us-east-1e | ec2-54-198-222-153.compute-1.amazonaws.com | 309687905 - us-east-1e | ec2-54-198-239-231.compute-1.amazonaws.com | 2457171554 - us-east-1e | ec2-54-226-212-40.compute-1.amazonaws.com | 1383429731 - us-east-1e | ec2-54-197-178-229.compute-1.amazonaws.com | 3530913377 - - cqlsh:dyno_bootstrap> - */ - final Map tokenMap = new HashMap(); - - tokenMap.put(host1, new HostToken(309687905L, host1)); - tokenMap.put(host2, new HostToken(1383429731L, host2)); - tokenMap.put(host3, new HostToken(2457171554L, host3)); - tokenMap.put(host4, new HostToken(309687905L, host4)); - tokenMap.put(host5, new HostToken(1383429731L, host5)); - tokenMap.put(host6, new HostToken(2457171554L, host6)); - - return new TokenMapSupplier() { - - @Override - public List getTokens(Set activeHosts) { - if (activeHosts.size() < tokenMap.size()) { - List hostTokens = new ArrayList(activeHosts.size()); - Iterator iterator = activeHosts.iterator(); - while (iterator.hasNext()) { - Host activeHost = (Host) iterator.next(); - hostTokens.add(tokenMap.get(activeHost)); + /** + cqlsh:dyno_bootstrap> select "availabilityZone","hostname","token" from tokens where "appId" = 'dynomite_redis_puneet'; + + availabilityZone | hostname | token + ------------------+--------------------------------------------+------------ + us-east-1c | ec2-54-83-179-213.compute-1.amazonaws.com | 1383429731 + us-east-1c | ec2-54-224-184-99.compute-1.amazonaws.com | 309687905 + us-east-1c | ec2-54-91-190-159.compute-1.amazonaws.com | 3530913377 + us-east-1c | ec2-54-81-31-218.compute-1.amazonaws.com | 2457171554 + us-east-1e | ec2-54-198-222-153.compute-1.amazonaws.com | 309687905 + us-east-1e | ec2-54-198-239-231.compute-1.amazonaws.com | 2457171554 + us-east-1e | ec2-54-226-212-40.compute-1.amazonaws.com | 1383429731 + us-east-1e | ec2-54-197-178-229.compute-1.amazonaws.com | 3530913377 + + cqlsh:dyno_bootstrap> + */ + final Map tokenMap = new HashMap(); + + tokenMap.put(host1, new HostToken(309687905L, host1)); + tokenMap.put(host2, new HostToken(1383429731L, host2)); + tokenMap.put(host3, new HostToken(2457171554L, host3)); + tokenMap.put(host4, new HostToken(309687905L, host4)); + tokenMap.put(host5, new HostToken(1383429731L, host5)); + tokenMap.put(host6, new HostToken(2457171554L, host6)); + + return getTokenMapSupplier(tokenMap); + } + + private TokenMapSupplier getTokenMapSupplier(final Map tokenMap) { + return new TokenMapSupplier() { + + @Override + public List getTokens(Set activeHosts) { + if (activeHosts.size() < tokenMap.size()) { + List hostTokens = new ArrayList(activeHosts.size()); + Iterator iterator = activeHosts.iterator(); + while (iterator.hasNext()) { + Host activeHost = (Host) iterator.next(); + hostTokens.add(tokenMap.get(activeHost)); + } + return hostTokens; + } else { + return new ArrayList(tokenMap.values()); } - return hostTokens; - } else { - return new ArrayList(tokenMap.values()); } - } - @Override - public HostToken getTokenForHost(Host host, Set activeHosts) { - return tokenMap.get(host); - } + @Override + public HostToken getTokenForHost(Host host, Set activeHosts) { + return tokenMap.get(host); + } - }; - } - + }; + } + @Test public void testAddingNewHosts() throws Exception { @@ -639,22 +643,60 @@ public RetryPolicy getRetryPolicy() { @Test(expected = NoAvailableHostsException.class) public void testHostsDownDuringStartup() { - + final ConnectionPoolImpl pool = new ConnectionPoolImpl(connFactory, cpConfig, cpMonitor); hostSupplierHosts.add(new Host("host1_down", 8080, "localRack", Status.Down)); hostSupplierHosts.add(new Host("host2_down", 8080, "localRack",Status.Down)); hostSupplierHosts.add(new Host("host3_down", 8080, "localRack", Status.Down)); - + pool.start(); - + } + + @Test + public void testNoReplicationFailureWhenHostDownDuringStartup() { + // Topology with symmetric replication - One node is inactive + final Host h1 = new Host("h1", "192.168.2.1", 8080, "dc-r0", "dc", Status.Up); + final Host h2 = new Host("h2", "192.168.2.2", 8080, "dc-r0", "dc", Status.Up); + final Host h3 = new Host("h3", "192.168.2.3", 8080, "dc-r0", "dc", Status.Up); + + final Host h4 = new Host("h4", "192.168.2.4", 8080, "dc-r1", "dc", Status.Down); + final Host h5 = new Host("h5", "192.168.2.5", 8080, "dc-r1", "dc", Status.Up); + final Host h6 = new Host("h6", "192.168.2.6", 8080, "dc-r1", "dc", Status.Up); + + hostSupplierHosts.add(h1); + hostSupplierHosts.add(h2); + hostSupplierHosts.add(h3); + + hostSupplierHosts.add(h4); + hostSupplierHosts.add(h5); + hostSupplierHosts.add(h6); + + final Map tokenMap = new HashMap(); + + tokenMap.put(h1, new HostToken(309687905L, h1)); + tokenMap.put(h2, new HostToken(1383429731L, h2)); + tokenMap.put(h3, new HostToken(2457171554L, h3)); + tokenMap.put(h4, new HostToken(309687905L, h4)); + tokenMap.put(h5, new HostToken(1383429731L, h5)); + tokenMap.put(h6, new HostToken(2457171554L, h6)); + + // Token Aware load balancing + cpConfig.setLoadBalancingStrategy(LoadBalancingStrategy.TokenAware); + cpConfig.setLocalDataCenter("dc"); + cpConfig.setLocalRack("dc-r0"); + cpConfig.withTokenSupplier(getTokenMapSupplier(tokenMap)); + + // + final ConnectionPoolImpl pool = new ConnectionPoolImpl(connFactory, cpConfig, cpMonitor); + pool.start(); } - private void executeTestClientOperation(final ConnectionPoolImpl pool) { - executeTestClientOperation(pool, null); - } - - private void executeTestClientOperation(final ConnectionPoolImpl pool, final Callable customLogic) { + private void executeTestClientOperation(final ConnectionPoolImpl pool) { + executeTestClientOperation(pool, null); + } + + private void executeTestClientOperation(final ConnectionPoolImpl pool, final Callable customLogic) { pool.executeWithFailover(new Operation() { @Override diff --git a/dyno-core/src/test/java/com/netflix/dyno/connectionpool/impl/lb/HostSelectionWithFallbackTest.java b/dyno-core/src/test/java/com/netflix/dyno/connectionpool/impl/lb/HostSelectionWithFallbackTest.java index 963bbef4..13048ceb 100644 --- a/dyno-core/src/test/java/com/netflix/dyno/connectionpool/impl/lb/HostSelectionWithFallbackTest.java +++ b/dyno-core/src/test/java/com/netflix/dyno/connectionpool/impl/lb/HostSelectionWithFallbackTest.java @@ -46,6 +46,7 @@ import com.netflix.dyno.connectionpool.Host; import com.netflix.dyno.connectionpool.Host.Status; import com.netflix.dyno.connectionpool.HostConnectionPool; +import com.netflix.dyno.connectionpool.HostSupplier; import com.netflix.dyno.connectionpool.TokenMapSupplier; import com.netflix.dyno.connectionpool.impl.ConnectionPoolConfigurationImpl; import com.netflix.dyno.connectionpool.impl.CountingConnectionPoolMonitor; @@ -530,13 +531,13 @@ public void testReplicationFactorForMultiRegionCluster() { ); int rf = selection.calculateReplicationFactor(hostTokens); - Assert.assertEquals(3, rf); - } + } @Test public void testChangingHashPartitioner() { cpConfig.setLoadBalancingStrategy(LoadBalancingStrategy.TokenAware); + cpConfig.withHostSupplier(getHostSupplier()); cpConfig.withTokenSupplier(getTokenMapSupplier()); cpConfig.withHashPartitioner(getMockHashPartitioner(1000000000L)); @@ -675,6 +676,15 @@ public HostToken getTokenForHost(Host host, Set activeHosts) { }; } + private HostSupplier getHostSupplier() { + return new HostSupplier () { + @Override + public List getHosts() { + return Arrays.asList(h1, h2, h3, h4); + } + }; + } + private HashPartitioner getMockHashPartitioner(final Long hash) { return new HashPartitioner() { @Override