diff --git a/orca-core/src/main/groovy/com/netflix/spinnaker/orca/pipeline/model/StageContext.java b/orca-core/src/main/groovy/com/netflix/spinnaker/orca/pipeline/model/StageContext.java index f53790433c..d6831ce89a 100644 --- a/orca-core/src/main/groovy/com/netflix/spinnaker/orca/pipeline/model/StageContext.java +++ b/orca-core/src/main/groovy/com/netflix/spinnaker/orca/pipeline/model/StageContext.java @@ -16,11 +16,14 @@ package com.netflix.spinnaker.orca.pipeline.model; -import java.util.*; -import java.util.stream.Collectors; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import javax.annotation.Nullable; import com.google.common.collect.ForwardingMap; import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.PIPELINE; +import static java.util.stream.Collectors.toList; public class StageContext extends ForwardingMap { @@ -31,6 +34,10 @@ public StageContext(Stage stage) { this(stage, new HashMap<>()); } + public StageContext(StageContext stageContext) { + this(stageContext.stage, new HashMap<>(stageContext.delegate)); + } + public StageContext(Stage stage, Map delegate) { this.stage = stage; this.delegate = delegate; @@ -59,12 +66,7 @@ private Map getTrigger() { .filter(it -> it.getOutputs().containsKey(key)) .findFirst() .map(it -> it.getOutputs().get(key)) - .orElseGet(() -> - Optional - .ofNullable(stage.getExecution()) - .map(execution -> execution.getContext().get(key)) - .orElse(null) - ); + .orElse(null); } } @@ -78,7 +80,7 @@ public List getAll(Object key) { .stream() .filter(it -> it.getOutputs().containsKey(key)) .map(it -> it.getOutputs().get(key)) - .collect(Collectors.toList()); + .collect(toList()); if (delegate.containsKey(key)) { result.add(0, delegate.get(key)); diff --git a/orca-core/src/test/groovy/com/netflix/spinnaker/orca/pipeline/model/StageContextSpec.groovy b/orca-core/src/test/groovy/com/netflix/spinnaker/orca/pipeline/model/StageContextSpec.groovy index 916da2fff2..dfaa91959c 100644 --- a/orca-core/src/test/groovy/com/netflix/spinnaker/orca/pipeline/model/StageContextSpec.groovy +++ b/orca-core/src/test/groovy/com/netflix/spinnaker/orca/pipeline/model/StageContextSpec.groovy @@ -93,8 +93,26 @@ class StageContextSpec extends Specification { pipeline.stageByRef("3>1").context.covfefe == null } - def "if all else fails will read from global context"() { + def "will not read from global context"() { expect: - pipeline.stageByRef("3>1").context.fnord == "global-fnord" + pipeline.stageByRef("3>1").context.fnord == null + } + + def "getAll returns all matching keys in distance order"() { + expect: + pipeline.stageByRef("3>1").context.getAll("foo") == [ + "current-foo", + "parent-foo", + "ancestor-foo", + "root-foo" + ] + } + + def "getAll returns only values for matching keys"() { + expect: + pipeline.stageByRef("3>1").context.getAll("baz") == [ + "ancestor-baz", + "root-baz" + ] } } diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/ExpressionAware.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/ExpressionAware.kt index 5ec11a5b05..5075f67723 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/ExpressionAware.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/ExpressionAware.kt @@ -21,6 +21,7 @@ import com.netflix.spinnaker.orca.pipeline.expressions.PipelineExpressionEvaluat import com.netflix.spinnaker.orca.pipeline.model.Execution import com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.PIPELINE import com.netflix.spinnaker.orca.pipeline.model.Stage +import com.netflix.spinnaker.orca.pipeline.model.StageContext import com.netflix.spinnaker.orca.pipeline.util.ContextParameterProcessor import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -84,17 +85,22 @@ interface ExpressionAware { mapOf("details" to mapOf("errors" to mergedErrors)) } - private fun processEntries(stage: Stage) = - contextParameterProcessor.process( + private fun processEntries(stage: Stage): StageContext = + StageContext(stage, contextParameterProcessor.process( stage.context, - stage.context.augmentContext(stage.execution), + (stage.context as StageContext).augmentContext(stage.execution), true ) + ) - private fun Map.augmentContext(execution: Execution) = + private fun StageContext.augmentContext(execution: Execution): StageContext = if (execution.type == PIPELINE) { - this + execution.context + mapOf("trigger" to execution.trigger, "execution" to execution) + this + mapOf("trigger" to execution.trigger, "execution" to execution) } else { this } + + private operator fun StageContext.plus(map: Map): StageContext + = StageContext(this).apply { putAll(map) } + } diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandler.kt index 551a6fe33d..445a7a8049 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandler.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandler.kt @@ -210,11 +210,11 @@ class RunTaskHandler( private fun Execution.pausedDurationRelativeTo(instant: Instant?): Duration { val pausedDetails = paused - if (pausedDetails != null) { - return if (pausedDetails.pauseTime.toInstant()?.isAfter(instant) == true) { + return if (pausedDetails != null) { + if (pausedDetails.pauseTime.toInstant()?.isAfter(instant) == true) { Duration.ofMillis(pausedDetails.pausedMs) } else ZERO - } else return ZERO + } else ZERO } /** diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandler.kt index 132ced09dc..31586df59c 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandler.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandler.kt @@ -140,7 +140,12 @@ class StartStageHandler( } val clonedContext = objectMapper.convertValue(this.context, Map::class.java) as Map - val clonedStage = Stage(this.execution, this.type, clonedContext) + val clonedStage = Stage(this.execution, this.type, clonedContext).also { + it.refId = refId + it.requisiteStageRefIds = requisiteStageRefIds + it.syntheticStageOwner = syntheticStageOwner + it.parentStageId = parentStageId + } if (clonedStage.context.containsKey(PipelineExpressionEvaluator.SUMMARY)) { this.context.put(PipelineExpressionEvaluator.SUMMARY, clonedStage.context[PipelineExpressionEvaluator.SUMMARY]) } diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueIntegrationTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueIntegrationTest.kt index 56a82b35ec..50da3d85fb 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueIntegrationTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueIntegrationTest.kt @@ -77,17 +77,21 @@ open class QueueIntegrationTest { lateinit var timeZoneId: String private val timeZone by lazy { ZoneId.of(timeZoneId) } - @Before fun discoveryUp() { + @Before + fun discoveryUp() { context.publishEvent(RemoteStatusChangedEvent(StatusChangeEvent(STARTING, UP))) } - @After fun discoveryDown() { + @After + fun discoveryDown() { context.publishEvent(RemoteStatusChangedEvent(StatusChangeEvent(UP, OUT_OF_SERVICE))) } - @After fun resetMocks() = reset(dummyTask) + @After + fun resetMocks() = reset(dummyTask) - @Test fun `can run a simple pipeline`() { + @Test + fun `can run a simple pipeline`() { val pipeline = pipeline { application = "spinnaker" stage { @@ -105,7 +109,8 @@ open class QueueIntegrationTest { repository.retrieve(PIPELINE, pipeline.id).status shouldEqual SUCCEEDED } - @Test fun `will run tasks to completion`() { + @Test + fun `will run tasks to completion`() { val pipeline = pipeline { application = "spinnaker" stage { @@ -123,7 +128,8 @@ open class QueueIntegrationTest { repository.retrieve(PIPELINE, pipeline.id).status shouldEqual SUCCEEDED } - @Test fun `can run a fork join pipeline`() { + @Test + fun `can run a fork join pipeline`() { val pipeline = pipeline { application = "spinnaker" stage { @@ -162,7 +168,8 @@ open class QueueIntegrationTest { } } - @Test fun `can run a pipeline that ends in a branch`() { + @Test + fun `can run a pipeline that ends in a branch`() { val pipeline = pipeline { application = "spinnaker" stage { @@ -201,7 +208,8 @@ open class QueueIntegrationTest { } } - @Test fun `can skip stages`() { + @Test + fun `can skip stages`() { val pipeline = pipeline { application = "spinnaker" stage { @@ -222,7 +230,8 @@ open class QueueIntegrationTest { verify(dummyTask, never()).execute(any()) } - @Test fun `pipeline fails if a task fails`() { + @Test + fun `pipeline fails if a task fails`() { val pipeline = pipeline { application = "spinnaker" stage { @@ -239,7 +248,8 @@ open class QueueIntegrationTest { repository.retrieve(PIPELINE, pipeline.id).status shouldEqual TERMINAL } - @Test fun `parallel stages that fail cancel other branches`() { + @Test + fun `parallel stages that fail cancel other branches`() { val pipeline = pipeline { application = "spinnaker" stage { @@ -285,7 +295,8 @@ open class QueueIntegrationTest { } } - @Test fun `stages set to allow failure will proceed in spite of errors`() { + @Test + fun `stages set to allow failure will proceed in spite of errors`() { val pipeline = pipeline { application = "spinnaker" stage { @@ -332,7 +343,8 @@ open class QueueIntegrationTest { } } - @Test fun `stages set to allow failure but fail the pipeline will run to completion but then mark the pipeline failed`() { + @Test + fun `stages set to allow failure but fail the pipeline will run to completion but then mark the pipeline failed`() { val pipeline = pipeline { application = "spinnaker" stage { @@ -387,7 +399,8 @@ open class QueueIntegrationTest { } } - @Test fun `can run a stage with an execution window`() { + @Test + fun `can run a stage with an execution window`() { val pipeline = pipeline { application = "spinnaker" stage { @@ -461,7 +474,8 @@ open class QueueIntegrationTest { } // TODO: this test is verifying a bunch of things at once, it would make sense to break it up - @Test fun `can resolve expressions in stage contexts`() { + @Test + fun `can resolve expressions in stage contexts`() { val pipeline = pipeline { application = "spinnaker" context["global"] = "foo" @@ -498,7 +512,8 @@ open class QueueIntegrationTest { } } - @Test fun `a restarted branch will not stall due to original cancellation`() { + @Test + fun `a restarted branch will not stall due to original cancellation`() { val pipeline = pipeline { application = "spinnaker" stage { @@ -539,7 +554,8 @@ open class QueueIntegrationTest { } } - @Test fun `conditional stages can depend on global context values`() { + @Test + fun `conditional stages can depend on outputs of previous stages`() { val pipeline = pipeline { application = "spinnaker" stage { @@ -550,30 +566,32 @@ open class QueueIntegrationTest { refId = "2a" requisiteStageRefIds = setOf("1") type = "dummy" - context = mapOf( - "stageEnabled" to mapOf( - "type" to "expression", - "expression" to "\${foo == true}" - ) + context["stageEnabled"] = mapOf( + "type" to "expression", + "expression" to "\${foo == true}" ) } stage { refId = "2b" requisiteStageRefIds = setOf("1") type = "dummy" - context = mapOf( - "stageEnabled" to mapOf( - "type" to "expression", - "expression" to "\${foo == false}" - ) + context["stageEnabled"] = mapOf( + "type" to "expression", + "expression" to "\${foo == false}" ) } } repository.store(pipeline) - repository.storeExecutionContext(pipeline.id, mapOf("foo" to false)) whenever(dummyTask.timeout) doReturn 2000L - whenever(dummyTask.execute(any())) doReturn TaskResult.SUCCEEDED + whenever(dummyTask.execute(any())) doAnswer { + val stage = it.arguments.first() as Stage + if (stage.refId == "1") { + TaskResult(SUCCEEDED, emptyMap(), mapOf("foo" to false)) + } else { + TaskResult.SUCCEEDED + } + } context.runToCompletion(pipeline, runner::start, repository) @@ -585,7 +603,8 @@ open class QueueIntegrationTest { } } - @Test fun `conditional stages can depend on global context values after restart`() { + @Test + fun `conditional stages can depend on global context values after restart`() { val pipeline = pipeline { application = "spinnaker" stage { @@ -620,10 +639,16 @@ open class QueueIntegrationTest { status = TERMINAL } repository.store(pipeline) - repository.storeExecutionContext(pipeline.id, mapOf("foo" to false)) whenever(dummyTask.timeout) doReturn 2000L - whenever(dummyTask.execute(any())) doReturn TaskResult.SUCCEEDED + whenever(dummyTask.execute(any())) doAnswer { + val stage = it.arguments.first() as Stage + if (stage.refId == "1") { + TaskResult(SUCCEEDED, emptyMap(), mapOf("foo" to false)) + } else { + TaskResult.SUCCEEDED + } + } context.restartAndRunToCompletion(pipeline.stageByRef("1"), runner::restart, repository) @@ -653,7 +678,7 @@ open class TestConfig { } @Bean open fun dummyStage() = object : StageDefinitionBuilder { - override fun taskGraph(stage: Stage, builder: Builder) { + override fun taskGraph(stage: Stage, builder: Builder) { builder.withTask("dummy", DummyTask::class.java) } @@ -661,7 +686,7 @@ open class TestConfig { } @Bean open fun parallelStage() = object : StageDefinitionBuilder { - override fun parallelStages(stage: Stage) = + override fun parallelStages(stage: Stage) = listOf("us-east-1", "us-west-2", "eu-west-1").map { region -> newStage(stage.execution, "dummy", "dummy $region", stage.context + mapOf("region" to region), stage, STAGE_BEFORE) } diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandlerTest.kt index d44dfed7e9..bd84f35c41 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandlerTest.kt @@ -1127,11 +1127,15 @@ object RunTaskHandlerTest : SubjectSpek({ } } - given("an expression in the context that refers to a global context value") { + given("an expression in the context that refers to a prior stage output") { val pipeline = pipeline { - context["foo"] = "bar" stage { refId = "1" + outputs["foo"] = "bar" + } + stage { + refId = "2" + requisiteStageRefIds = setOf("1") context["expression"] = "\${foo}" type = "whatever" task { @@ -1140,7 +1144,7 @@ object RunTaskHandlerTest : SubjectSpek({ } } } - val message = RunTask(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "1", DummyTask::class.java) + val message = RunTask(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("2").id, "1", DummyTask::class.java) beforeGroup { whenever(task.execute(any())) doReturn TaskResult.SUCCEEDED