From 93c8b79bc35a2638657c728776414d590600cc02 Mon Sep 17 00:00:00 2001 From: Adam Jordens Date: Wed, 22 Nov 2017 10:52:08 -0800 Subject: [PATCH] fix(provider/aws): Reservation caching agent, now with less RxJava (#2180) --- .../ReservationReportCachingAgent.groovy | 65 +++++++++++-------- .../provider/config/AwsProviderConfig.groovy | 15 ++--- .../ReservationReportCachingAgentSpec.groovy | 6 +- 3 files changed, 47 insertions(+), 39 deletions(-) diff --git a/clouddriver-aws/src/main/groovy/com/netflix/spinnaker/clouddriver/aws/provider/agent/ReservationReportCachingAgent.groovy b/clouddriver-aws/src/main/groovy/com/netflix/spinnaker/clouddriver/aws/provider/agent/ReservationReportCachingAgent.groovy index 2a4c1056dfa..79cd7a8d076 100644 --- a/clouddriver-aws/src/main/groovy/com/netflix/spinnaker/clouddriver/aws/provider/agent/ReservationReportCachingAgent.groovy +++ b/clouddriver-aws/src/main/groovy/com/netflix/spinnaker/clouddriver/aws/provider/agent/ReservationReportCachingAgent.groovy @@ -50,11 +50,11 @@ import com.netflix.spinnaker.clouddriver.aws.security.NetflixAmazonCredentials import com.netflix.spinnaker.clouddriver.cache.CustomScheduledAgent import groovy.util.logging.Slf4j import org.springframework.context.ApplicationContext -import rx.Observable -import rx.Scheduler import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.ExecutorService +import java.util.concurrent.Future import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import java.util.function.ToDoubleFunction @@ -72,7 +72,7 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen AUTHORITATIVE.forType(RESERVATION_REPORTS.ns) ]) - private final Scheduler scheduler + private final ExecutorService reservationReportPool private final ApplicationContext ctx private Cache cacheView @@ -88,7 +88,7 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen AmazonClientProvider amazonClientProvider, Collection accounts, ObjectMapper objectMapper, - Scheduler scheduler, + ExecutorService reservationReportPool, ApplicationContext ctx) { this.amazonClientProvider = amazonClientProvider this.accounts = accounts @@ -98,7 +98,7 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen module.addSerializer(AmazonReservationReport.AccountReservationDetail.class, accountReservationDetailSerializer) this.objectMapper = objectMapper.copy().enable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS).registerModule(module) - this.scheduler = scheduler + this.reservationReportPool = reservationReportPool this.ctx = ctx this.vpcOnlyAccounts = determineVpcOnlyAccounts() this.metricsSupport = new MetricsSupport(objectMapper, registry, { getCacheView() }) @@ -178,15 +178,22 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen ConcurrentHashMap reservations = new ConcurrentHashMap<>() ConcurrentHashMap> errorsByRegion = new ConcurrentHashMap<>() - Observable - .from(accounts.sort { it.name }) - .flatMap({ credential -> - extractReservations(reservations, errorsByRegion, credential).subscribeOn(scheduler) - }) - .observeOn(scheduler) - .toList() - .toBlocking() - .single() + + Map tasks = accounts.collectEntries { NetflixAmazonCredentials credential -> + [ + (credential) : reservationReportPool.submit { + extractReservations(reservations, errorsByRegion, credential) + } + ] + } + + tasks.each { + try { + it.value.get() + } catch (Exception e) { + recordError(registry, errorsByRegion, it.key, "*", e) + } + } def amazonReservationReport = new AmazonReservationReport(start: new Date(startTime), end: new Date()) accounts.each { NetflixAmazonCredentials credentials -> @@ -246,9 +253,9 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen ) } - Observable extractReservations(ConcurrentHashMap reservations, - ConcurrentHashMap> errorsByRegion, - NetflixAmazonCredentials credentials) { + void extractReservations(ConcurrentHashMap reservations, + ConcurrentHashMap> errorsByRegion, + NetflixAmazonCredentials credentials) { def getReservation = { String region, String availabilityZone, String operatingSystemType, String instanceType -> String key = availabilityZone == null ? [region, operatingSystemType, instanceType].join(':') : @@ -269,9 +276,7 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen return newOverallReservationDetail } - Observable - .from(credentials.regions) - .flatMap({ AmazonCredentials.AWSRegion region -> + credentials.regions.each { AmazonCredentials.AWSRegion region -> log.info("Fetching reservation report for ${credentials.name}:${region.name}") long startTime = System.currentTimeMillis() @@ -302,6 +307,8 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen } startTime = System.currentTimeMillis() + + def fetchedInstanceCount = 0 def describeInstancesRequest = new DescribeInstancesRequest().withMaxResults(500) def allowedStates = ["pending", "running"] as Set while (true) { @@ -322,8 +329,12 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen reservation.getAccount(credentials.name).used.incrementAndGet() } } + + fetchedInstanceCount += it.getInstances().size() } + log.debug("Fetched ${fetchedInstanceCount} instances in ${credentials.name}/${region.name} (nextToken: ${result.nextToken})") + if (result.nextToken) { describeInstancesRequest.withNextToken(result.nextToken) } else { @@ -331,32 +342,30 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen } } } catch (Exception e) { - recordError(registry, errorsByRegion, credentials, region, e) + recordError(registry, errorsByRegion, credentials, region.name, e) } log.debug("Took ${System.currentTimeMillis() - startTime}ms to describe instances for ${credentials.name}/${region.name}") - - return Observable.empty() - }) + } } static void recordError(Registry registry, ConcurrentHashMap> errorsByRegion, NetflixAmazonCredentials credentials, - AmazonCredentials.AWSRegion region, + String region, Exception e) { - def errorMessage = "Failed to describe instances in ${credentials.name}:${region.name}, reason: ${e.message}" as String + def errorMessage = "Failed to describe instances in ${credentials.name}:${region}, reason: ${e.message}" as String log.error(errorMessage, e) def errors = new CopyOnWriteArrayList([errorMessage]) - def previousValue = errorsByRegion.putIfAbsent(region.name, errors) + def previousValue = errorsByRegion.putIfAbsent(region, errors) if (previousValue != null) { previousValue.add(errorMessage) } def id = registry.createId("reservedInstances.errors").withTags([ - region : region.name, + region : region, account: credentials.name ]) registry.counter(id).increment() diff --git a/clouddriver-aws/src/main/groovy/com/netflix/spinnaker/clouddriver/aws/provider/config/AwsProviderConfig.groovy b/clouddriver-aws/src/main/groovy/com/netflix/spinnaker/clouddriver/aws/provider/config/AwsProviderConfig.groovy index 8243375cd77..361e8725a3f 100644 --- a/clouddriver-aws/src/main/groovy/com/netflix/spinnaker/clouddriver/aws/provider/config/AwsProviderConfig.groovy +++ b/clouddriver-aws/src/main/groovy/com/netflix/spinnaker/clouddriver/aws/provider/config/AwsProviderConfig.groovy @@ -50,10 +50,9 @@ import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.DependsOn import org.springframework.context.annotation.Scope -import rx.Scheduler -import rx.schedulers.Schedulers import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ExecutorService import java.util.concurrent.Executors @Configuration @@ -68,7 +67,7 @@ class AwsProviderConfig { EddaApiFactory eddaApiFactory, ApplicationContext ctx, Registry registry, - Scheduler reservationReportScheduler, + ExecutorService reservationReportPool, Optional> agentProviders, EddaTimeoutConfig eddaTimeoutConfig) { def awsProvider = @@ -82,7 +81,7 @@ class AwsProviderConfig { eddaApiFactory, ctx, registry, - reservationReportScheduler, + reservationReportPool, agentProviders.orElse(Collections.emptyList()), eddaTimeoutConfig) @@ -90,8 +89,8 @@ class AwsProviderConfig { } @Bean - Scheduler reservationReportScheduler(ReservationReportConfigurationProperties reservationReportConfigurationProperties) { - return Schedulers.from(Executors.newFixedThreadPool(reservationReportConfigurationProperties.threadPoolSize)) + ExecutorService reservationReportPool(ReservationReportConfigurationProperties reservationReportConfigurationProperties) { + return Executors.newFixedThreadPool(reservationReportConfigurationProperties.threadPoolSize) } @Bean @@ -118,7 +117,7 @@ class AwsProviderConfig { EddaApiFactory eddaApiFactory, ApplicationContext ctx, Registry registry, - Scheduler reservationReportScheduler, + ExecutorService reservationReportPool, Collection agentProviders, EddaTimeoutConfig eddaTimeoutConfig) { def scheduledAccounts = ProviderUtils.getScheduledAccounts(awsProvider) @@ -164,7 +163,7 @@ class AwsProviderConfig { } else { // This caching agent runs across all accounts in one iteration (to maintain consistency). newlyAddedAgents << new ReservationReportCachingAgent( - registry, amazonClientProvider, allAccounts, objectMapper, reservationReportScheduler, ctx + registry, amazonClientProvider, allAccounts, objectMapper, reservationReportPool, ctx ) } diff --git a/clouddriver-aws/src/test/groovy/com/netflix/spinnaker/clouddriver/aws/provider/agent/ReservationReportCachingAgentSpec.groovy b/clouddriver-aws/src/test/groovy/com/netflix/spinnaker/clouddriver/aws/provider/agent/ReservationReportCachingAgentSpec.groovy index 37954d1d0da..def70741122 100644 --- a/clouddriver-aws/src/test/groovy/com/netflix/spinnaker/clouddriver/aws/provider/agent/ReservationReportCachingAgentSpec.groovy +++ b/clouddriver-aws/src/test/groovy/com/netflix/spinnaker/clouddriver/aws/provider/agent/ReservationReportCachingAgentSpec.groovy @@ -47,9 +47,9 @@ class ReservationReportCachingAgentSpec extends Specification { } when: - ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest1, new NullPointerException("An NPE a day keeps the chaos monkey away")) - ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest1, new NullPointerException("This is not right!")) - ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest2, new NullPointerException("This is even better!")) + ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest1.name, new NullPointerException("An NPE a day keeps the chaos monkey away")) + ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest1.name, new NullPointerException("This is not right!")) + ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest2.name, new NullPointerException("This is even better!")) then: errorsByRegion["us-west-1"] == [