Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(provider/aws): Reservation caching agent, now with less RxJava #2180

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ import com.netflix.spinnaker.clouddriver.aws.security.NetflixAmazonCredentials
import com.netflix.spinnaker.clouddriver.cache.CustomScheduledAgent
import groovy.util.logging.Slf4j
import org.springframework.context.ApplicationContext
import rx.Observable
import rx.Scheduler

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.ToDoubleFunction
Expand All @@ -72,7 +72,7 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen
AUTHORITATIVE.forType(RESERVATION_REPORTS.ns)
])

private final Scheduler scheduler
private final ExecutorService reservationReportPool
private final ApplicationContext ctx
private Cache cacheView

Expand All @@ -88,7 +88,7 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen
AmazonClientProvider amazonClientProvider,
Collection<NetflixAmazonCredentials> accounts,
ObjectMapper objectMapper,
Scheduler scheduler,
ExecutorService reservationReportPool,
ApplicationContext ctx) {
this.amazonClientProvider = amazonClientProvider
this.accounts = accounts
Expand All @@ -98,7 +98,7 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen
module.addSerializer(AmazonReservationReport.AccountReservationDetail.class, accountReservationDetailSerializer)

this.objectMapper = objectMapper.copy().enable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS).registerModule(module)
this.scheduler = scheduler
this.reservationReportPool = reservationReportPool
this.ctx = ctx
this.vpcOnlyAccounts = determineVpcOnlyAccounts()
this.metricsSupport = new MetricsSupport(objectMapper, registry, { getCacheView() })
Expand Down Expand Up @@ -178,15 +178,22 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen

ConcurrentHashMap<String, OverallReservationDetail> reservations = new ConcurrentHashMap<>()
ConcurrentHashMap<String, Collection<String>> errorsByRegion = new ConcurrentHashMap<>()
Observable
.from(accounts.sort { it.name })
.flatMap({ credential ->
extractReservations(reservations, errorsByRegion, credential).subscribeOn(scheduler)
})
.observeOn(scheduler)
.toList()
.toBlocking()
.single()

Map<NetflixAmazonCredentials, Future> tasks = accounts.collectEntries { NetflixAmazonCredentials credential ->
[
(credential) : reservationReportPool.submit {
extractReservations(reservations, errorsByRegion, credential)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my rx is bad enough that I can't tell whether the previous version would have used a thread per region in each credential or not but either way I don't think it matters.

}
]
}

tasks.each {
try {
it.value.get()
} catch (Exception e) {
recordError(registry, errorsByRegion, it.key, "*", e)
}
}

def amazonReservationReport = new AmazonReservationReport(start: new Date(startTime), end: new Date())
accounts.each { NetflixAmazonCredentials credentials ->
Expand Down Expand Up @@ -246,9 +253,9 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen
)
}

Observable extractReservations(ConcurrentHashMap<String, OverallReservationDetail> reservations,
ConcurrentHashMap<String, Collection<String>> errorsByRegion,
NetflixAmazonCredentials credentials) {
void extractReservations(ConcurrentHashMap<String, OverallReservationDetail> reservations,
ConcurrentHashMap<String, Collection<String>> errorsByRegion,
NetflixAmazonCredentials credentials) {
def getReservation = { String region, String availabilityZone, String operatingSystemType, String instanceType ->
String key = availabilityZone == null ?
[region, operatingSystemType, instanceType].join(':') :
Expand All @@ -269,9 +276,7 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen
return newOverallReservationDetail
}

Observable
.from(credentials.regions)
.flatMap({ AmazonCredentials.AWSRegion region ->
credentials.regions.each { AmazonCredentials.AWSRegion region ->
log.info("Fetching reservation report for ${credentials.name}:${region.name}")
long startTime = System.currentTimeMillis()

Expand Down Expand Up @@ -302,6 +307,8 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen
}

startTime = System.currentTimeMillis()

def fetchedInstanceCount = 0
def describeInstancesRequest = new DescribeInstancesRequest().withMaxResults(500)
def allowedStates = ["pending", "running"] as Set<String>
while (true) {
Expand All @@ -322,41 +329,43 @@ class ReservationReportCachingAgent implements CachingAgent, CustomScheduledAgen
reservation.getAccount(credentials.name).used.incrementAndGet()
}
}

fetchedInstanceCount += it.getInstances().size()
}

log.debug("Fetched ${fetchedInstanceCount} instances in ${credentials.name}/${region.name} (nextToken: ${result.nextToken})")

if (result.nextToken) {
describeInstancesRequest.withNextToken(result.nextToken)
} else {
break
}
}
} catch (Exception e) {
recordError(registry, errorsByRegion, credentials, region, e)
recordError(registry, errorsByRegion, credentials, region.name, e)
}

