fix(plugin): Fixed batch plugin stuck when underlying batch is collected #543

Merged
merged 1 commit into from
Dec 13, 2024
4 changes: 2 additions & 2 deletions engine/templates_tests/batch.yaml
@@ -11,7 +11,7 @@ steps:
template_name: batchedtasktemplate
json_inputs: '[{"specific_string": "specific-1"}, {"specific_string": "specific-2"}]'
common_json_inputs: '{"common_string": "common"}'
-sub_batch_size: 2
+sub_batch_size: "2"
batchYamlInputs:
description: Batching tasks YAML
action:
@@ -23,4 +23,4 @@ steps:
- specific_string: specific-2
common_inputs:
common_string: common
-sub_batch_size: 2
+sub_batch_size: "2"
11 changes: 6 additions & 5 deletions pkg/plugins/builtin/batch/README.md
@@ -1,9 +1,10 @@
# `batch` Plugin

-This plugin creates a batch of tasks based on the same template and waits for it to complete. It acts like the `subtask` combined with a `foreach`, but doesn't modify the resolution by adding new steps dynamically. As it makes less calls to the underlying database, this plugin is suited for large batches of tasks, where the `subtask` / `foreach` combination would usually struggle, escpecially by bloating the database.
-Tasks belonging to the same batch share a common `BatchID` as well as tag holding their parent's ID.
+This plugin creates a batch of tasks based on the same template and waits for it to complete. It acts like the `subtask` combined with a `foreach`, but doesn't modify the resolution by adding new steps dynamically. As it makes less calls to the underlying database, this plugin is suited for large batches of tasks, where the `subtask` / `foreach` combination would usually struggle, especially by bloating the database.
+Tasks belonging to the same batch share a common `BatchID` as well as a tag holding their parent's ID.

##### Remarks:
+Like the subtask plugin, it's unadvised to have a step based on the batch plugin running alongside other steps in a template. If these other steps take time to return a result, the batch plugin may miss the wake up call from its children tasks.
The output of child tasks is not made available in this plugin's output. This feature will come later.

## Configuration
@@ -16,7 +17,7 @@ The output of child tasks is not made available in this plugin's output. This fe
| `common_inputs` | a map of named values, as accepted on µTask's API, given to all task in the batch by combining it with each input |
| `common_json_inputs` | same as `common_inputs` but as a JSON string. If specified, it overrides `common_inputs` |
| `tags` | a map of named strings added as tags when creating child tasks |
-| `sub_batch_size` | the number tasks to create and run at once. `0` for infinity (i.e.: all tasks are created at once and waited for) (default). Higher values reduce the amount of calls made to the database, but increase sensitivity to database unavailability (if a task creation fails, the whole sub batch must be created again) |
+| `sub_batch_size` | the number tasks to create and run at once, as a string. `0` for infinity (i.e.: all tasks are created at once and waited for) (default). Higher values reduce the amount of calls made to the database, but increase sensitivity to database unavailability (if a task creation fails, the whole sub batch must be created again) |
| `comment` | a string set as `comment` when creating child tasks |
| `resolver_usernames` | a string containing a JSON array of additional resolver users for child tasks |
| `resolver_groups` | a string containing a JSON array of additional resolver groups for child tasks |
@@ -33,7 +34,7 @@ action:
configuration:
# [Required]
# A template that must already be registered on this instance of µTask
-template: some-task-template
+template_name: some-task-template
# Valid inputs, as defined by the referred template, here requiring 3 inputs: foo, otherFoo and fooCommon
inputs:
- foo: bar-1
@@ -50,7 +51,7 @@ action:
fooTag: value-of-foo-tag
barTag: value-of-bar-tag
# The amount of tasks to run at once
-sub_batch_size: 2
+sub_batch_size: "2"
# A list of users which are authorized to resolve this specific task
resolver_usernames: '["authorizedUser"]'
resolver_groups: '["authorizedGroup"]'
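
A minimal sketch of why the README now quotes the value as `"2"`. It assumes the plugin configuration is decoded with Go's `encoding/json` (the `json` struct tags in `batch.go` suggest this, but the decoding call itself is not part of this diff). The `subBatchConfig` type and `decodeSubBatchSize` helper are hypothetical names used only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// subBatchConfig is a hypothetical, trimmed-down stand-in for the plugin
// configuration: only the field discussed here is kept.
type subBatchConfig struct {
	SubBatchSize string `json:"sub_batch_size"`
}

// decodeSubBatchSize unmarshals a JSON configuration and returns the raw
// sub_batch_size text, or an error when the value is not a JSON string.
func decodeSubBatchSize(raw string) (string, error) {
	var c subBatchConfig
	if err := json.Unmarshal([]byte(raw), &c); err != nil {
		return "", err
	}
	return c.SubBatchSize, nil
}

func main() {
	// Quoted value: decodes into the string field.
	v, err := decodeSubBatchSize(`{"sub_batch_size": "2"}`)
	fmt.Println(v, err)

	// Bare number: encoding/json refuses to store a number in a string field.
	_, err = decodeSubBatchSize(`{"sub_batch_size": 2}`)
	fmt.Println("bare number rejected:", err != nil)
}
```

Under that assumption, an unquoted `sub_batch_size: 2` would no longer decode once the field is a string, which is consistent with the test templates and the README example being updated in the same change.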
46 changes: 39 additions & 7 deletions pkg/plugins/builtin/batch/batch.go
@@ -4,13 +4,15 @@ import (
"context"
"encoding/json"
"fmt"
+"strconv"
"strings"

jujuErrors "github.com/juju/errors"
"github.com/loopfz/gadgeto/zesty"
"github.com/sirupsen/logrus"

"github.com/ovh/utask"
+"github.com/ovh/utask/db/pgjuju"
"github.com/ovh/utask/models/resolution"
"github.com/ovh/utask/models/task"
"github.com/ovh/utask/models/tasktemplate"
@@ -46,8 +48,9 @@ type BatchConfig struct {
Tags map[string]string `json:"tags"`
ResolverUsernames string `json:"resolver_usernames"`
ResolverGroups string `json:"resolver_groups"`
-// How many tasks will run concurrently. 0 for infinity (default)
-SubBatchSize int `json:"sub_batch_size"`
+// How many tasks will run concurrently. 0 for infinity (default). It's supplied as a string to support templating
+SubBatchSizeStr string `json:"sub_batch_size"`
+SubBatchSize int64 `json:"-"`
}

// quotedString is a string with doubly escaped quotes, so the string stays simply escaped after being processed
@@ -132,6 +135,11 @@ func exec(stepName string, config any, ictx any) (any, any, error) {
return nil, batchCtx.RawMetadata.Format(), err
}

+if len(conf.Inputs) == 0 {
+// Empty input, there's nothing to do
+return nil, BatchMetadata{}, nil
+}
+
if conf.Tags == nil {
conf.Tags = make(map[string]string)
}
@@ -235,7 +243,7 @@ func populateBatch(

// Computing how many tasks to start
remaining := int64(len(conf.Inputs)) - tasksStarted
-toStart := int64(conf.SubBatchSize) - running // How many tasks can be started
+toStart := conf.SubBatchSize - running // How many tasks can be started
if remaining < toStart || conf.SubBatchSize == 0 {
// There's less tasks remaining to start than the amount of available running slots or slots are unlimited
toStart = remaining
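
The slot arithmetic in this hunk can be sketched as a standalone function. The name `tasksToStart` and its parameters are hypothetical; the first three statements follow the visible lines, while the final clamp to zero is an added assumption, since the rest of `populateBatch` is not shown in this diff.

```go
package main

import "fmt"

// tasksToStart returns how many new tasks may be created right now.
// total is the number of inputs, started the tasks already created,
// running the tasks currently in flight, and subBatchSize the maximum
// concurrency (0 means unlimited).
func tasksToStart(total, started, running, subBatchSize int64) int64 {
	remaining := total - started
	toStart := subBatchSize - running // free running slots
	if remaining < toStart || subBatchSize == 0 {
		// Fewer tasks left than free slots, or slots are unlimited.
		toStart = remaining
	}
	if toStart < 0 {
		// Assumption, not in the visible diff: never report a negative count.
		toStart = 0
	}
	return toStart
}

func main() {
	fmt.Println(tasksToStart(5, 2, 1, 2)) // one free slot out of two
	fmt.Println(tasksToStart(5, 2, 1, 0)) // unlimited: start everything left
}
```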
@@ -270,12 +278,25 @@ func runBatch(

b, err := task.LoadBatchFromPublicID(dbp, metadata.BatchID)
if err != nil {
-if jujuErrors.IsNotFound(err) {
-// The batch has been collected (deleted in DB) because no remaining task referenced it. There's
-// nothing more to do.
+if !jujuErrors.Is(err, jujuErrors.NotFound) {
+return metadata, err
+}
+// else, the batch has been collected (deleted in DB) because no task referenced it anymore.
+
+if metadata.TasksStarted == int64(len(conf.Inputs)) {
+// There is no more tasks to create, the work is done
+metadata.RemainingTasks = 0
return metadata, nil
}
-return metadata, err
+// else, the batch was collected but we still have tasks to create. We need to recreate the batch with
+// the same public ID and populate it.
+// It can happen when the garbage collector runs after a sub-batch is done, but before the batch plugin
+// could populate the batch with more tasks.
+
+b = &task.Batch{BatchDBModel: task.BatchDBModel{PublicID: metadata.BatchID}}
+if err := dbp.DB().Insert(&b.BatchDBModel); err != nil {
+return metadata, pgjuju.Interpret(err)
+}
}

if metadata.TasksStarted < int64(len(conf.Inputs)) {
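
The branching introduced in this hunk can be summarised as a small decision function. This is a sketch only: `errNotFound` stands in for `jujuErrors.NotFound`, and the `action` type and `decide` function are hypothetical names that do not exist in the plugin.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a hypothetical sentinel standing in for jujuErrors.NotFound.
var errNotFound = errors.New("batch not found")

// errOther represents any other load failure, such as a database outage.
var errOther = errors.New("database unavailable")

// action names the outcome runBatch picks after trying to load the batch.
type action string

const (
	continueBatch  action = "continue"  // batch loaded, carry on as usual
	propagateError action = "error"     // unexpected failure, return it
	batchDone      action = "done"      // batch collected and nothing left to create
	recreateBatch  action = "recreate"  // batch collected too early, insert it again
)

// decide maps the load error and the progress counters to an action.
func decide(loadErr error, tasksStarted, totalInputs int64) action {
	switch {
	case loadErr == nil:
		return continueBatch
	case !errors.Is(loadErr, errNotFound):
		return propagateError
	case tasksStarted == totalInputs:
		return batchDone
	default:
		return recreateBatch
	}
}

func main() {
	fmt.Println(decide(errNotFound, 4, 4))
	fmt.Println(decide(errNotFound, 2, 4))
	fmt.Println(decide(errOther, 2, 4))
}
```

The `recreate` case is the one this PR adds: before the change, a collected batch was always treated as finished, even when some inputs had not been turned into tasks yet.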
@@ -349,6 +370,17 @@ func parseInputs(conf *BatchConfig, batchCtx *BatchContext) error {
return jujuErrors.NewBadRequest(err, "JSON inputs unmarshalling failure")
}
}
+
+if conf.SubBatchSizeStr == "" {
+conf.SubBatchSize = 0
+} else {
+subBatchSize, err := strconv.ParseInt(conf.SubBatchSizeStr, 10, 64)
+if err != nil {
+return jujuErrors.NewBadRequest(err, "parsing failure of field 'SubBatchSize'")
+}
+conf.SubBatchSize = subBatchSize
+}
+
return nil
}

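
The parsing added to `parseInputs` can be exercised in isolation with a sketch like the following. The helper name `parseSubBatchSize` is hypothetical, it returns a plain wrapped error instead of `jujuErrors.NewBadRequest`, and the rejection of negative values is an added assumption: the diff itself accepts any value `strconv.ParseInt` can parse.

```go
package main

import (
	"fmt"
	"strconv"
)

// parseSubBatchSize converts the textual sub_batch_size into an int64.
// An empty string means the field was not set and yields 0 (unlimited),
// as in the diff.
func parseSubBatchSize(s string) (int64, error) {
	if s == "" {
		return 0, nil
	}
	n, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("parsing sub_batch_size %q: %w", s, err)
	}
	if n < 0 {
		// Assumption: the diff does not validate the sign; this sketch does.
		return 0, fmt.Errorf("sub_batch_size must not be negative, got %d", n)
	}
	return n, nil
}

func main() {
	for _, in := range []string{"", "2", "two", "-1"} {
		n, err := parseSubBatchSize(in)
		fmt.Printf("%q -> %d, err=%v\n", in, n, err)
	}
}
```

Keeping the raw string and the parsed integer as separate fields, as the PR does with `SubBatchSizeStr` and `SubBatchSize`, lets a template expression fill the value before it is converted.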