Skip to content

Commit

Permalink
fix(provider/aws): Reservation caching agent, now with less RxJava (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ajordens authored Nov 22, 2017
1 parent 9632a88 commit 93c8b79
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ import com.netflix.spinnaker.clouddriver.aws.security.NetflixAmazonCredentials
import com.netflix.spinnaker.clouddriver.cache.CustomScheduledAgent
import groovy.util.logging.Slf4j
import org.springframework.context.ApplicationContext
import rx.Observable
import rx.Scheduler

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.ToDoubleFunction
Expand All @@ -72,7 +72,7 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen
AUTHORITATIVE.forType(RESERVATION_REPORTS.ns)
])

private final Scheduler scheduler
private final ExecutorService reservationReportPool
private final ApplicationContext ctx
private Cache cacheView

Expand All @@ -88,7 +88,7 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen
AmazonClientProvider amazonClientProvider,
Collection<NetflixAmazonCredentials> accounts,
ObjectMapper objectMapper,
Scheduler scheduler,
ExecutorService reservationReportPool,
ApplicationContext ctx) {
this.amazonClientProvider = amazonClientProvider
this.accounts = accounts
Expand All @@ -98,7 +98,7 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen
module.addSerializer(AmazonReservationReport.AccountReservationDetail.class, accountReservationDetailSerializer)

this.objectMapper = objectMapper.copy().enable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS).registerModule(module)
this.scheduler = scheduler
this.reservationReportPool = reservationReportPool
this.ctx = ctx
this.vpcOnlyAccounts = determineVpcOnlyAccounts()
this.metricsSupport = new MetricsSupport(objectMapper, registry, { getCacheView() })
Expand Down Expand Up @@ -178,15 +178,22 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen

ConcurrentHashMap<String, OverallReservationDetail> reservations = new ConcurrentHashMap<>()
ConcurrentHashMap<String, Collection<String>> errorsByRegion = new ConcurrentHashMap<>()
Observable
.from(accounts.sort { it.name })
.flatMap({ credential ->
extractReservations(reservations, errorsByRegion, credential).subscribeOn(scheduler)
})
.observeOn(scheduler)
.toList()
.toBlocking()
.single()

Map<NetflixAmazonCredentials, Future> tasks = accounts.collectEntries { NetflixAmazonCredentials credential ->
[
(credential) : reservationReportPool.submit {
extractReservations(reservations, errorsByRegion, credential)
}
]
}

tasks.each {
try {
it.value.get()
} catch (Exception e) {
recordError(registry, errorsByRegion, it.key, "*", e)
}
}

def amazonReservationReport = new AmazonReservationReport(start: new Date(startTime), end: new Date())
accounts.each { NetflixAmazonCredentials credentials ->
Expand Down Expand Up @@ -246,9 +253,9 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen
)
}

Observable extractReservations(ConcurrentHashMap<String, OverallReservationDetail> reservations,
ConcurrentHashMap<String, Collection<String>> errorsByRegion,
NetflixAmazonCredentials credentials) {
void extractReservations(ConcurrentHashMap<String, OverallReservationDetail> reservations,
ConcurrentHashMap<String, Collection<String>> errorsByRegion,
NetflixAmazonCredentials credentials) {
def getReservation = { String region, String availabilityZone, String operatingSystemType, String instanceType ->
String key = availabilityZone == null ?
[region, operatingSystemType, instanceType].join(':') :
Expand All @@ -269,9 +276,7 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen
return newOverallReservationDetail
}

Observable
.from(credentials.regions)
.flatMap({ AmazonCredentials.AWSRegion region ->
credentials.regions.each { AmazonCredentials.AWSRegion region ->
log.info("Fetching reservation report for ${credentials.name}:${region.name}")
long startTime = System.currentTimeMillis()

Expand Down Expand Up @@ -302,6 +307,8 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen
}

startTime = System.currentTimeMillis()

def fetchedInstanceCount = 0
def describeInstancesRequest = new DescribeInstancesRequest().withMaxResults(500)
def allowedStates = ["pending", "running"] as Set<String>
while (true) {
Expand All @@ -322,41 +329,43 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen
reservation.getAccount(credentials.name).used.incrementAndGet()
}
}

fetchedInstanceCount += it.getInstances().size()
}

log.debug("Fetched ${fetchedInstanceCount} instances in ${credentials.name}/${region.name} (nextToken: ${result.nextToken})")

if (result.nextToken) {
describeInstancesRequest.withNextToken(result.nextToken)
} else {
break
}
}
} catch (Exception e) {
recordError(registry, errorsByRegion, credentials, region, e)
recordError(registry, errorsByRegion, credentials, region.name, e)
}

