diff --git a/definition/pipelines.go b/definition/pipelines.go
index 3b08204..d8bfa08 100644
--- a/definition/pipelines.go
+++ b/definition/pipelines.go
@@ -40,6 +40,15 @@ func (d TaskDef) Equals(otherDef TaskDef) bool {
 	return true
 }
 
+// OnErrorTaskDef is a special task definition to be executed solely if an error occurs during "normal" task handling.
+type OnErrorTaskDef struct {
+	// Script is a list of shell commands that are executed if an error occurs in a "normal" task
+	Script []string `yaml:"script"`
+
+	// Env sets/overrides environment variables for this task (takes precedence over the pipeline environment)
+	Env map[string]string `yaml:"env"`
+}
+
 type PipelineDef struct {
 	// Concurrency declares how many instances of this pipeline are allowed to execute concurrently (defaults to 1)
 	Concurrency int `yaml:"concurrency"`
@@ -60,8 +69,19 @@ type PipelineDef struct {
 	// Env sets/overrides environment variables for all tasks (takes precedence over process environment)
 	Env map[string]string `yaml:"env"`
 
+	// Tasks is a map of task names to task definitions
 	Tasks map[string]TaskDef `yaml:"tasks"`
 
+	// OnError is a task to be executed if this pipeline fails, e.g. for notifications.
+	//
+	// The following variables are available inside this task:
+	// - failedTaskName: Name of the failed task (key from pipelines.yml)
+	// - failedTaskExitCode: Exit code of the failed task
+	// - failedTaskError: Error message of the failed task
+	// - failedTaskStdout: Stdout of the failed task
+	// - failedTaskStderr: Stderr of the failed task
+	OnError OnErrorTaskDef `yaml:"onError"`
+
 	// SourcePath stores the source path where the pipeline was defined
 	SourcePath string
 }
diff --git a/prunner.go b/prunner.go
index 1ee864d..41a8404 100644
--- a/prunner.go
+++ b/prunner.go
@@ -3,6 +3,7 @@ package prunner
 import (
 	"context"
 	"fmt"
+	"io"
 	"sort"
 	"sync"
 	"time"
@@ -418,6 +419,39 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) {
 	if jt == nil {
 		return
 	}
+	updateJobTaskStateFromTask(jt, t)
+
+	// If the task has errored and we want to fail fast (ContinueRunningTasksAfterFailure is false),
+	// we directly abort all other tasks of the job.
+	// NOTE: this is NOT the context.Canceled case from above (a job being explicitly aborted), but only
+	// the case where one task failed and we want to kill the other tasks.
+	if jt.Errored {
+		pipelineDef, found := r.defs.Pipelines[j.Pipeline]
+		if found && !pipelineDef.ContinueRunningTasksAfterFailure {
+			log.
+				WithField("component", "runner").
+				WithField("jobID", jobIDString).
+				WithField("pipeline", j.Pipeline).
+				WithField("failedTaskName", t.Name).
+				Debug("Task failed - cancelling all other tasks of the job")
+			// Use internal cancel since we already have a lock on the mutex
+			_ = r.cancelJobInternal(jobID)
+		}
+
+		if found && len(pipelineDef.OnError.Script) > 0 {
+			// We errored and there is an onError script defined for the
+			// current pipeline, so let's run it.
+			r.runOnErrorScript(t, j, pipelineDef.OnError)
+		}
+	}
+
+	r.requestPersist()
+}
+
+// updateJobTaskStateFromTask updates jobTask properties from a given taskctl task.Task.
+// It is an internal helper, used by PipelineRunner.HandleTaskChange
+// and PipelineRunner.runOnErrorScript.
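+//
+// Note that timestamps are copied into local variables before taking their address
+// (start := t.Start; jt.Start = &start), so the jobTask stores its own values rather
+// than pointers into the mutable taskctl task.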
+func updateJobTaskStateFromTask(jt *jobTask, t *task.Task) {
 	if !t.Start.IsZero() {
 		start := t.Start
 		jt.Start = &start
 	}
@@ -437,25 +471,143 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) {
 		jt.Error = t.Error
 	}
 
-	// if the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is set to FALSE),
-	// then we directly abort all other tasks of the job.
-	// NOTE: this is NOT the context.Canceled case from above (if a job is explicitly aborted), but only
-	// if one task failed, and we want to kill the other tasks.
-	if jt.Errored {
-		pipelineDef, found := r.defs.Pipelines[j.Pipeline]
-		if found && !pipelineDef.ContinueRunningTasksAfterFailure {
+}
+
+const OnErrorTaskName = "on_error"
+
+// runOnErrorScript runs a special "on_error" script in response to an error in the pipeline.
+// It exposes variables containing information about the errored task.
+//
+// It is called with the errored task t, the pipeline job j the task belongs to, and the
+// onError task definition of the pipeline.
+func (r *PipelineRunner) runOnErrorScript(t *task.Task, j *PipelineJob, onErrorTaskDef definition.OnErrorTaskDef) {
+	log.
+		WithField("component", "runner").
+		WithField("jobID", j.ID.String()).
+		WithField("pipeline", j.Pipeline).
+		WithField("failedTaskName", t.Name).
+		Debug("Triggering onError script because of task failure")
+
+	var failedTaskStdout []byte
+	rc, err := r.outputStore.Reader(j.ID.String(), t.Name, "stdout")
+	if err != nil {
+		log.
+			WithField("component", "runner").
+			WithField("jobID", j.ID.String()).
+			WithField("pipeline", j.Pipeline).
+			WithField("failedTaskName", t.Name).
+			WithError(err).
+			Warn("Could not create stdout reader for failed task")
+	} else {
+		defer func(rc io.ReadCloser) {
+			_ = rc.Close()
+		}(rc)
+		failedTaskStdout, err = io.ReadAll(rc)
+		if err != nil {
 			log.
 				WithField("component", "runner").
-				WithField("jobID", jobIDString).
+				WithField("jobID", j.ID.String()).
 				WithField("pipeline", j.Pipeline).
 				WithField("failedTaskName", t.Name).
-				Debug("Task failed - cancelling all other tasks of the job")
-			// Use internal cancel since we already have a lock on the mutex
-			_ = r.cancelJobInternal(jobID)
+				WithError(err).
+				Warn("Could not read stdout of failed task")
 		}
 	}
 
-	r.requestPersist()
+	var failedTaskStderr []byte
+	rc, err = r.outputStore.Reader(j.ID.String(), t.Name, "stderr")
+	if err != nil {
+		log.
+			WithField("component", "runner").
+			WithField("jobID", j.ID.String()).
+			WithField("pipeline", j.Pipeline).
+			WithField("failedTaskName", t.Name).
+			WithError(err).
+			Warn("Could not create stderr reader for failed task")
+	} else {
+		defer func(rc io.ReadCloser) {
+			_ = rc.Close()
+		}(rc)
+		failedTaskStderr, err = io.ReadAll(rc)
+		if err != nil {
+			log.
+				WithField("component", "runner").
+				WithField("jobID", j.ID.String()).
+				WithField("pipeline", j.Pipeline).
+				WithField("failedTaskName", t.Name).
+				WithError(err).
+ Debug("Could not read stderr of failed task") + } + } + + onErrorVariables := make(map[string]interface{}) + for key, value := range j.Variables { + onErrorVariables[key] = value + } + onErrorVariables["failedTaskName"] = t.Name + onErrorVariables["failedTaskExitCode"] = t.ExitCode + onErrorVariables["failedTaskError"] = t.Error + onErrorVariables["failedTaskStdout"] = string(failedTaskStdout) + onErrorVariables["failedTaskStderr"] = string(failedTaskStderr) + + onErrorJobTask := jobTask{ + TaskDef: definition.TaskDef{ + Script: onErrorTaskDef.Script, + // AllowFailure needs to be false, otherwise lastError below won't be filled (so errors will not appear in the log) + AllowFailure: false, + Env: onErrorTaskDef.Env, + }, + Name: OnErrorTaskName, + Status: toStatus(scheduler.StatusWaiting), + } + + // store on task list; so that it appears in pipeline and UI etc + j.Tasks = append(j.Tasks, onErrorJobTask) + + onErrorGraph, err := buildPipelineGraph(j.ID, jobTasks{onErrorJobTask}, onErrorVariables) + if err != nil { + log. + WithError(err). + WithField("jobID", j.ID). + WithField("pipeline", j.Pipeline). + Error("Failed to build onError pipeline graph") + onErrorJobTask.Error = err + onErrorJobTask.Errored = true + + // the last element in the task list is our onErrorJobTask; but because it is not a pointer we need to + // store it again after modifying it. + j.Tasks[len(j.Tasks)-1] = onErrorJobTask + return + } + + // we use a detached taskRunner and scheduler to run the onError task, to + // run synchronously (as we are already in an async goroutine here), won't have any cycles, + // and to simplify the code. + taskRunner := r.createTaskRunner(j) + sched := taskctl.NewScheduler(taskRunner) + + // Now, actually run the job synchronously + lastErr := sched.Schedule(onErrorGraph) + + // Update job status as with normal jobs + onErrorJobTask.Status = toStatus(onErrorGraph.Nodes()[OnErrorTaskName].ReadStatus()) + updateJobTaskStateFromTask(&onErrorJobTask, onErrorGraph.Nodes()[OnErrorTaskName].Task) + + if lastErr != nil { + log. + WithError(err). + WithField("jobID", j.ID). + WithField("pipeline", j.Pipeline). + Error("Error running the onError handler") + } else { + log. + WithField("jobID", j.ID). + WithField("pipeline", j.Pipeline). + Debug("Successfully ran the onError handler") + } + + // the last element in the task list is our onErrorJobTask; but because it is not a pointer we need to + // store it again after modifying it. + j.Tasks[len(j.Tasks)-1] = onErrorJobTask } // HandleStageChange will be called when the stage state changes in the scheduler @@ -810,6 +962,7 @@ func (r *PipelineRunner) Shutdown(ctx context.Context) error { // Wait for all running jobs to have called JobCompleted r.wg.Wait() + // TODO This is not safe to do outside of the requestPersist loop, since we might have a save in progress. So we need to wait until the save loop is finished before calling SaveToStore. 
 		// Do a final save to include the state of recently completed jobs
 		r.SaveToStore()
 	}()
diff --git a/prunner_test.go b/prunner_test.go
index cb1d1e1..1a33ed7 100644
--- a/prunner_test.go
+++ b/prunner_test.go
@@ -161,6 +161,7 @@ func TestPipelineRunner_ScheduleAsync_WithEmptyScriptTask(t *testing.T) {
 	require.NoError(t, err)
 
 	waitForCompletedJob(t, pRunner, job.ID)
+	assert.NoError(t, job.LastError)
 }
 
 func TestPipelineRunner_ScheduleAsync_WithEnvVars(t *testing.T) {
@@ -213,6 +214,7 @@ func TestPipelineRunner_ScheduleAsync_WithEnvVars(t *testing.T) {
 	require.NoError(t, err)
 
 	waitForCompletedJob(t, pRunner, job.ID)
+	assert.NoError(t, job.LastError)
 
 	pipelineVarTaskOutput := store.GetBytes(job.ID.String(), "pipeline_var", "stdout")
 	assert.Equal(t, "from pipeline,from pipeline,from process", string(pipelineVarTaskOutput), "output of pipeline_var")
@@ -221,6 +223,137 @@ func TestPipelineRunner_ScheduleAsync_WithEnvVars(t *testing.T) {
 	assert.Equal(t, "from task,from pipeline,from process", string(taskVarTaskOutput), "output of task_var")
 }
 
+func TestPipelineRunner_ScheduleAsync_WithFailingScript_TriggersOnErrorHook(t *testing.T) {
+	var defs = &definition.PipelinesDef{
+		Pipelines: map[string]definition.PipelineDef{
+			"erroring_script": {
+				// Concurrency of 1 is the default for a single concurrent execution
+				Concurrency: 1,
+				QueueLimit:  nil,
+				Tasks: map[string]definition.TaskDef{
+					"a": {
+						Script: []string{"echo A"},
+					},
+					"b": {
+						Script: []string{
+							"echo stdoutContent",
+							"echo This message goes to stderr >&2",
+							"exit 42",
+						},
+					},
+					"wait": {
+						DependsOn: []string{"a", "b"},
+					},
+				},
+				OnError: definition.OnErrorTaskDef{
+					Script: []string{
+						"echo ON_ERROR",
+						"echo 'Failed Task Name: {{ .failedTaskName }}'",
+						"echo 'Failed Task Exit Code: {{ .failedTaskExitCode }}'",
+						"echo 'Failed Task Error: {{ .failedTaskError }}'",
+						"echo 'Failed Task Stdout: {{ .failedTaskStdout }}'",
+						"echo 'Failed Task Stderr: {{ .failedTaskStderr }}'",
+					},
+				},
+				SourcePath: "fixtures",
+			},
+		},
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	mockOutputStore := test.NewMockOutputStore()
+	pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner {
+		// Use a real runner here to test the actual processing of a task.Task
+		taskRunner, _ := taskctl.NewTaskRunner(mockOutputStore)
+		return taskRunner
+	}, nil, mockOutputStore)
+	require.NoError(t, err)
+
+	job, err := pRunner.ScheduleAsync("erroring_script", ScheduleOpts{})
+	require.NoError(t, err)
+
+	waitForCompletedJob(t, pRunner, job.ID)
+	res := mockOutputStore.GetBytes(job.ID.String(), "on_error", "stdout")
+	assert.Error(t, job.LastError)
+	assert.Equal(t, `ON_ERROR
+Failed Task Name: b
+Failed Task Exit Code: 42
+Failed Task Error: exit status 42
+Failed Task Stdout: stdoutContent
+
+Failed Task Stderr: This message goes to stderr
+
+`, string(res))
+
+	jt := job.Tasks.ByName("on_error")
+	if assert.NotNil(t, jt) {
+		assert.False(t, jt.Canceled, "onError task should not be marked as canceled")
+		assert.False(t, jt.Errored, "onError task should not be marked as errored")
+		assert.Equal(t, "done", jt.Status, "onError task should have status done")
+		assert.Nil(t, jt.Error, "onError task should have no error set")
+	}
+}
+
+func TestPipelineRunner_ScheduleAsync_WithFailingScript_TriggersOnErrorHook_AndSetsStateCorrectlyIfErrorHookFails(t *testing.T) {
+	var defs = &definition.PipelinesDef{
+		Pipelines: map[string]definition.PipelineDef{
+			"erroring_script": {
+				// Concurrency of 1 is the default for a single concurrent execution
+				Concurrency: 1,
+				QueueLimit:  nil,
+				Tasks: map[string]definition.TaskDef{
+					"a": {
+						Script: []string{"echo A"},
+					},
+					"b": {
+						Script: []string{
+							"echo stdoutContent",
+							"echo This message goes to stderr >&2",
+							"exit 42",
+						},
+					},
+					"wait": {
+						DependsOn: []string{"a", "b"},
+					},
+				},
+				OnError: definition.OnErrorTaskDef{
+					Script: []string{
+						"echo ON_ERROR",
+						"exit 1",
+					},
+				},
+				SourcePath: "fixtures",
+			},
+		},
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	mockOutputStore := test.NewMockOutputStore()
+	pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner {
+		// Use a real runner here to test the actual processing of a task.Task
+		taskRunner, _ := taskctl.NewTaskRunner(mockOutputStore)
+		return taskRunner
+	}, nil, mockOutputStore)
+	require.NoError(t, err)
+
+	job, err := pRunner.ScheduleAsync("erroring_script", ScheduleOpts{})
+	require.NoError(t, err)
+
+	waitForCompletedJob(t, pRunner, job.ID)
+	assert.Error(t, job.LastError)
+
+	jt := job.Tasks.ByName("on_error")
+	if assert.NotNil(t, jt) {
+		assert.False(t, jt.Canceled, "onError task should not be marked as canceled")
+		assert.True(t, jt.Errored, "onError task should be marked as errored")
+		assert.Equal(t, "error", jt.Status, "onError task should have status error")
+		assert.NotNil(t, jt.Error, "onError task should have an error set")
+	}
+}
+
 func TestPipelineRunner_CancelJob_WithRunningJob(t *testing.T) {
 	var defs = &definition.PipelinesDef{
 		Pipelines: map[string]definition.PipelineDef{
@@ -325,6 +458,7 @@ func TestPipelineRunner_CancelJob_WithQueuedJob(t *testing.T) {
 	waitForCompletedJob(t, pRunner, job1.ID)
 	waitForCanceledJob(t, pRunner, job2.ID)
 	waitForCompletedJob(t, pRunner, job3.ID)
+	assert.NoError(t, job1.LastError)
 
 	assert.Nil(t, job2.Start, "job 2 should not be started")
 	assert.Equal(t, true, job2.Tasks.ByName("sleep").Canceled, "job 2 task was marked as canceled")
@@ -426,6 +560,7 @@ func TestPipelineRunner_FirstErroredTaskShouldCancelAllRunningTasks_ByDefault(t *testing.T) {
 	jobID := job.ID
 
 	waitForCompletedJob(t, pRunner, jobID)
+	assert.Error(t, job.LastError)
 
 	assert.True(t, job.Tasks.ByName("err").Errored, "err task was errored")
 	assert.True(t, job.Tasks.ByName("ok").Canceled, "ok task should be cancelled")
diff --git a/taskctl/runner.go b/taskctl/runner.go
index 4bf0480..dec2fd4 100644
--- a/taskctl/runner.go
+++ b/taskctl/runner.go
@@ -166,8 +166,8 @@ func (r *TaskRunner) Run(t *task.Task) error {
 		// but this lead to a huge memory leak because the full job output was retained
 		// in memory forever.
 		// This enabled features of taskctl like {{ .Tasks.TASKNAME.Output }} and {{.Output}},
-		// but we never promised these features. Thus it is fine to not log to stdout and stderr
-		// into a Buffer, but directly to a file.
+		// but we never promised these features. Thus, it is fine not to log stdout and stderr
+		// into a Buffer, but to write them directly to the output store.
 		stdoutWriter []io.Writer
 		stderrWriter []io.Writer
 	)
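For reference, a minimal pipelines.yml sketch of how the new onError hook can be configured. This is illustrative only: the pipeline name, task and notification script are made up, the top-level structure assumes the usual `pipelines:` key from definition.PipelinesDef, and the template variables are the ones documented on PipelineDef.OnError and exercised in the tests above.

```yaml
pipelines:
  deploy_site:
    tasks:
      build:
        script:
          - make build # a non-zero exit code here triggers the onError task
    onError:
      script:
        - echo "Task {{ .failedTaskName }} failed with exit code {{ .failedTaskExitCode }}"
        - ./notify-ops.sh "{{ .failedTaskError }}" # hypothetical notification script
      env:
        NOTIFY_CHANNEL: ops
```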