Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(expressions): expressions can reference prior stage outputs #1828

Merged
merged 2 commits into from
Dec 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@

package com.netflix.spinnaker.orca.pipeline.model;

import java.util.*;
import java.util.stream.Collectors;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import com.google.common.collect.ForwardingMap;
import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.PIPELINE;
import static java.util.stream.Collectors.toList;

public class StageContext extends ForwardingMap<String, Object> {

Expand All @@ -31,6 +34,10 @@ public StageContext(Stage stage) {
this(stage, new HashMap<>());
}

public StageContext(StageContext stageContext) {
this(stageContext.stage, new HashMap<>(stageContext.delegate));
}

public StageContext(Stage stage, Map<String, Object> delegate) {
this.stage = stage;
this.delegate = delegate;
Expand Down Expand Up @@ -59,12 +66,7 @@ private Map<String, Object> getTrigger() {
.filter(it -> it.getOutputs().containsKey(key))
.findFirst()
.map(it -> it.getOutputs().get(key))
.orElseGet(() ->
Optional
.ofNullable(stage.getExecution())
.map(execution -> execution.getContext().get(key))
.orElse(null)
);
.orElse(null);
}
}

Expand All @@ -78,7 +80,7 @@ public List<Object> getAll(Object key) {
.stream()
.filter(it -> it.getOutputs().containsKey(key))
.map(it -> it.getOutputs().get(key))
.collect(Collectors.toList());
.collect(toList());

if (delegate.containsKey(key)) {
result.add(0, delegate.get(key));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,26 @@ class StageContextSpec extends Specification {
pipeline.stageByRef("3>1").context.covfefe == null
}

def "if all else fails will read from global context"() {
def "will not read from global context"() {
expect:
pipeline.stageByRef("3>1").context.fnord == "global-fnord"
pipeline.stageByRef("3>1").context.fnord == null
}

def "getAll returns all matching keys in distance order"() {
expect:
pipeline.stageByRef("3>1").context.getAll("foo") == [
"current-foo",
"parent-foo",
"ancestor-foo",
"root-foo"
]
}

def "getAll returns only values for matching keys"() {
expect:
pipeline.stageByRef("3>1").context.getAll("baz") == [
"ancestor-baz",
"root-baz"
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.netflix.spinnaker.orca.pipeline.expressions.PipelineExpressionEvaluat
import com.netflix.spinnaker.orca.pipeline.model.Execution
import com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.PIPELINE
import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.pipeline.model.StageContext
import com.netflix.spinnaker.orca.pipeline.util.ContextParameterProcessor
import org.slf4j.Logger
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -84,17 +85,22 @@ interface ExpressionAware {
mapOf("details" to mapOf("errors" to mergedErrors))
}

private fun processEntries(stage: Stage) =
contextParameterProcessor.process(
private fun processEntries(stage: Stage): StageContext =
StageContext(stage, contextParameterProcessor.process(
stage.context,
stage.context.augmentContext(stage.execution),
(stage.context as StageContext).augmentContext(stage.execution),
true
)
)

private fun Map<String, Any?>.augmentContext(execution: Execution) =
private fun StageContext.augmentContext(execution: Execution): StageContext =
if (execution.type == PIPELINE) {
this + execution.context + mapOf("trigger" to execution.trigger, "execution" to execution)
this + mapOf("trigger" to execution.trigger, "execution" to execution)
} else {
this
}

private operator fun StageContext.plus(map: Map<String, Any?>): StageContext
= StageContext(this).apply { putAll(map) }

}
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,11 @@ class RunTaskHandler(

private fun Execution.pausedDurationRelativeTo(instant: Instant?): Duration {
val pausedDetails = paused
if (pausedDetails != null) {
return if (pausedDetails.pauseTime.toInstant()?.isAfter(instant) == true) {
return if (pausedDetails != null) {
if (pausedDetails.pauseTime.toInstant()?.isAfter(instant) == true) {
Duration.ofMillis(pausedDetails.pausedMs)
} else ZERO
} else return ZERO
} else ZERO
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ class StartStageHandler(
}

val clonedContext = objectMapper.convertValue(this.context, Map::class.java) as Map<String, Any>
val clonedStage = Stage(this.execution, this.type, clonedContext)
val clonedStage = Stage(this.execution, this.type, clonedContext).also {
it.refId = refId
it.requisiteStageRefIds = requisiteStageRefIds
it.syntheticStageOwner = syntheticStageOwner
it.parentStageId = parentStageId
}
if (clonedStage.context.containsKey(PipelineExpressionEvaluator.SUMMARY)) {
this.context.put(PipelineExpressionEvaluator.SUMMARY, clonedStage.context[PipelineExpressionEvaluator.SUMMARY])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,21 @@ open class QueueIntegrationTest {
lateinit var timeZoneId: String
private val timeZone by lazy { ZoneId.of(timeZoneId) }

@Before fun discoveryUp() {
@Before
fun discoveryUp() {
context.publishEvent(RemoteStatusChangedEvent(StatusChangeEvent(STARTING, UP)))
}

@After fun discoveryDown() {
@After
fun discoveryDown() {
context.publishEvent(RemoteStatusChangedEvent(StatusChangeEvent(UP, OUT_OF_SERVICE)))
}

@After fun resetMocks() = reset(dummyTask)
@After
fun resetMocks() = reset(dummyTask)

@Test fun `can run a simple pipeline`() {
@Test
fun `can run a simple pipeline`() {
val pipeline = pipeline {
application = "spinnaker"
stage {
Expand All @@ -105,7 +109,8 @@ open class QueueIntegrationTest {
repository.retrieve(PIPELINE, pipeline.id).status shouldEqual SUCCEEDED
}

@Test fun `will run tasks to completion`() {
@Test
fun `will run tasks to completion`() {
val pipeline = pipeline {
application = "spinnaker"
stage {
Expand All @@ -123,7 +128,8 @@ open class QueueIntegrationTest {
repository.retrieve(PIPELINE, pipeline.id).status shouldEqual SUCCEEDED
}

@Test fun `can run a fork join pipeline`() {
@Test
fun `can run a fork join pipeline`() {
val pipeline = pipeline {
application = "spinnaker"
stage {
Expand Down Expand Up @@ -162,7 +168,8 @@ open class QueueIntegrationTest {
}
}

@Test fun `can run a pipeline that ends in a branch`() {
@Test
fun `can run a pipeline that ends in a branch`() {
val pipeline = pipeline {
application = "spinnaker"
stage {
Expand Down Expand Up @@ -201,7 +208,8 @@ open class QueueIntegrationTest {
}
}

@Test fun `can skip stages`() {
@Test
fun `can skip stages`() {
val pipeline = pipeline {
application = "spinnaker"
stage {
Expand All @@ -222,7 +230,8 @@ open class QueueIntegrationTest {
verify(dummyTask, never()).execute(any())
}

@Test fun `pipeline fails if a task fails`() {
@Test
fun `pipeline fails if a task fails`() {
val pipeline = pipeline {
application = "spinnaker"
stage {
Expand All @@ -239,7 +248,8 @@ open class QueueIntegrationTest {
repository.retrieve(PIPELINE, pipeline.id).status shouldEqual TERMINAL
}

@Test fun `parallel stages that fail cancel other branches`() {
@Test
fun `parallel stages that fail cancel other branches`() {
val pipeline = pipeline {
application = "spinnaker"
stage {
Expand Down Expand Up @@ -285,7 +295,8 @@ open class QueueIntegrationTest {
}
}

@Test fun `stages set to allow failure will proceed in spite of errors`() {
@Test
fun `stages set to allow failure will proceed in spite of errors`() {
val pipeline = pipeline {
application = "spinnaker"
stage {
Expand Down Expand Up @@ -332,7 +343,8 @@ open class QueueIntegrationTest {
}
}

@Test fun `stages set to allow failure but fail the pipeline will run to completion but then mark the pipeline failed`() {
@Test
fun `stages set to allow failure but fail the pipeline will run to completion but then mark the pipeline failed`() {
val pipeline = pipeline {
application = "spinnaker"
stage {
Expand Down Expand Up @@ -387,7 +399,8 @@ open class QueueIntegrationTest {
}
}

@Test fun `can run a stage with an execution window`() {
@Test
fun `can run a stage with an execution window`() {
val pipeline = pipeline {
application = "spinnaker"
stage {
Expand Down Expand Up @@ -461,7 +474,8 @@ open class QueueIntegrationTest {
}

// TODO: this test is verifying a bunch of things at once, it would make sense to break it up
@Test fun `can resolve expressions in stage contexts`() {
@Test
fun `can resolve expressions in stage contexts`() {
val pipeline = pipeline {
application = "spinnaker"
context["global"] = "foo"
Expand Down Expand Up @@ -498,7 +512,8 @@ open class QueueIntegrationTest {
}
}

@Test fun `a restarted branch will not stall due to original cancellation`() {
@Test
fun `a restarted branch will not stall due to original cancellation`() {
val pipeline = pipeline {
application = "spinnaker"
stage {
Expand Down Expand Up @@ -539,7 +554,8 @@ open class QueueIntegrationTest {
}
}

@Test fun `conditional stages can depend on global context values`() {
@Test
fun `conditional stages can depend on outputs of previous stages`() {
val pipeline = pipeline {
application = "spinnaker"
stage {
Expand All @@ -550,30 +566,32 @@ open class QueueIntegrationTest {
refId = "2a"
requisiteStageRefIds = setOf("1")
type = "dummy"
context = mapOf(
"stageEnabled" to mapOf(
"type" to "expression",
"expression" to "\${foo == true}"
)
context["stageEnabled"] = mapOf(
"type" to "expression",
"expression" to "\${foo == true}"
)
}
stage {
refId = "2b"
requisiteStageRefIds = setOf("1")
type = "dummy"
context = mapOf(
"stageEnabled" to mapOf(
"type" to "expression",
"expression" to "\${foo == false}"
)
context["stageEnabled"] = mapOf(
"type" to "expression",
"expression" to "\${foo == false}"
)
}
}
repository.store(pipeline)
repository.storeExecutionContext(pipeline.id, mapOf("foo" to false))

whenever(dummyTask.timeout) doReturn 2000L
whenever(dummyTask.execute(any())) doReturn TaskResult.SUCCEEDED
whenever(dummyTask.execute(any())) doAnswer {
val stage = it.arguments.first() as Stage
if (stage.refId == "1") {
TaskResult(SUCCEEDED, emptyMap<String, Any?>(), mapOf("foo" to false))
} else {
TaskResult.SUCCEEDED
}
}

context.runToCompletion(pipeline, runner::start, repository)

Expand All @@ -585,7 +603,8 @@ open class QueueIntegrationTest {
}
}

@Test fun `conditional stages can depend on global context values after restart`() {
@Test
fun `conditional stages can depend on global context values after restart`() {
val pipeline = pipeline {
application = "spinnaker"
stage {
Expand Down Expand Up @@ -620,10 +639,16 @@ open class QueueIntegrationTest {
status = TERMINAL
}
repository.store(pipeline)
repository.storeExecutionContext(pipeline.id, mapOf("foo" to false))

whenever(dummyTask.timeout) doReturn 2000L
whenever(dummyTask.execute(any())) doReturn TaskResult.SUCCEEDED
whenever(dummyTask.execute(any())) doAnswer {
val stage = it.arguments.first() as Stage
if (stage.refId == "1") {
TaskResult(SUCCEEDED, emptyMap<String, Any?>(), mapOf("foo" to false))
} else {
TaskResult.SUCCEEDED
}
}

context.restartAndRunToCompletion(pipeline.stageByRef("1"), runner::restart, repository)

Expand Down Expand Up @@ -653,15 +678,15 @@ open class TestConfig {
}

@Bean open fun dummyStage() = object : StageDefinitionBuilder {
override fun taskGraph(stage: Stage, builder: Builder) {
override fun taskGraph(stage: Stage, builder: Builder) {
builder.withTask("dummy", DummyTask::class.java)
}

override fun getType() = "dummy"
}

@Bean open fun parallelStage() = object : StageDefinitionBuilder {
override fun parallelStages(stage: Stage) =
override fun parallelStages(stage: Stage) =
listOf("us-east-1", "us-west-2", "eu-west-1").map { region ->
newStage(stage.execution, "dummy", "dummy $region", stage.context + mapOf("region" to region), stage, STAGE_BEFORE)
}
Expand Down
Loading