Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(provider/kubernetes): Don't poll immediately after cache refresh #2871

Merged
merged 4 commits into from
Apr 26, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,66 +108,68 @@ public TaskResult execute(@Nonnull Stage stage) {
StageData stageData = fromStage(stage);
stageData.deployedManifests = getDeployedManifests(stage);

if (refreshManifests(cloudProvider, stageData)) {
checkPendingRefreshes(cloudProvider, stageData, startTime);

refreshManifests(cloudProvider, stageData);

if (allManifestsProcessed(stageData)) {
registry.timer(durationTimerId.withTags("success", "true", "outcome", "complete"))
.record(duration, TimeUnit.MILLISECONDS);
return new TaskResult(SUCCEEDED, toContext(stageData));
} else {
TaskResult taskResult = checkPendingRefreshes(cloudProvider, stageData, startTime);

// ignoring any non-success, non-failure statuses
if (taskResult.getStatus().isSuccessful()) {
registry.timer(durationTimerId.withTags("success", "true", "outcome", "complete"))
.record(duration, TimeUnit.MILLISECONDS);
} else if (taskResult.getStatus().isFailure()) {
registry.timer(durationTimerId.withTags("success", "false", "outcome", "failure"))
.record(duration, TimeUnit.MILLISECONDS);
}
return taskResult;
}

return new TaskResult(RUNNING, toContext(stageData));
}

private TaskResult checkPendingRefreshes(String provider, StageData stageData, long startTime) {
Collection<PendingRefresh> pendingRefreshes = objectMapper.convertValue(
cacheStatusService.pendingForceCacheUpdates(provider, REFRESH_TYPE),
new TypeReference<Collection<PendingRefresh>>() { }
);
/**
* Checks whether all manifests deployed in the stage have been processed by the cache
* @return true if all manifests have been processed
*/
private boolean allManifestsProcessed(StageData stageData) {
return stageData.getProcessedManifests().containsAll(stageData.getDeployedManifests());
}

List<ScopedManifest> deployedManifests = stageData.getDeployedManifests();
/**
* Checks on the status of any pending on-demand cache refreshes. If a pending refresh has been processed, adds the
* corresponding manifest to processedManifests; if a pending refresh is not found in clouddriver or is invalid,
* removes the corresponding manifest from refreshedManifests
*/
private void checkPendingRefreshes(String provider, StageData stageData, long startTime) {
Set<ScopedManifest> refreshedManifests = stageData.getRefreshedManifests();
Set<ScopedManifest> processedManifests = stageData.getProcessedManifests();
boolean allProcessed = true;

for (ScopedManifest manifest : deployedManifests) {
if (processedManifests.contains(manifest)) {
continue;
}
List<ScopedManifest> manifestsToCheck = refreshedManifests.stream()
.filter(m -> !processedManifests.contains(m))
.collect(Collectors.toList());

Optional<RefreshStatus> refreshStatus = pendingRefreshes.stream()
if (manifestsToCheck.isEmpty()) {
return;
}

Collection<PendingRefresh> pendingRefreshes = objectMapper.convertValue(
cacheStatusService.pendingForceCacheUpdates(provider, REFRESH_TYPE),
new TypeReference<Collection<PendingRefresh>>() { }
);

for (ScopedManifest manifest : manifestsToCheck) {
RefreshStatus refreshStatus = pendingRefreshes.stream()
.filter(pr -> pr.getScopedManifest() != null)
.filter(pr -> refreshMatches(pr.getScopedManifest(), manifest))
.map(pr -> getRefreshStatus(pr, startTime))
.filter(status -> status != RefreshStatus.INVALID)
.sorted()
.findFirst();

if (refreshStatus.isPresent()) {
RefreshStatus status = refreshStatus.get();
if (status == RefreshStatus.PROCESSED) {
log.debug("Pending manifest refresh of {} completed", manifest);
processedManifests.add(manifest);
} else if (status == RefreshStatus.PENDING) {
log.debug("Pending manifest refresh of {} still pending", manifest);
allProcessed = false;
}
.findFirst()
.orElse(RefreshStatus.INVALID);

if (refreshStatus == RefreshStatus.PROCESSED) {
log.debug("Pending manifest refresh of {} completed", manifest);
processedManifests.add(manifest);
} else if (refreshStatus == RefreshStatus.PENDING) {
log.debug("Pending manifest refresh of {} still pending", manifest);
} else {
log.warn("No valid pending refresh of {}", manifest);
allProcessed = false;
refreshedManifests.remove(manifest);
}
}

return new TaskResult(allProcessed ? SUCCEEDED : RUNNING, toContext(stageData));
}

private boolean refreshMatches(ScopedManifest refresh, ScopedManifest manifest) {
Expand Down Expand Up @@ -212,38 +214,29 @@ private List<ScopedManifest> getDeployedManifests(Stage stage) {
.collect(Collectors.toList());
}

private boolean refreshManifests(String provider, StageData stageData) {
/**
* Requests and on-demand cache refresh for any manifest without an refresh requests that is either pending or
ezimanyi marked this conversation as resolved.
Show resolved Hide resolved
* processed. Adds each manifest to refreshedManifests; if the request to clouddriver was immediately processed,
* also adds the manifest to processedManifests.
*/
private void refreshManifests(String provider, StageData stageData) {
List<ScopedManifest> manifests = manifestsNeedingRefresh(stageData);

boolean allRefreshesSucceeded = true;
for (ScopedManifest manifest : manifests) {
Map<String, String> request = objectMapper.convertValue(manifest, new TypeReference<Map<String, String>>() {});
try {
Response response = cacheService.forceCacheUpdate(provider, REFRESH_TYPE, request);
if (response.getStatus() == HTTP_OK) {
log.info("Refresh of {} succeeded immediately", manifest);
stageData.getProcessedManifests().add(manifest);
} else {
allRefreshesSucceeded = false;
}

stageData.getRefreshedManifests().add(manifest);
} catch (Exception e) {
log.warn("Failed to refresh {}: ", manifest, e);
allRefreshesSucceeded = false;
stageData.errors.add(e.getMessage());
}
}

boolean allRefreshesProcessed = stageData.getRefreshedManifests().equals(stageData.getProcessedManifests());

// This can happen when the prior execution of this task returned RUNNING because one or more manifests
// were not processed. In this case, all manifests may have been refreshed successfully without finishing processing.
if (allRefreshesSucceeded && !allRefreshesProcessed) {
log.warn("All refreshes succeeded, but not all have been processed yet...");
}

return allRefreshesSucceeded && allRefreshesProcessed;
}

private StageData fromStage(Stage stage) {
Expand Down
Loading