Skip to content

Commit

Permalink
fix(timeout): stage timeout overrides cumulative task durations
Browse files Browse the repository at this point in the history
  • Loading branch information
emjburns committed Sep 11, 2017
1 parent 7b7733c commit 37afcac
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,9 @@ public void resolveStrategyParams() {
}

/**
* Returns the top-most stage timeout value if present.
* Returns the top-most stage.
*/
@JsonIgnore public Optional<Long> getTopLevelTimeout() {
private Stage<T> getTopLevelStage() {
Stage<T> topLevelStage = this;
while (topLevelStage.parentStageId != null) {
String sid = topLevelStage.parentStageId;
Expand All @@ -408,6 +408,14 @@ public void resolveStrategyParams() {
throw new IllegalStateException("Could not find stage by parentStageId (stage: " + topLevelStage.getId() + ", parentStageId:" + sid + ")");
}
}
return topLevelStage;
}

/**
* Returns the top-most stage timeout value if present.
*/
@JsonIgnore public Optional<Long> getTopLevelTimeout() {
Stage<T> topLevelStage = getTopLevelStage();
Object timeout = topLevelStage.getContext().get("stageTimeoutMs");
if (timeout instanceof Integer) {
return Optional.of((Integer) timeout).map(Long::new);
Expand All @@ -419,6 +427,14 @@ public void resolveStrategyParams() {
return Optional.empty();
}

/**
* Returns the top-most stage start time.
*/
@JsonIgnore public Long getTopLevelStartTime() {
Stage<T> topLevelStage = getTopLevelStage();
return topLevelStage.getStartTime();
}

public static class LastModifiedDetails implements Serializable {
private String user;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,22 @@ class RunTaskHandler(
else -> Duration.ofSeconds(1)
}

private fun Task.formatTimeout(timeout: Long): String {
private fun formatTimeout(timeout: Long): String {
return DurationFormatUtils.formatDurationWords(timeout, true, true)
}

private fun Task.checkForTimeout(stage: Stage<*>, taskModel: com.netflix.spinnaker.orca.pipeline.model.Task, message: Message) {
checkForStageTimeout(stage)
checkForTaskTimeout(taskModel, stage, message)
}

private fun Task.checkForTaskTimeout(taskModel: com.netflix.spinnaker.orca.pipeline.model.Task, stage: Stage<*>, message: Message) {
if (this is RetryableTask) {
val startTime = taskModel.startTime.toInstant()
val pausedDuration = stage.getExecution().pausedDurationRelativeTo(startTime)
val throttleTime = message.getAttribute<TotalThrottleTimeAttribute>()?.totalThrottleTimeMs ?: 0
val elapsedTime = Duration.between(startTime, clock.instant())
if (elapsedTime.minus(pausedDuration).minusMillis(throttleTime) > timeoutDuration(stage)) {
val throttleTime = message.getAttribute<TotalThrottleTimeAttribute>()?.totalThrottleTimeMs ?: 0
if (elapsedTime.minus(pausedDuration).minusMillis(throttleTime) > timeout.toDuration()) {
val durationString = formatTimeout(elapsedTime.toMillis())
val msg = StringBuilder("${javaClass.simpleName} of stage ${stage.getName()} timed out after $durationString. ")
msg.append("pausedDuration: ${formatTimeout(pausedDuration.toMillis())}, ")
Expand All @@ -187,8 +192,16 @@ class RunTaskHandler(
}
}

private fun RetryableTask.timeoutDuration(stage: Stage<*>): Duration
= stage.getTopLevelTimeout().orElse(timeout).toDuration()
private fun checkForStageTimeout(stage: Stage<*>) {
stage.getTopLevelTimeout().map(Duration::ofMillis).ifPresent({
val startTime = stage.getTopLevelStartTime().toInstant()
val elapsedTime = Duration.between(startTime, clock.instant())
val pausedDuration = stage.getExecution().pausedDurationRelativeTo(startTime)
if (elapsedTime.minus(pausedDuration) > it) {
throw TimeoutException("Stage ${stage.getName()} timed out after ${formatTimeout(elapsedTime.toMillis())}")
}
})
}


private fun Execution<*>.pausedDurationRelativeTo(instant: Instant?): Duration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,20 +742,22 @@ object RunTaskHandlerTest : SubjectSpek<RunTaskHandler>({
val timeoutOverride = Duration.ofMinutes(10)

timeoutOverride.toMillis().let { listOf(it.toInt(), it, it.toDouble()) }.forEach { stageTimeoutMs ->
and("the override is a ${stageTimeoutMs.javaClass.simpleName}") {and("the task is between the default and overridden duration") {
val pipeline = pipeline {
stage {
type = "whatever"
context["stageTimeoutMs"] = stageTimeoutMs
task {
id = "1"

status = RUNNING
startTime = clock.instant().minusMillis(timeout.toMillis() + 1).toEpochMilli()
and("the override is a ${stageTimeoutMs.javaClass.simpleName}") {
and("the task is between the default and overridden duration") {
val pipeline = pipeline {
stage {
type = "whatever"
context["stageTimeoutMs"] = stageTimeoutMs
startTime = clock.instant().minusMillis(timeout.toMillis() + 1).toEpochMilli()
task {
id = "1"

status = RUNNING
startTime = clock.instant().minusMillis(timeout.toMillis() - 1).toEpochMilli()
}
}
}
}
}
val message = RunTask(Pipeline::class.java, pipeline.id, "foo", pipeline.stages.first().id, "1", DummyTask::class.java)
val message = RunTask(Pipeline::class.java, pipeline.id, "foo", pipeline.stages.first().id, "1", DummyTask::class.java)

beforeGroup {
whenever(repository.retrievePipeline(message.executionId)) doReturn pipeline
Expand All @@ -773,38 +775,154 @@ object RunTaskHandlerTest : SubjectSpek<RunTaskHandler>({
}
}

and("the timeout override has been exceeded") {
val pipeline = pipeline {
stage {
type = "whatever"
context["stageTimeoutMs"] = stageTimeoutMs
task {
id = "1"

status = RUNNING
startTime = clock.instant().minusMillis(timeoutOverride.toMillis() + 1).toEpochMilli()
}
}
}
val message = RunTask(Pipeline::class.java, pipeline.id, "foo", pipeline.stages.first().id, "1", DummyTask::class.java)

beforeGroup {
whenever(repository.retrievePipeline(message.executionId)) doReturn pipeline
whenever(task.timeout) doReturn timeout.toMillis()
and("the timeout override has been exceeded by stage") {
and("the stage has never been paused") {
val pipeline = pipeline {
stage {
type = "whatever"
context["stageTimeoutMs"] = stageTimeoutMs
startTime = clock.instant().minusMillis(timeoutOverride.toMillis() + 1).toEpochMilli()
task {
id = "1"

status = RUNNING
startTime = clock.instant().minusMillis(timeout.toMillis() - 1).toEpochMilli()
}
}
}
val message = RunTask(Pipeline::class.java, pipeline.id, "foo", pipeline.stages.first().id, "1", DummyTask::class.java)

beforeGroup {
whenever(repository.retrievePipeline(message.executionId)) doReturn pipeline
whenever(task.timeout) doReturn timeout.toMillis()
}

afterGroup(::resetMocks)

on("receiving $message") {
subject.handle(message)
}

it("fails the task") {
verify(queue).push(CompleteTask(message, TERMINAL))
}

it("does not execute the task") {
verify(task, never()).execute(any())
}
}

afterGroup(::resetMocks)

on("receiving $message") {
subject.handle(message)
and("the execution had been paused") {
val pipeline = pipeline {
paused = PausedDetails().apply {
pauseTime = clock.instant().minus(Minutes.of(3)).toEpochMilli()
resumeTime = clock.instant().minus(Minutes.of(2)).toEpochMilli()
}
stage {
type = "whatever"
context["stageTimeoutMs"] = stageTimeoutMs
startTime = clock.instant().minusMillis(timeoutOverride.toMillis() + 1).toEpochMilli()
task {
id = "1"
status = RUNNING
startTime = clock.instant().minusMillis(timeout.toMillis() - 1).toEpochMilli()
}
}
}
val message = RunTask(Pipeline::class.java, pipeline.id, "foo", pipeline.stages.first().id, "1", DummyTask::class.java)

beforeGroup {
whenever(repository.retrievePipeline(message.executionId)) doReturn pipeline
whenever(task.timeout) doReturn timeout.toMillis()
}

afterGroup(::resetMocks)

action("the handler receives a message") {
subject.handle(message)
}

it("executes the task") {
verify(task).execute(any())
}
}

it("fails the task") {
verify(queue).push(CompleteTask(message, TERMINAL))
and("the execution had been paused but is timed out anyway") {
val pipeline = pipeline {
paused = PausedDetails().apply {
pauseTime = clock.instant().minus(Minutes.of(3)).toEpochMilli()
resumeTime = clock.instant().minus(Minutes.of(2)).toEpochMilli()
}
stage {
type = "whatever"
context["stageTimeoutMs"] = stageTimeoutMs
startTime = clock.instant().minusMillis(timeoutOverride.plusMinutes(1).toMillis() + 1).toEpochMilli()
task {
id = "1"
status = RUNNING
startTime = clock.instant().minusMillis(timeout.toMillis() - 1).toEpochMilli()
}
}
}
val message = RunTask(Pipeline::class.java, pipeline.id, "foo", pipeline.stages.first().id, "1", DummyTask::class.java)

beforeGroup {
whenever(repository.retrievePipeline(message.executionId)) doReturn pipeline
whenever(task.timeout) doReturn timeout.toMillis()
}

afterGroup(::resetMocks)

action("the handler receives a message") {
subject.handle(message)
}

it("fails the task") {
verify(queue).push(CompleteTask(message, TERMINAL))
}

it("does not execute the task") {
verify(task, never()).execute(any())
}
}

it("does not execute the task") {
verify(task, never()).execute(any())
and("the execution had been paused but only before this stage started running") {
val pipeline = pipeline {
paused = PausedDetails().apply {
pauseTime = clock.instant().minus(Minutes.of(15)).toEpochMilli()
resumeTime = clock.instant().minus(Minutes.of(14)).toEpochMilli()
}
stage {
type = "whatever"
context["stageTimeoutMs"] = stageTimeoutMs
startTime = clock.instant().minusMillis(timeoutOverride.toMillis() + 1).toEpochMilli()
task {
id = "1"
status = RUNNING
startTime = clock.instant().minusMillis(timeout.toMillis() - 1).toEpochMilli()
}
}
}
val message = RunTask(Pipeline::class.java, pipeline.id, "foo", pipeline.stages.first().id, "1", DummyTask::class.java)

beforeGroup {
whenever(repository.retrievePipeline(message.executionId)) doReturn pipeline
whenever(task.timeout) doReturn timeout.toMillis()
}

afterGroup(::resetMocks)

action("the handler receives a message") {
subject.handle(message)
}

it("fails the task") {
verify(queue).push(CompleteTask(message, TERMINAL))
}

it("does not execute the task") {
verify(task, never()).execute(any())
}
}
}
}
Expand Down

0 comments on commit 37afcac

Please sign in to comment.