log.debug("Took ${System.currentTimeMillis() - startTime}ms to describe instances for ${credentials.name}/${region.name}")

return Observable.empty()
})
}
}

static void recordError(Registry registry,
ConcurrentHashMap<String, Collection<String>> errorsByRegion,
NetflixAmazonCredentials credentials,
AmazonCredentials.AWSRegion region,
String region,
Exception e) {
def errorMessage = "Failed to describe instances in ${credentials.name}:${region.name}, reason: ${e.message}" as String
def errorMessage = "Failed to describe instances in ${credentials.name}:${region}, reason: ${e.message}" as String
log.error(errorMessage, e)

def errors = new CopyOnWriteArrayList([errorMessage])

def previousValue = errorsByRegion.putIfAbsent(region.name, errors)
def previousValue = errorsByRegion.putIfAbsent(region, errors)
if (previousValue != null) {
previousValue.add(errorMessage)
}

def id = registry.createId("reservedInstances.errors").withTags([
region : region.name,
region : region,
account: credentials.name
])
registry.counter(id).increment()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.DependsOn
import org.springframework.context.annotation.Scope
import rx.Scheduler
import rx.schedulers.Schedulers

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

@Configuration
Expand All @@ -68,7 +67,7 @@ class AwsProviderConfig {
EddaApiFactory eddaApiFactory,
ApplicationContext ctx,
Registry registry,
Scheduler reservationReportScheduler,
ExecutorService reservationReportPool,
Optional<Collection<AgentProvider>> agentProviders,
EddaTimeoutConfig eddaTimeoutConfig) {
def awsProvider =
Expand All @@ -82,16 +81,16 @@ class AwsProviderConfig {
eddaApiFactory,
ctx,
registry,
reservationReportScheduler,
reservationReportPool,
agentProviders.orElse(Collections.emptyList()),
eddaTimeoutConfig)

awsProvider
}

@Bean
Scheduler reservationReportScheduler(ReservationReportConfigurationProperties reservationReportConfigurationProperties) {
return Schedulers.from(Executors.newFixedThreadPool(reservationReportConfigurationProperties.threadPoolSize))
ExecutorService reservationReportPool(ReservationReportConfigurationProperties reservationReportConfigurationProperties) {
return Executors.newFixedThreadPool(reservationReportConfigurationProperties.threadPoolSize)
}

@Bean
Expand All @@ -118,7 +117,7 @@ class AwsProviderConfig {
EddaApiFactory eddaApiFactory,
ApplicationContext ctx,
Registry registry,
Scheduler reservationReportScheduler,
ExecutorService reservationReportPool,
Collection<AgentProvider> agentProviders,
EddaTimeoutConfig eddaTimeoutConfig) {
def scheduledAccounts = ProviderUtils.getScheduledAccounts(awsProvider)
Expand Down Expand Up @@ -164,7 +163,7 @@ class AwsProviderConfig {
} else {
// This caching agent runs across all accounts in one iteration (to maintain consistency).
newlyAddedAgents << new ReservationReportCachingAgent(
registry, amazonClientProvider, allAccounts, objectMapper, reservationReportScheduler, ctx
registry, amazonClientProvider, allAccounts, objectMapper, reservationReportPool, ctx
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ class ReservationReportCachingAgentSpec extends Specification {
}

when:
ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest1, new NullPointerException("An NPE a day keeps the chaos monkey away"))
ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest1, new NullPointerException("This is not right!"))
ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest2, new NullPointerException("This is even better!"))
ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest1.name, new NullPointerException("An NPE a day keeps the chaos monkey away"))
ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest1.name, new NullPointerException("This is not right!"))
ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest2.name, new NullPointerException("This is even better!"))

then:
errorsByRegion["us-west-1"] == [
Expand Down

0 comments on commit 93c8b79

Please sign in to comment.