log.debug("Took ${System.currentTimeMillis() - startTime}ms to describe instances for ${credentials.name}/${region.name}")

return Observable.empty()
})
}
}

static void recordError(Registry registry,
ConcurrentHashMap<String, Collection<String>> errorsByRegion,
NetflixAmazonCredentials credentials,
AmazonCredentials.AWSRegion region,
String region,
Exception e) {
def errorMessage = "Failed to describe instances in ${credentials.name}:${region.name}, reason: ${e.message}" as String
def errorMessage = "Failed to describe instances in ${credentials.name}:${region}, reason: ${e.message}" as String
log.error(errorMessage, e)

def errors = new CopyOnWriteArrayList([errorMessage])

def previousValue = errorsByRegion.putIfAbsent(region.name, errors)
def previousValue = errorsByRegion.putIfAbsent(region, errors)
if (previousValue != null) {
previousValue.add(errorMessage)
}

def id = registry.createId("reservedInstances.errors").withTags([
region : region.name,
region : region,
account: credentials.name
])
registry.counter(id).increment()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.DependsOn
import org.springframework.context.annotation.Scope
import rx.Scheduler
import rx.schedulers.Schedulers

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

@Configuration
Expand All @@ -68,7 +67,7 @@ class AwsProviderConfig {
EddaApiFactory eddaApiFactory,
ApplicationContext ctx,
Registry registry,
Scheduler reservationReportScheduler,
ExecutorService reservationReportPool,
Optional<Collection<AgentProvider>> agentProviders,
EddaTimeoutConfig eddaTimeoutConfig) {
def awsProvider =
Expand All @@ -82,16 +81,16 @@ class AwsProviderConfig {
eddaApiFactory,
ctx,
registry,
reservationReportScheduler,
reservationReportPool,
agentProviders.orElse(Collections.emptyList()),
eddaTimeoutConfig)

awsProvider
}

@Bean
Scheduler reservationReportScheduler(ReservationReportConfigurationProperties reservationReportConfigurationProperties) {
return Schedulers.from(Executors.newFixedThreadPool(reservationReportConfigurationProperties.threadPoolSize))
ExecutorService reservationReportPool(ReservationReportConfigurationProperties reservationReportConfigurationProperties) {
return Executors.newFixedThreadPool(reservationReportConfigurationProperties.threadPoolSize)
}

@Bean
Expand All @@ -118,7 +117,7 @@ class AwsProviderConfig {
EddaApiFactory eddaApiFactory,
ApplicationContext ctx,
Registry registry,
Scheduler reservationReportScheduler,
ExecutorService reservationReportPool,
Collection<AgentProvider> agentProviders,
EddaTimeoutConfig eddaTimeoutConfig) {
def scheduledAccounts = ProviderUtils.getScheduledAccounts(awsProvider)
Expand Down Expand Up @@ -164,7 +163,7 @@ class AwsProviderConfig {
} else {
// This caching agent runs across all accounts in one iteration (to maintain consistency).
newlyAddedAgents << new ReservationReportCachingAgent(
registry, amazonClientProvider, allAccounts, objectMapper, reservationReportScheduler, ctx
registry, amazonClientProvider, allAccounts, objectMapper, reservationReportPool, ctx
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ class ReservationReportCachingAgentSpec extends Specification {
}

when:
ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest1, new NullPointerException("An NPE a day keeps the chaos monkey away"))
ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest1, new NullPointerException("This is not right!"))
ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest2, new NullPointerException("This is even better!"))
ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest1.name, new NullPointerException("An NPE a day keeps the chaos monkey away"))
ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest1.name, new NullPointerException("This is not right!"))
ReservationReportCachingAgent.recordError(registry, errorsByRegion, credentials, usWest2.name, new NullPointerException("This is even better!"))

then:
errorsByRegion["us-west-1"] == [
Expand Down