Skip to content

Commit

Permalink
Merge pull request #1108 from PropertyOfTuckerSystems/pts/aws-asg-int…
Browse files Browse the repository at this point in the history
…erface

Add an interface for AwsAsgUtil
  • Loading branch information
qiangdavidliu committed Aug 6, 2018
2 parents 7935f2c + e4dd6a7 commit f660f78
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.netflix.eureka;

import javax.annotation.Nullable;
import javax.inject.Singleton;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -26,13 +24,17 @@
import java.util.Map;
import java.util.Set;

import javax.annotation.Nullable;
import javax.inject.Singleton;

import com.netflix.config.ConfigurationManager;
import com.netflix.config.DynamicBooleanProperty;
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.config.DynamicStringProperty;
import com.netflix.config.DynamicStringSetProperty;
import com.netflix.eureka.aws.AwsBindingStrategy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -314,6 +316,11 @@ public long getEvictionIntervalTimerInMs() {
namespace + "evictionIntervalTimerInMs", (60 * 1000)).get();
}

@Override
public boolean shouldUseAwsAsgApi() {
return configInstance.getBooleanProperty(namespace + "shouldUseAwsAsgApi", true).get();
}

@Override
public int getASGQueryTimeoutMs() {
return configInstance.getIntProperty(namespace + "asgQueryTimeoutMs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@

package com.netflix.eureka;

import com.netflix.eureka.aws.AwsBindingStrategy;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.Set;

import javax.annotation.Nullable;

import com.netflix.eureka.aws.AwsBindingStrategy;

/**
* Configuration information required by the eureka server to operate.
*
Expand Down Expand Up @@ -264,6 +265,13 @@ public interface EurekaServerConfig {
*/
long getEvictionIntervalTimerInMs();

/**
* Whether to use AWS API to query ASG statuses.
*
* @return true if AWS API is used, false otherwise.
*/
boolean shouldUseAwsAsgApi();

/**
* Get the timeout value for querying the <em>AWS</em> for <em>ASG</em>
* information.
Expand Down Expand Up @@ -418,7 +426,7 @@ public interface EurekaServerConfig {
* @return maximum number of threads to be used for replication.
*/
int getMaxThreadsForPeerReplication();

/**
* Get the minimum number of available peer replication instances
* for this instance to be considered healthy. The design of eureka allows
Expand All @@ -428,7 +436,7 @@ public interface EurekaServerConfig {
* The default value of -1 is interpreted as a marker to not compare
* the number of replicas. This would be done to either disable this check
* or to run eureka in a single node configuration.
*
*
* @return minimum number of available peer replication instances
* for this instance to be considered healthy.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.netflix.eureka.aws;

import com.netflix.appinfo.InstanceInfo;

public interface AsgClient {
boolean isASGEnabled(InstanceInfo instanceInfo);

void setStatus(String asgName, boolean enabled);
}
55 changes: 36 additions & 19 deletions eureka-core/src/main/java/com/netflix/eureka/aws/AwsAsgUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,8 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.netflix.appinfo.AmazonInfo;
import com.netflix.appinfo.AmazonInfo.MetaDataKey;
import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.discovery.EurekaClientConfig;
import com.netflix.eureka.registry.InstanceRegistry;
import com.netflix.appinfo.DataCenterInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.BasicAWSCredentials;
Expand All @@ -58,20 +44,34 @@
import com.amazonaws.services.autoscaling.model.DescribeAutoScalingGroupsRequest;
import com.amazonaws.services.autoscaling.model.DescribeAutoScalingGroupsResult;
import com.amazonaws.services.autoscaling.model.SuspendedProcess;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.netflix.appinfo.AmazonInfo;
import com.netflix.appinfo.AmazonInfo.MetaDataKey;
import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.appinfo.DataCenterInfo;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.EurekaClientConfig;
import com.netflix.discovery.shared.Application;
import com.netflix.discovery.shared.Applications;
import com.netflix.eureka.EurekaServerConfig;
import com.netflix.eureka.registry.InstanceRegistry;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.monitor.Stopwatch;

import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A utility class for querying and updating information about amazon
Expand All @@ -81,7 +81,7 @@
*
*/
@Singleton
public class AwsAsgUtil {
public class AwsAsgUtil implements AsgClient {
private static final Logger logger = LoggerFactory.getLogger(AwsAsgUtil.class);

private static final String PROP_ADD_TO_LOAD_BALANCER = "AddToLoadBalancer";
Expand Down Expand Up @@ -165,6 +165,18 @@ public boolean isASGEnabled(InstanceInfo instanceInfo) {
if (result != null) {
return result;
} else {
if (!serverConfig.shouldUseAwsAsgApi()) {
// Disabled, cached values (if any) are still being returned if the caller makes
// a decision to call the disabled client during some sort of transitioning
// period, but no new values will be fetched while disabled.

logger.info(("'{}' is not cached at the moment and won't be fetched because querying AWS ASGs "
+ "has been disabled via the config, returning the fallback value."),
cacheKey);

return true;
}

logger.info("Cache value for asg {} does not exist yet, async refreshing.", cacheKey.asgName);
// Only do an async refresh if it does not yet exist. Do this to refrain from calling aws api too much
asgCache.refresh(cacheKey);
Expand Down Expand Up @@ -371,6 +383,11 @@ private TimerTask getASGUpdateTask() {
@Override
public void run() {
try {
if (!serverConfig.shouldUseAwsAsgApi()) {
// Disabled via the config, no-op.
return;
}

// First get the active ASG names
Set<CacheKey> cacheKeys = getCacheKeys();
if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

import com.netflix.appinfo.InstanceInfo;
import com.netflix.appinfo.InstanceInfo.InstanceStatus;
import com.netflix.eureka.aws.AwsAsgUtil;
import com.netflix.eureka.aws.AsgClient;
import com.netflix.eureka.lease.Lease;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -16,17 +17,17 @@
public class AsgEnabledRule implements InstanceStatusOverrideRule {
private static final Logger logger = LoggerFactory.getLogger(AsgEnabledRule.class);

private final AwsAsgUtil awsAsgUtil;
private final AsgClient asgClient;

public AsgEnabledRule(AwsAsgUtil awsAsgUtil) {
this.awsAsgUtil = awsAsgUtil;
public AsgEnabledRule(AsgClient asgClient) {
this.asgClient = asgClient;
}

@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo, Lease<InstanceInfo> existingLease, boolean isReplication) {
// If the ASGName is present- check for its status
if (instanceInfo.getASGName() != null) {
boolean isASGDisabled = !awsAsgUtil.isASGEnabled(instanceInfo);
boolean isASGDisabled = !asgClient.isASGEnabled(instanceInfo);
logger.debug("The ASG name is specified {} and the value is {}", instanceInfo.getASGName(), isASGDisabled);
if (isASGDisabled) {
return StatusOverrideResult.matchingStatus(InstanceStatus.OUT_OF_SERVICE);
Expand Down

0 comments on commit f660f78

Please sign in to comment.