From 3feea18c607904434f9c6cd7b38b7c507073bdb6 Mon Sep 17 00:00:00 2001 From: Ruben Tordjman <144785435+DrRebus@users.noreply.github.com> Date: Tue, 3 Dec 2024 09:03:13 +0000 Subject: [PATCH] fix(plugin): Fixed batch plugin stuck when underlying batch is collected Signed-off-by: Ruben Tordjman <144785435+DrRebus@users.noreply.github.com> --- engine/templates_tests/batch.yaml | 4 +-- pkg/plugins/builtin/batch/README.md | 11 +++---- pkg/plugins/builtin/batch/batch.go | 46 ++++++++++++++++++++++++----- 3 files changed, 47 insertions(+), 14 deletions(-) diff --git a/engine/templates_tests/batch.yaml b/engine/templates_tests/batch.yaml index 1cae43fb..4aa4216e 100644 --- a/engine/templates_tests/batch.yaml +++ b/engine/templates_tests/batch.yaml @@ -11,7 +11,7 @@ steps: template_name: batchedtasktemplate json_inputs: '[{"specific_string": "specific-1"}, {"specific_string": "specific-2"}]' common_json_inputs: '{"common_string": "common"}' - sub_batch_size: 2 + sub_batch_size: "2" batchYamlInputs: description: Batching tasks YAML action: @@ -23,4 +23,4 @@ steps: - specific_string: specific-2 common_inputs: common_string: common - sub_batch_size: 2 + sub_batch_size: "2" diff --git a/pkg/plugins/builtin/batch/README.md b/pkg/plugins/builtin/batch/README.md index 594dd6ab..9b07a0e5 100644 --- a/pkg/plugins/builtin/batch/README.md +++ b/pkg/plugins/builtin/batch/README.md @@ -1,9 +1,10 @@ # `batch` Plugin -This plugin creates a batch of tasks based on the same template and waits for it to complete. It acts like the `subtask` combined with a `foreach`, but doesn't modify the resolution by adding new steps dynamically. As it makes less calls to the underlying database, this plugin is suited for large batches of tasks, where the `subtask` / `foreach` combination would usually struggle, escpecially by bloating the database. -Tasks belonging to the same batch share a common `BatchID` as well as tag holding their parent's ID. +This plugin creates a batch of tasks based on the same template and waits for it to complete. It acts like the `subtask` combined with a `foreach`, but doesn't modify the resolution by adding new steps dynamically. As it makes less calls to the underlying database, this plugin is suited for large batches of tasks, where the `subtask` / `foreach` combination would usually struggle, especially by bloating the database. +Tasks belonging to the same batch share a common `BatchID` as well as a tag holding their parent's ID. ##### Remarks: +Like the subtask plugin, it's unadvised to have a step based on the batch plugin running alongside other steps in a template. If these other steps take time to return a result, the batch plug may miss the wake up call from its children tasks. The output of child tasks is not made available in this plugin's output. This feature will come later. ## Configuration @@ -16,7 +17,7 @@ The output of child tasks is not made available in this plugin's output. This fe | `common_inputs` | a map of named values, as accepted on µTask's API, given to all task in the batch by combining it with each input | | `common_json_inputs` | same as `common_inputs` but as a JSON string. If specified, it overrides `common_inputs` | | `tags` | a map of named strings added as tags when creating child tasks | -| `sub_batch_size` | the number tasks to create and run at once. `0` for infinity (i.e.: all tasks are created at once and waited for) (default). Higher values reduce the amount of calls made to the database, but increase sensitivity to database unavailability (if a task creation fails, the whole sub batch must be created again) | +| `sub_batch_size` | the number tasks to create and run at once, as a string. `0` for infinity (i.e.: all tasks are created at once and waited for) (default). Higher values reduce the amount of calls made to the database, but increase sensitivity to database unavailability (if a task creation fails, the whole sub batch must be created again) | | `comment` | a string set as `comment` when creating child tasks | | `resolver_usernames` | a string containing a JSON array of additional resolver users for child tasks | | `resolver_groups` | a string containing a JSON array of additional resolver groups for child tasks | @@ -33,7 +34,7 @@ action: configuration: # [Required] # A template that must already be registered on this instance of µTask - template: some-task-template + template_name: some-task-template # Valid inputs, as defined by the referred template, here requiring 3 inputs: foo, otherFoo and fooCommon inputs: - foo: bar-1 @@ -50,7 +51,7 @@ action: fooTag: value-of-foo-tag barTag: value-of-bar-tag # The amount of tasks to run at once - sub_batch_size: 2 + sub_batch_size: "2" # A list of users which are authorized to resolve this specific task resolver_usernames: '["authorizedUser"]' resolver_groups: '["authorizedGroup"]' diff --git a/pkg/plugins/builtin/batch/batch.go b/pkg/plugins/builtin/batch/batch.go index 3951122d..bd35e52b 100644 --- a/pkg/plugins/builtin/batch/batch.go +++ b/pkg/plugins/builtin/batch/batch.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strconv" "strings" jujuErrors "github.com/juju/errors" @@ -11,6 +12,7 @@ import ( "github.com/sirupsen/logrus" "github.com/ovh/utask" + "github.com/ovh/utask/db/pgjuju" "github.com/ovh/utask/models/resolution" "github.com/ovh/utask/models/task" "github.com/ovh/utask/models/tasktemplate" @@ -46,8 +48,9 @@ type BatchConfig struct { Tags map[string]string `json:"tags"` ResolverUsernames string `json:"resolver_usernames"` ResolverGroups string `json:"resolver_groups"` - // How many tasks will run concurrently. 0 for infinity (default) - SubBatchSize int `json:"sub_batch_size"` + // How many tasks will run concurrently. 0 for infinity (default). It's supplied as a string to support templating + SubBatchSizeStr string `json:"sub_batch_size"` + SubBatchSize int64 } // quotedString is a string with doubly escaped quotes, so the string stays simply escaped after being processed @@ -132,6 +135,11 @@ func exec(stepName string, config any, ictx any) (any, any, error) { return nil, batchCtx.RawMetadata.Format(), err } + if len(conf.Inputs) == 0 { + // Empty input, there's nothing to do + return nil, BatchMetadata{}, nil + } + if conf.Tags == nil { conf.Tags = make(map[string]string) } @@ -235,7 +243,7 @@ func populateBatch( // Computing how many tasks to start remaining := int64(len(conf.Inputs)) - tasksStarted - toStart := int64(conf.SubBatchSize) - running // How many tasks can be started + toStart := conf.SubBatchSize - running // How many tasks can be started if remaining < toStart || conf.SubBatchSize == 0 { // There's less tasks remaining to start than the amount of available running slots or slots are unlimited toStart = remaining @@ -270,12 +278,25 @@ func runBatch( b, err := task.LoadBatchFromPublicID(dbp, metadata.BatchID) if err != nil { - if jujuErrors.IsNotFound(err) { - // The batch has been collected (deleted in DB) because no remaining task referenced it. There's - // nothing more to do. + if !jujuErrors.Is(err, jujuErrors.NotFound) { + return metadata, err + } + // else, the batch has been collected (deleted in DB) because no task referenced it anymore. + + if metadata.TasksStarted == int64(len(conf.Inputs)) { + // There is no more tasks to create, the work is done + metadata.RemainingTasks = 0 return metadata, nil } - return metadata, err + // else, the batch was collected but we still have tasks to create. We need to recreate the batch with + // the same public ID and populate it. + // It can happen when the garbage collector runs after a sub-batch is done, but before the batch plugin + // could populate the batch with more tasks. + + b = &task.Batch{BatchDBModel: task.BatchDBModel{PublicID: metadata.BatchID}} + if err := dbp.DB().Insert(&b.BatchDBModel); err != nil { + return metadata, pgjuju.Interpret(err) + } } if metadata.TasksStarted < int64(len(conf.Inputs)) { @@ -349,6 +370,17 @@ func parseInputs(conf *BatchConfig, batchCtx *BatchContext) error { return jujuErrors.NewBadRequest(err, "JSON inputs unmarshalling failure") } } + + if conf.SubBatchSizeStr == "" { + conf.SubBatchSize = 0 + } else { + subBatchSize, err := strconv.ParseInt(conf.SubBatchSizeStr, 10, 64) + if err != nil { + return jujuErrors.NewBadRequest(err, "parsing failure of field 'SubBatchSize'") + } + conf.SubBatchSize = subBatchSize + } + return nil }