From 79bd4583dfbab4e127f72c68a7ca7c2b47ec8d9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20B=C3=A9trancourt?= Date: Fri, 29 Apr 2022 14:04:21 +0000 Subject: [PATCH] feature(callback): create callback plugin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Thomas Bétrancourt --- .gitignore | 5 + Makefile | 5 +- README.md | 23 +- api/bind.go | 40 +++ api/server.go | 54 ++++ cmd/utask/root.go | 6 +- db/encrypt.go | 37 +++ db/init.go | 5 + db/migration.go | 2 +- engine/engine_test.go | 27 ++ engine/templates_tests/callback.yaml | 38 +++ hack/template-schema.json | 43 +++ hack/test.sh | 12 + pkg/plugins/builtin/builtin.go | 15 + pkg/plugins/builtin/callback/README.md | 82 +++++ pkg/plugins/builtin/callback/callback.go | 118 ++++++++ pkg/plugins/builtin/callback/callback_test.go | 71 +++++ pkg/plugins/builtin/callback/handler.go | 109 +++++++ pkg/plugins/builtin/callback/handler_test.go | 142 +++++++++ pkg/plugins/builtin/callback/init.go | 135 +++++++++ pkg/plugins/builtin/callback/init_test.go | 97 ++++++ pkg/plugins/builtin/callback/models.go | 286 ++++++++++++++++++ pkg/plugins/plugins.go | 16 +- sql/migrations/008_callbacks.sql | 25 ++ sql/schema.sql | 18 +- 25 files changed, 1390 insertions(+), 21 deletions(-) create mode 100644 api/bind.go create mode 100644 db/encrypt.go create mode 100644 engine/templates_tests/callback.yaml create mode 100644 pkg/plugins/builtin/callback/README.md create mode 100644 pkg/plugins/builtin/callback/callback.go create mode 100644 pkg/plugins/builtin/callback/callback_test.go create mode 100644 pkg/plugins/builtin/callback/handler.go create mode 100644 pkg/plugins/builtin/callback/handler_test.go create mode 100644 pkg/plugins/builtin/callback/init.go create mode 100644 pkg/plugins/builtin/callback/init_test.go create mode 100644 pkg/plugins/builtin/callback/models.go create mode 100644 sql/migrations/008_callbacks.sql diff --git a/.gitignore b/.gitignore index 875cf558..0b4f3f88 100644 --- a/.gitignore +++ b/.gitignore @@ -49,3 +49,8 @@ dist/ # VScode extension **/*.vsix + +# Related to tests +docker-compose.override.yaml +hack/test.env +report.xml diff --git a/Makefile b/Makefile index d45c8259..3456f8a7 100644 --- a/Makefile +++ b/Makefile @@ -60,7 +60,10 @@ release-utask-lib: test: # moving to another location to go get some packages, otherwise it will include those packages as dependencies in go.mod cd ${HOME} && go install github.com/jstemmer/go-junit-report/v2@latest - GO111MODULE=on DEV=true bash hack/test.sh ${TEST_CMD} 2>&1 | go-junit-report -set-exit-code > report.xml + GO111MODULE=on DEV=true bash hack/test.sh ${TEST_CMD} 2>&1 | go-junit-report | tee report.xml + +test-dev: + GO111MODULE=on DEV=true bash hack/test.sh ${TEST_CMD} 2>&1 test-travis: # moving to another location to go get some packages, otherwise it will include those packages as dependencies in go.mod diff --git a/README.md b/README.md index 334e7baf..84510f94 100644 --- a/README.md +++ b/README.md @@ -506,17 +506,18 @@ All the strategies available are: Browse [builtin actions](./pkg/plugins/builtin) -| Plugin name | Description | Documentation | -| ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------ | -| **`echo`** | Print out a pre-determined result | [Access plugin doc](./pkg/plugins/builtin/echo/README.md) | -| **`http`** | Make an http request | [Access plugin doc](./pkg/plugins/builtin/http/README.md) | -| **`subtask`** | Spawn a new task on µTask | [Access plugin doc](./pkg/plugins/builtin/subtask/README.md) | -| **`notify`** | Dispatch a notification over a registered channel | [Access plugin doc](./pkg/plugins/builtin/notify/README.md) | -| **`apiovh`** | Make a signed call on OVH's public API (requires credentials retrieved from configstore, containing the fields `endpoint`, `appKey`, `appSecret`, `consumerKey`, more info [here](https://docs.ovh.com/gb/en/customer/first-steps-with-ovh-api/)) | [Access plugin doc](./pkg/plugins/builtin/apiovh/README.md) | -| **`ssh`** | Connect to a remote system and run commands on it | [Access plugin doc](./pkg/plugins/builtin/ssh/README.md) | -| **`email`** | Send an email | [Access plugin doc](./pkg/plugins/builtin/email/README.md) | -| **`ping`** | Send a ping to an hostname *Warn: This plugin will keep running until the count is done* | [Access plugin doc](./pkg/plugins/builtin/ping/README.md) | -| **`script`** | Execute a script under `scripts` folder | [Access plugin doc](./pkg/plugins/builtin/script/README.md) | +| Plugin name | Description | Documentation | +| -------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------- | +| **`echo`** | Print out a pre-determined result | [Access plugin doc](./pkg/plugins/builtin/echo/README.md) | +| **`http`** | Make an http request | [Access plugin doc](./pkg/plugins/builtin/http/README.md) | +| **`subtask`** | Spawn a new task on µTask | [Access plugin doc](./pkg/plugins/builtin/subtask/README.md) | +| **`notify`** | Dispatch a notification over a registered channel | [Access plugin doc](./pkg/plugins/builtin/notify/README.md) | +| **`apiovh`** | Make a signed call on OVH's public API (requires credentials retrieved from configstore, containing the fields `endpoint`, `appKey`, `appSecret`, `consumerKey`, more info [here](https://docs.ovh.com/gb/en/customer/first-steps-with-ovh-api/)) | [Access plugin doc](./pkg/plugins/builtin/apiovh/README.md) | +| **`ssh`** | Connect to a remote system and run commands on it | [Access plugin doc](./pkg/plugins/builtin/ssh/README.md) | +| **`email`** | Send an email | [Access plugin doc](./pkg/plugins/builtin/email/README.md) | +| **`ping`** | Send a ping to an hostname *Warn: This plugin will keep running until the count is done* | [Access plugin doc](./pkg/plugins/builtin/ping/README.md) | +| **`script`** | Execute a script under `scripts` folder | [Access plugin doc](./pkg/plugins/builtin/script/README.md) | +| **`callback`** | Use callbacks to manage your tasks life-cycle | [Access plugin doc](./pkg/plugins/builtin/callback/README.md) | #### PreHooks diff --git a/api/bind.go b/api/bind.go new file mode 100644 index 00000000..aab29f20 --- /dev/null +++ b/api/bind.go @@ -0,0 +1,40 @@ +package api + +import ( + "reflect" + + "github.com/gin-gonic/gin" + "github.com/loopfz/gadgeto/tonic" +) + +// bodyBindHook is a wrapper around the default binding hook of tonic. +// It adds the possibility to bind a specific field in an object rather than +// unconditionally binding the whole object. +func bodyBindHook(c *gin.Context, v interface{}) error { + val := reflect.ValueOf(v) + typ := reflect.TypeOf(v).Elem() + + for i := 0; i < typ.NumField(); i++ { + ft := typ.Field(i) + if _, ok := ft.Tag.Lookup("body"); !ok { + continue + } + flt := ft.Type + var fv reflect.Value + if flt.Kind() == reflect.Map { + fv = reflect.New(flt) + } else { + fv = reflect.New(flt.Elem()) + } + if err := tonic.DefaultBindingHook(c, fv.Interface()); err != nil { + return err + } + if flt.Kind() == reflect.Map { + val.Elem().Field(i).Set(fv.Elem()) + } else { + val.Elem().Field(i).Set(fv) + } + } + + return tonic.DefaultBindingHook(c, v) +} diff --git a/api/server.go b/api/server.go index 6539b9ef..0317976a 100644 --- a/api/server.go +++ b/api/server.go @@ -11,6 +11,7 @@ import ( "syscall" "github.com/gin-gonic/gin" + "github.com/juju/errors" "github.com/loopfz/gadgeto/tonic" "github.com/loopfz/gadgeto/tonic/utils/jujerr" "github.com/loopfz/gadgeto/zesty" @@ -21,11 +22,28 @@ import ( "github.com/ovh/utask" "github.com/ovh/utask/api/handler" + "github.com/ovh/utask/db" "github.com/ovh/utask/models/resolution" "github.com/ovh/utask/models/task" "github.com/ovh/utask/pkg/auth" ) +type PluginRoute struct { + Secured bool + Maintenance bool + Path string + Method string + Infos []fizz.OperationOption + Handlers []gin.HandlerFunc +} + +type PluginRouterGroup struct { + Path string + Name string + Description string + Routes []PluginRoute +} + // Server wraps the http handler that exposes a REST API to control // the task orchestration engine type Server struct { @@ -37,6 +55,7 @@ type Server struct { editorPathPrefix string maxBodyBytes int64 customMiddlewares []gin.HandlerFunc + pluginRoutes []PluginRouterGroup } // NewServer returns a new Server @@ -132,6 +151,17 @@ func (s *Server) Handler(ctx context.Context) http.Handler { return s.httpHandler } +// RegisterPluginRoutes allows plugins to register custom routes +func (s *Server) RegisterPluginRoutes(group PluginRouterGroup) error { + for _, route := range group.Routes { + if len(route.Handlers) == 0 { + return errors.NewNotImplemented(nil, "found route without handler") + } + } + s.pluginRoutes = append(s.pluginRoutes, group) + return nil +} + func generateBaseHref(pathPrefix, uri string) string { // UI requires to have a trailing slash at the end return path.Join(pathPrefix, uri) + "/" @@ -199,6 +229,7 @@ func (s *Server) build(ctx context.Context) { tonic.SetErrorHook(jujerr.ErrHook) tonic.SetBindHook(yamlBindHook(s.maxBodyBytes)) + tonic.SetBindHook(bodyBindHook) tonic.SetRenderHook(yamljsonRenderHook, "application/json") authRoutes := router.Group("/", "x-misc", "Misc authenticated routes", s.authMiddleware) @@ -458,6 +489,26 @@ func (s *Server) build(ctx context.Context) { }, tonic.Handler(Stats, 200)) + // plugin routes + for _, p := range s.pluginRoutes { + group := router.Group(p.Path, p.Name, p.Description) + + for _, r := range p.Routes { + routeHandlers := []gin.HandlerFunc{} + + if r.Maintenance { + routeHandlers = append(routeHandlers, maintenanceMode) + } + if r.Secured { + routeHandlers = append(routeHandlers, s.authMiddleware) + } + + routeHandlers = append(routeHandlers, r.Handlers...) + + group.Handle(r.Path, r.Method, r.Infos, routeHandlers...) + } + } + s.httpHandler = router } } @@ -531,6 +582,9 @@ func keyRotate(c *gin.Context) error { if err != nil { return err } + if err := db.CallKeyRotations(dbp); err != nil { + return err + } if err := task.RotateTasks(dbp); err != nil { return err } diff --git a/cmd/utask/root.go b/cmd/utask/root.go index b3314bd5..0952a244 100644 --- a/cmd/utask/root.go +++ b/cmd/utask/root.go @@ -143,11 +143,15 @@ var rootCmd = &cobra.Command{ server = api.NewServer() server.WithGroupAuth(defaultAuthHandler) + service := &plugins.Service{Store: store, Server: server} + for _, err := range []error{ + // register builtin initializers + builtin.RegisterInit(service), // register builtin executors builtin.Register(), // run custom initialization code built as *.so plugins - plugins.InitializersFromFolder(utask.FInitializersFolder, &plugins.Service{Store: store, Server: server}), + plugins.InitializersFromFolder(utask.FInitializersFolder, service), // load custom executors built as *.so plugins plugins.ExecutorsFromFolder(utask.FPluginFolder), // load the functions diff --git a/db/encrypt.go b/db/encrypt.go new file mode 100644 index 00000000..5542554b --- /dev/null +++ b/db/encrypt.go @@ -0,0 +1,37 @@ +package db + +import ( + "sync" + + "github.com/loopfz/gadgeto/zesty" +) + +type KeyRotationCallback func(dbp zesty.DBProvider) error + +var ( + keyRotationsCb []KeyRotationCallback + keyRotationsCbMu sync.Mutex +) + +// RegisterKeyRotations registers a callback which will be called during the encrypt +// keys rotations +func RegisterKeyRotations(cb KeyRotationCallback) { + keyRotationsCbMu.Lock() + defer keyRotationsCbMu.Unlock() + + keyRotationsCb = append(keyRotationsCb, cb) +} + +// CallKeyRotations calls registered callbacks to rotate the encryption keys +func CallKeyRotations(dbp zesty.DBProvider) error { + keyRotationsCbMu.Lock() + defer keyRotationsCbMu.Unlock() + + for _, cb := range keyRotationsCb { + if err := cb(dbp); err != nil { + return err + } + } + + return nil +} diff --git a/db/init.go b/db/init.go index 05d41205..ca03c137 100644 --- a/db/init.go +++ b/db/init.go @@ -43,6 +43,11 @@ var schema = []tableModel{ {runnerinstance.Instance{}, "runner_instance", []string{"id"}, true}, } +// RegisterTableModel registers a new table model +func RegisterTableModel(model interface{}, name string, keys []string, autoinc bool) { + schema = append(schema, tableModel{model, name, keys, autoinc}) +} + // Init takes a connection string and a configuration struct // and registers a new postgres DB connection in zesty, // under utask.DBName -> accessible from api handlers and engine collectors diff --git a/db/migration.go b/db/migration.go index 38cba9d2..46fce375 100644 --- a/db/migration.go +++ b/db/migration.go @@ -13,7 +13,7 @@ import ( ) const ( - expectedVersion = "v1.19.0-migration007" + expectedVersion = "v1.20.0-migration008" ) var ( diff --git a/engine/engine_test.go b/engine/engine_test.go index a1a5d400..b227ffc6 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/require" "github.com/ovh/utask" + "github.com/ovh/utask/api" "github.com/ovh/utask/db" "github.com/ovh/utask/db/pgjuju" "github.com/ovh/utask/engine" @@ -34,6 +35,8 @@ import ( "github.com/ovh/utask/models/task" "github.com/ovh/utask/models/tasktemplate" "github.com/ovh/utask/pkg/now" + "github.com/ovh/utask/pkg/plugins" + plugincallback "github.com/ovh/utask/pkg/plugins/builtin/callback" "github.com/ovh/utask/pkg/plugins/builtin/echo" "github.com/ovh/utask/pkg/plugins/builtin/script" pluginsubtask "github.com/ovh/utask/pkg/plugins/builtin/subtask" @@ -52,6 +55,13 @@ func TestMain(m *testing.M) { store := configstore.DefaultStore store.InitFromEnvironment() + server := api.NewServer() + service := &plugins.Service{Store: store, Server: server} + + if err := plugincallback.Init.Init(service); err != nil { + panic(err) + } + if err := db.Init(store); err != nil { panic(err) } @@ -76,6 +86,7 @@ func TestMain(m *testing.M) { step.RegisterRunner(echo.Plugin.PluginName(), echo.Plugin) step.RegisterRunner(script.Plugin.PluginName(), script.Plugin) step.RegisterRunner(pluginsubtask.Plugin.PluginName(), pluginsubtask.Plugin) + step.RegisterRunner(plugincallback.Plugin.PluginName(), plugincallback.Plugin) os.Exit(m.Run()) } @@ -1204,3 +1215,19 @@ func TestResolveSubTask(t *testing.T) { } assert.Equal(t, resolution.StateDone, res.State) } + +func TestResolveCallback(t *testing.T) { + res, err := createResolution("callback.yaml", map[string]interface{}{}, nil) + require.NoError(t, err) + + res, err = runResolution(res) + require.NoError(t, err) + require.NotNil(t, res) + + // check steps state + assert.Equal(t, res.Steps["createCallback"].State, step.StateDone) + assert.Equal(t, res.Steps["waitCallback"].State, step.StateWaiting) + + // callback has been created, waiting for its resolution + assert.Equal(t, resolution.StateWaiting, res.State) +} diff --git a/engine/templates_tests/callback.yaml b/engine/templates_tests/callback.yaml new file mode 100644 index 00000000..1fc4faf1 --- /dev/null +++ b/engine/templates_tests/callback.yaml @@ -0,0 +1,38 @@ +name: callbackTemplate +description: Template that test the callbacks +title_format: "[test] callback template test" +variables: + - name: successField + value: 'success' +steps: + createCallback: + description: create a callback + action: + type: callback + configuration: + action: create + schema: |- + { + "$schema": "http://json-schema.org/schema#", + "type": "object", + "additionalProperties": false, + "required": ["{{evalCache `successField`}}"], + "properties": { + "{{evalCache `successField`}}": { + "type": "boolean" + } + } + } + + waitCallback: + dependencies: + - createCallback + description: everything is OK + action: + type: callback + configuration: + action: wait + id: '{{.step.createCallback.output.id}}' + +result_format: + foo: "{{.step.waitCallback.output.success}}" diff --git a/hack/template-schema.json b/hack/template-schema.json index b1928215..d99df614 100644 --- a/hack/template-schema.json +++ b/hack/template-schema.json @@ -134,6 +134,9 @@ { "$ref": "#/definitions/ActionSubtask" }, + { + "$ref": "#/definitions/ActionCallback" + }, { "$ref": "#/definitions/ActionPing" }, @@ -163,6 +166,7 @@ "script", "ssh", "subtask", + "callback", "ping", "notify", "email", @@ -520,6 +524,45 @@ "additionalProperties": false, "description": "Subtask action will spawn a subtask of the specified template, and wait for resolution" }, + "ActionCallback": { + "type": "object", + "properties": { + "type": { + "const": "callback" + }, + "configuration": { + "type": "object", + "additionalProperties": false, + "oneOf": [ + { + "properties": { + "action": { + "const": "create" + }, + "schema": { + "type": "string" + } + }, + "required": ["action"] + }, + { + "properties": { + "action": { + "const": "wait" + }, + "id": { + "type": "string" + } + }, + "required": ["action", "id"] + } + ] + } + }, + "title": "Callback Action", + "additionalProperties": false, + "description": "Callback action allows to create a callback and to wait for the resolution" + }, "ActionTag": { "type": "object", "properties": { diff --git a/hack/test.sh b/hack/test.sh index a50d0c88..988dc8b2 100755 --- a/hack/test.sh +++ b/hack/test.sh @@ -21,6 +21,12 @@ require_exec() { done } +ENV_FILE=$(readlink -f $(dirname ${0}))/test.env + +if [ -f ${ENV_FILE} ]; then + source ${ENV_FILE} +fi + require PG_USER PG_PASSWORD PG_HOST PG_PORT PG_DATABASENAME export CFG_DATABASE="postgres://$PG_USER:$PG_PASSWORD@$PG_HOST:$PG_PORT/$PG_DATABASENAME?connect_timeout=5&sslmode=disable" @@ -43,6 +49,12 @@ cat <$PWD/config/utask-cfg } EOF +cat <$PWD/config/callback-config +{ + "base_url": "http://foo.example.com" +} +EOF + echo "Initializing DB..." if [[ -n "$PSQL_BIN" ]]; then diff --git a/pkg/plugins/builtin/builtin.go b/pkg/plugins/builtin/builtin.go index ca129e76..0585967f 100644 --- a/pkg/plugins/builtin/builtin.go +++ b/pkg/plugins/builtin/builtin.go @@ -2,7 +2,9 @@ package builtin import ( "github.com/ovh/utask/engine/step" + "github.com/ovh/utask/pkg/plugins" pluginapiovh "github.com/ovh/utask/pkg/plugins/builtin/apiovh" + plugincallback "github.com/ovh/utask/pkg/plugins/builtin/callback" pluginecho "github.com/ovh/utask/pkg/plugins/builtin/echo" pluginemail "github.com/ovh/utask/pkg/plugins/builtin/email" pluginhttp "github.com/ovh/utask/pkg/plugins/builtin/http" @@ -15,6 +17,18 @@ import ( "github.com/ovh/utask/pkg/plugins/taskplugin" ) +// RegisterInit takes all builtin init plugins and registers them +func RegisterInit(service *plugins.Service) error { + for pluginName, pluginSymbol := range map[string]plugins.InitializerPlugin{ + "callback": plugincallback.Init, + } { + if err := plugins.RegisterInit(pluginName, pluginSymbol, service); err != nil { + return err + } + } + return nil +} + // Register takes all builtin plugins and registers them as step executors func Register() error { for _, p := range []taskplugin.PluginExecutor{ @@ -28,6 +42,7 @@ func Register() error { pluginping.Plugin, pluginscript.Plugin, plugintag.Plugin, + plugincallback.Plugin, } { if err := step.RegisterRunner(p.PluginName(), p); err != nil { return err diff --git a/pkg/plugins/builtin/callback/README.md b/pkg/plugins/builtin/callback/README.md new file mode 100644 index 00000000..d6c86b0c --- /dev/null +++ b/pkg/plugins/builtin/callback/README.md @@ -0,0 +1,82 @@ +# `callback` Plugin + +This plugin allows to create callbacks. A callback can be created from a step using this plugin. Once the callback has been created, you can +pause your resolution until the callback has been successfully called. + +## Configuration + +| Fields | Description | +| -------------------- | ----------------------------------------------------------------------------------------------------------------- | +| `action ` | `create` to create a callback or `wait` to wait a callback | +| `schema` | only valid if `action` is `create`: validate the body provided during the call of the callback | +| `id` | only valid if `action` is `wait`: ID of the callback to wait | + +## Example + +First, the callback has to be created: + +```yaml +create-cb: + action: + type: callback + configuration: + action: create + schema: |- + { + "$schema": "http://json-schema.org/schema#", + "type": "object", + "additionalProperties": false, + "required": ["success"], + "properties": { + "success": { + "type": "boolean" + } + } + } +``` + +In a second step, you can wait for the callback resolution: + +```yaml +wait-cb: + dependencies: + - create-cb + action: + type: callback + configuration: + action: wait + id: '{{field `step` `create-cb` `output` `id`}}' +``` + +## Requirements + +The base URl for callbacks must be defined in `callback-config` configuration key. The value must be a map with at least the `base_url` key which +contains the base URL to reach the callback API from callers. You can also append a `path_prefix` key to override the default value (`/unsecured/callback/`). + +### Examples + +In those examples, `` will be the callback ID and `` the callback token. + +| Base URL | Path prefix | URL | +| ----------------- | ----------- | ----------------------------------------------- | +| `https://foo.bar` | `-` | `https://foo.bar/unsecured/callback/?t=` | +| `https://foo.bar` | `/` | `https://foo.bar/?t=` | +| `https://foo.bar` | `/foobar/` | `https://foo.bar/foobar/?t=` | + +## Return + +### Callback `create` action output + +| Name | Description | +| -------- | ------------------------------------- | +| `id` | The public identifier of the callback | +| `url` | The public URL of the callback | +| `schema` | The sanitized schema | + +### Callback `wait` action output + +| Name | Description | +| ------ | ------------------------------------- | +| `id` | The public identifier of the callback | +| `date` | The call date | +| `body` | The provided body during the call | diff --git a/pkg/plugins/builtin/callback/callback.go b/pkg/plugins/builtin/callback/callback.go new file mode 100644 index 00000000..174ed977 --- /dev/null +++ b/pkg/plugins/builtin/callback/callback.go @@ -0,0 +1,118 @@ +package plugincallback + +import ( + "fmt" + "strings" + + "github.com/juju/errors" + "github.com/loopfz/gadgeto/zesty" + "github.com/ovh/utask" + "github.com/ovh/utask/models/task" + "github.com/ovh/utask/pkg/plugins/taskplugin" +) + +var ( + Plugin = taskplugin.New("callback", "0.1", exec, + taskplugin.WithConfig(validConfig, CallbackStepConfig{}), + taskplugin.WithContextFunc(ctx), + ) +) + +type CallbackStepConfig struct { + Action string `json:"action"` + BodySchema string `json:"schema,omitempty"` + ID string `json:"id"` +} + +type CallbackContext struct { + StepName string `json:"step"` + TaskID string `json:"task_id"` + RequesterUsername string `json:"requester_username"` +} + +func ctx(stepName string) interface{} { + return &CallbackContext{ + TaskID: "{{.task.task_id}}", + RequesterUsername: "{{.task.requester_username}}", + StepName: stepName, + } +} + +func validConfig(config interface{}) error { + cfg := config.(*CallbackStepConfig) + + switch strings.ToLower(cfg.Action) { + case "create": + return nil + + case "wait": + if cfg.ID == "" { + return fmt.Errorf("missing %q parameter", "id") + } + return nil + + default: + return fmt.Errorf("invalid action %q", cfg.Action) + } +} + +func exec(stepName string, config interface{}, ctx interface{}) (interface{}, interface{}, error) { + dbp, err := zesty.NewDBProvider(utask.DBName) + if err != nil { + return nil, nil, err + } + + cfg := config.(*CallbackStepConfig) + stepContext := ctx.(*CallbackContext) + + task, err := task.LoadFromPublicID(dbp, stepContext.TaskID) + if err != nil { + return nil, nil, err + } + + switch strings.ToLower(cfg.Action) { + case "create": + cb, err := createCallback(dbp, task, stepContext, cfg.BodySchema) + if err != nil { + return nil, nil, err + } + + return map[string]interface{}{ + "id": cb.PublicID, + "url": buildUrl(cb), + "schema": cb.Schema, + }, nil, nil + + case "wait": + cb, err := loadFromPublicID(dbp, cfg.ID, false) + if err != nil { + return nil, nil, err + } + + if cb.Called == nil { + return nil, nil, errors.NewNotAssigned(fmt.Errorf("task is waiting for a callback"), "") + } + + return map[string]interface{}{ + "id": cb.PublicID, + "date": cb.Called, + "body": cb.Body, + }, nil, nil + + default: + return nil, nil, errors.BadRequestf("invalid action %q", cfg.Action) + } +} + +func buildUrl(cb *callback) string { + if Init.cfg.PathPrefix == "" { + return fmt.Sprintf("%s%s/%s?t=%s", Init.cfg.BaseURL, defaultCallbackPathPrefix, cb.PublicID, cb.Secret) + } else { + basePath := Init.cfg.PathPrefix + if !strings.HasPrefix(basePath, "/") { + basePath = fmt.Sprintf("/%s", basePath) + } + basePath = strings.TrimSuffix(basePath, "/") + return fmt.Sprintf("%s%s/%s?t=%s", Init.cfg.BaseURL, basePath, cb.PublicID, cb.Secret) + } +} diff --git a/pkg/plugins/builtin/callback/callback_test.go b/pkg/plugins/builtin/callback/callback_test.go new file mode 100644 index 00000000..508e26d1 --- /dev/null +++ b/pkg/plugins/builtin/callback/callback_test.go @@ -0,0 +1,71 @@ +package plugincallback + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_validConfig(t *testing.T) { + // create step - valid + cfg := CallbackStepConfig{ + Action: "create", + } + cfgJSON, err := json.Marshal(cfg) + assert.NoError(t, err) + assert.NoError(t, Plugin.ValidConfig(json.RawMessage(""), json.RawMessage(cfgJSON))) + + // create step - valid with JSON schema + cfg.BodySchema = "{}" + cfgJSON, err = json.Marshal(cfg) + assert.NoError(t, err) + assert.NoError(t, Plugin.ValidConfig(json.RawMessage(""), json.RawMessage(cfgJSON))) + + // wait step - invalid missing id + cfg = CallbackStepConfig{ + Action: "wait", + } + cfgJSON, err = json.Marshal(cfg) + assert.NoError(t, err) + assert.Error(t, Plugin.ValidConfig(json.RawMessage(""), json.RawMessage(cfgJSON)), "missing \"id\" parameter") + + // wait step - valid + cfg.ID = "foo" + cfgJSON, err = json.Marshal(cfg) + assert.NoError(t, err) + assert.NoError(t, Plugin.ValidConfig(json.RawMessage(""), json.RawMessage(cfgJSON))) + + // unknown action + cfg = CallbackStepConfig{ + Action: "foo", + } + cfgJSON, err = json.Marshal(cfg) + assert.NoError(t, err) + assert.Error(t, Plugin.ValidConfig(json.RawMessage(""), json.RawMessage(cfgJSON)), "invalid action \"foo\"") +} + +func Test_buildUrl(t *testing.T) { + cb := &callback{ + ID: 42, + PublicID: "foobar", + Secret: "s3cr3t", + } + + Init.cfg.BaseURL = "http://utask.example.com" + Init.cfg.PathPrefix = "" + assert.Equal(t, buildUrl(cb), fmt.Sprintf("http://utask.example.com%s/foobar?t=s3cr3t", defaultCallbackPathPrefix)) + + Init.cfg.BaseURL = "http://utask.example.com" + Init.cfg.PathPrefix = "/foo" + assert.Equal(t, buildUrl(cb), "http://utask.example.com/foo/foobar?t=s3cr3t") + + Init.cfg.BaseURL = "http://utask.example.com" + Init.cfg.PathPrefix = "/bar/" + assert.Equal(t, buildUrl(cb), "http://utask.example.com/bar/foobar?t=s3cr3t") + + Init.cfg.BaseURL = "http://utask.example.com" + Init.cfg.PathPrefix = "/" + assert.Equal(t, buildUrl(cb), "http://utask.example.com/foobar?t=s3cr3t") +} diff --git a/pkg/plugins/builtin/callback/handler.go b/pkg/plugins/builtin/callback/handler.go new file mode 100644 index 00000000..029cc425 --- /dev/null +++ b/pkg/plugins/builtin/callback/handler.go @@ -0,0 +1,109 @@ +package plugincallback + +import ( + "github.com/gin-gonic/gin" + "github.com/juju/errors" + "github.com/loopfz/gadgeto/zesty" + "github.com/ovh/utask" + "github.com/ovh/utask/engine" + "github.com/ovh/utask/models/task" + "github.com/ovh/utask/pkg/jsonschema" + "github.com/ovh/utask/pkg/metadata" + "github.com/sirupsen/logrus" +) + +const ( + CallbackID = "callback_id" + CallbackSecret = "callback_secret" + CallbackBody = "callback_body" +) + +type handleCallbackIn struct { + CallbackID string `path:"id, required"` + CallbackSecret string `query:"t, required"` + Body map[string]interface{} `body:""` +} + +type handleCallbackOut struct { + Message string `json:"message"` +} + +func HandleCallback(c *gin.Context, in *handleCallbackIn) (res *handleCallbackOut, err error) { + metadata.AddActionMetadata(c, CallbackID, in.CallbackID) + metadata.AddActionMetadata(c, CallbackSecret, in.CallbackSecret) + metadata.AddActionMetadata(c, CallbackBody, in.Body) + + dbp, err := zesty.NewDBProvider(utask.DBName) + if err != nil { + return nil, err + } + + if err = dbp.Tx(); err != nil { + return nil, err + } + + cb, err := loadFromPublicID(dbp, in.CallbackID, true) + if err != nil { + dbp.Rollback() + return nil, err + } + + if cb.Secret != in.CallbackSecret { + dbp.Rollback() + return nil, errors.NotFoundf("failed to load callback from public id: callback") + } + + if cb.Called != nil { + dbp.Rollback() + return nil, errors.BadRequestf("callback has already been resolved") + } + + t, err := task.LoadFromID(dbp, cb.TaskID) + if err != nil { + dbp.Rollback() + return nil, errors.BadRequestf("unable to fetch related task") + } + + // Check the state of the related task + switch t.State { + case task.StateBlocked, task.StateRunning, task.StateWaiting: + default: + dbp.Rollback() + return nil, errors.BadRequestf("related task is not in a valid state: %s", t.State) + } + + if cb.Schema != nil { + s, err := jsonschema.NormalizeAndCompile(in.CallbackID, cb.Schema) + if err != nil { + dbp.Rollback() + return nil, errors.BadRequestf("unable to validate body: %s", err) + } + vc := jsonschema.Validator(in.CallbackID, s) + if err := vc(in.Body); err != nil { + dbp.Rollback() + return nil, errors.BadRequestf("unable to validate body: %s", err) + } + } + + if err = cb.SetCalled(dbp, in.Body); err != nil { + dbp.Rollback() + return nil, err + } + + if err := dbp.Commit(); err != nil { + dbp.Rollback() + return nil, err + } + + logrus.Debugf("resuming task %q resolution %q", t.PublicID, *t.Resolution) + logrus.WithFields(logrus.Fields{"task_id": t.PublicID, "resolution_id": *t.Resolution}).Debugf("resuming resolution %q as callback %q has been called", *t.Resolution, cb.PublicID) + + // We ignore the potential error because the caller don't care about it + _ = engine.GetEngine().Resolve(*t.Resolution, nil) + + res = &handleCallbackOut{ + Message: "The callback has been resolved", + } + + return res, nil +} diff --git a/pkg/plugins/builtin/callback/handler_test.go b/pkg/plugins/builtin/callback/handler_test.go new file mode 100644 index 00000000..8b867691 --- /dev/null +++ b/pkg/plugins/builtin/callback/handler_test.go @@ -0,0 +1,142 @@ +package plugincallback + +import ( + "context" + "fmt" + "net/http/httptest" + "os" + "sync" + "testing" + + "github.com/gin-gonic/gin" + "github.com/loopfz/gadgeto/zesty" + "github.com/ovh/configstore" + "github.com/ovh/utask" + "github.com/ovh/utask/api" + "github.com/ovh/utask/db" + "github.com/ovh/utask/engine" + "github.com/ovh/utask/engine/input" + "github.com/ovh/utask/engine/step" + "github.com/ovh/utask/engine/values" + "github.com/ovh/utask/models/resolution" + "github.com/ovh/utask/models/task" + "github.com/ovh/utask/models/tasktemplate" + "github.com/ovh/utask/pkg/now" + "github.com/ovh/utask/pkg/plugins" + "github.com/stretchr/testify/assert" +) + +func TestMain(m *testing.M) { + store := configstore.DefaultStore + store.InitFromEnvironment() + + server := api.NewServer() + service := &plugins.Service{Store: store, Server: server} + + if err := Init.Init(service); err != nil { + panic(err) + } + + if err := db.Init(store); err != nil { + panic(err) + } + + if err := now.Init(); err != nil { + panic(err) + } + + var wg sync.WaitGroup + + if err := engine.Init(context.Background(), &wg, store); err != nil { + panic(err) + } + + step.RegisterRunner(Plugin.PluginName(), Plugin) + + os.Exit(m.Run()) +} + +func TestHandleCallback(t *testing.T) { + dbp, err := zesty.NewDBProvider(utask.DBName) + assert.NoError(t, err) + + tt, err := tasktemplate.Create(dbp, "callback", "callback", nil, nil, []input.Input{}, []input.Input{}, []string{}, []string{}, false, false, nil, []values.Variable{}, nil, nil, "foo", nil, false, nil) + assert.NoError(t, err) + assert.NotNil(t, tt) + + tsk, err := task.Create(dbp, tt, "foo", []string{}, []string{}, []string{}, []string{}, []string{}, nil, nil, nil) + assert.NoError(t, err) + assert.NotNil(t, t) + + res, err := resolution.Create(dbp, tsk, nil, "bar", true, nil) + assert.NoError(t, err) + assert.NotNil(t, res) + + tsk.Resolution = &res.PublicID + tsk.Update(dbp, true, false) + + cb, err := createCallback(dbp, tsk, &CallbackContext{ + StepName: "foo", + TaskID: tsk.PublicID, + RequesterUsername: "foo", + }, `{ + "$schema": "http://json-schema.org/schema#", + "type": "object", + "additionalProperties": false, + "required": ["success"], + "properties": { + "success": { + "type": "boolean" + } + } + }`) + assert.NoError(t, err) + assert.NotNil(t, cb) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + + // invalid state + out, err := HandleCallback(c, &handleCallbackIn{ + CallbackID: cb.PublicID, + CallbackSecret: cb.Secret, + Body: map[string]interface{}{}, + }) + assert.EqualError(t, err, "related task is not in a valid state: TODO") + assert.Nil(t, out) + + // run the task + tsk.SetState(task.StateRunning) + tsk.Update(dbp, true, false) + + // Invalid JSON-schema + out, err = HandleCallback(c, &handleCallbackIn{ + CallbackID: cb.PublicID, + CallbackSecret: cb.Secret, + Body: map[string]interface{}{}, + }) + assert.EqualError(t, err, fmt.Sprintf("unable to validate body: I[#] S[#] doesn't validate with %q\n I[#] S[#/required] missing properties: %q", cb.PublicID+"#", "success")) + assert.Nil(t, out) + + // valid body + out, err = HandleCallback(c, &handleCallbackIn{ + CallbackID: cb.PublicID, + CallbackSecret: cb.Secret, + Body: map[string]interface{}{ + "success": true, + }, + }) + assert.NoError(t, err) + assert.Equal(t, out.Message, "The callback has been resolved") + + // already called + out, err = HandleCallback(c, &handleCallbackIn{ + CallbackID: cb.PublicID, + CallbackSecret: cb.Secret, + Body: map[string]interface{}{ + "success": true, + }, + }) + assert.Nil(t, out) + assert.EqualError(t, err, "callback has already been resolved") +} diff --git a/pkg/plugins/builtin/callback/init.go b/pkg/plugins/builtin/callback/init.go new file mode 100644 index 00000000..de2b687f --- /dev/null +++ b/pkg/plugins/builtin/callback/init.go @@ -0,0 +1,135 @@ +package plugincallback + +import ( + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/gin-gonic/gin" + "github.com/loopfz/gadgeto/tonic" + "github.com/ovh/configstore" + "github.com/ovh/utask" + "github.com/ovh/utask/api" + "github.com/ovh/utask/db" + "github.com/ovh/utask/pkg/plugins" + "github.com/wI2L/fizz" +) + +const ( + configAlias = "callback-config" + defaultCallbackPathPrefix = "/unsecured/callback" +) + +var ( + Init = NewCallbackInit() +) + +type CallbackConfig struct { + BaseURL string `json:"base_url"` + PathPrefix string `json:"path_prefix,omitempty"` +} + +type CallbackInit struct { + cfg CallbackConfig +} + +func NewCallbackInit() *CallbackInit { + return &CallbackInit{} +} + +func (ci *CallbackInit) Init(s *plugins.Service) error { + if err := ci.loadConfig(s.Store); err != nil { + return fmt.Errorf("unable to load configuration: %s", err) + } + + db.RegisterTableModel(callback{}, "callback", []string{"id"}, true) + db.RegisterKeyRotations(RotateEncryptionKeys) + + group := api.PluginRouterGroup{ + Path: defaultCallbackPathPrefix, + Name: "callback", + Description: "Callback plugin routes.", + Routes: []api.PluginRoute{ + { + Path: "/:id", + Method: "POST", + Infos: []fizz.OperationOption{ + fizz.ID("HandleCallback"), + fizz.Summary("Call waiting callback"), + fizz.Description("This action updates a waiting callback to resolves it."), + }, + Handlers: []gin.HandlerFunc{ + tonic.Handler(HandleCallback, 200), + }, + Maintenance: true, + }, + }, + } + if err := s.Server.RegisterPluginRoutes(group); err != nil { + return err + } + + return nil +} + +func (ci *CallbackInit) Description() string { + return `This plugin will init the callback task plugin.` +} + +func (ci *CallbackInit) loadConfig(store *configstore.Store) error { + var ret CallbackConfig + var notFound configstore.ErrItemNotFound + + cbFilter := configstore.Filter().Store(store).Slice(configAlias).Squash() + cbItems, err := cbFilter.GetItemList() + if err != nil { + return err + } + + if cbItems.Len() > 0 { + jsonStr, err := cbItems.Items[0].Value() + if err != nil { + return err + } + + if err := json.Unmarshal([]byte(jsonStr), &ret); err != nil { + return err + } + + if ret.BaseURL == "" { + return fmt.Errorf("\"base_url\" key not defined in %q", configAlias) + } + } else { + utaskFilter := configstore.Filter().Store(store).Slice(utask.UtaskCfgSecretAlias).Squash() + utaskItem, err := utaskFilter.GetFirstItem() + if err != nil { + if errors.As(err, ¬Found) { + return fmt.Errorf("configstore: get %q: no item found", configAlias) + } + return err + } + + var utaskRet utask.Cfg + + jsonStr, err := utaskItem.Value() + if err != nil { + return err + } + + if err := json.Unmarshal([]byte(jsonStr), &utaskRet); err != nil { + return err + } + + if utaskRet.BaseURL == "" { + return fmt.Errorf("configstore: get %q: no item found", configAlias) + } + + ret.BaseURL = utaskRet.BaseURL + } + + ci.cfg.BaseURL = strings.TrimSuffix(ret.BaseURL, "/") + ci.cfg.PathPrefix = ret.PathPrefix + + return nil +} diff --git a/pkg/plugins/builtin/callback/init_test.go b/pkg/plugins/builtin/callback/init_test.go new file mode 100644 index 00000000..e640cd70 --- /dev/null +++ b/pkg/plugins/builtin/callback/init_test.go @@ -0,0 +1,97 @@ +package plugincallback + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/ovh/configstore" + "github.com/stretchr/testify/assert" +) + +func Test_loadConfig(t *testing.T) { + tests := []struct { + utaskCfg *string + callbackCfg *string + expectedCfg *CallbackConfig + expectedError *string + }{ + { + callbackCfg: cfg(map[string]interface{}{ + "base_url": "http://utask.example.com/callbacks/", + "path_prefix": "path-prefix", + }), + expectedCfg: &CallbackConfig{ + BaseURL: "http://utask.example.com/callbacks", + PathPrefix: "path-prefix", + }, + }, + { + utaskCfg: cfg(map[string]interface{}{ + "base_url": "http://utask.example.com/", + }), + expectedCfg: &CallbackConfig{ + BaseURL: "http://utask.example.com", + PathPrefix: "", + }, + }, + { + expectedError: str("configstore: get %q: no item found", configAlias), + }, + { + callbackCfg: cfg(map[string]interface{}{}), + expectedError: str("\"base_url\" key not defined in %q", configAlias), + }, + } + + for _, test := range tests { + store := configstore.NewStore() + store.RegisterProvider("tests", configstoreProvider(test.utaskCfg, test.callbackCfg)) + + init := NewCallbackInit() + err := init.loadConfig(store) + + if test.expectedError == nil { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, *test.expectedError) + } + + if test.expectedCfg != nil { + assert.Equal(t, test.expectedCfg, &(init.cfg)) + } + } +} + +func configstoreProvider(utaskCfg *string, callbackCfg *string) func() (configstore.ItemList, error) { + var items []configstore.Item + + if utaskCfg != nil { + items = append(items, configstore.NewItem("utask-cfg", *utaskCfg, 1)) + } + + if callbackCfg != nil { + items = append(items, configstore.NewItem("callback-config", *callbackCfg, 1)) + } + + return func() (configstore.ItemList, error) { + ret := configstore.ItemList{ + Items: items, + } + return ret, nil + } +} + +func cfg(v map[string]interface{}) *string { + jsonBytes, err := json.Marshal(v) + if err != nil { + return nil + } + jsonStr := string(jsonBytes) + return &jsonStr +} + +func str(format string, a ...interface{}) *string { + ret := fmt.Sprintf(format, a...) + return &ret +} diff --git a/pkg/plugins/builtin/callback/models.go b/pkg/plugins/builtin/callback/models.go new file mode 100644 index 00000000..a95f040e --- /dev/null +++ b/pkg/plugins/builtin/callback/models.go @@ -0,0 +1,286 @@ +package plugincallback + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/Masterminds/squirrel" + "github.com/gofrs/uuid" + "github.com/juju/errors" + "github.com/loopfz/gadgeto/zesty" + "github.com/ovh/utask" + "github.com/ovh/utask/db/pgjuju" + "github.com/ovh/utask/db/sqlgenerator" + "github.com/ovh/utask/models" + "github.com/ovh/utask/models/resolution" + "github.com/ovh/utask/models/task" + "github.com/ovh/utask/pkg/jsonschema" + "github.com/ovh/utask/pkg/now" +) + +type callback struct { + ID int64 `json:"-" db:"id"` + PublicID string `json:"id" db:"public_id"` + TaskID int64 `json:"-" db:"id_task"` + ResolutionID int64 `json:"-" db:"id_resolution"` + Created time.Time `json:"created" db:"created"` + Updated time.Time `json:"updated" db:"updated"` + Called *time.Time `json:"called" db:"called"` + ResolverUsername string `json:"resolver" db:"resolver_username"` + EncryptedSchema []byte `json:"-" db:"encrypted_schema"` + EncryptedSecret []byte `json:"-" db:"encrypted_secret"` + EncryptedBody []byte `json:"-" db:"encrypted_body"` + Secret string `json:"-" db:"-"` + Body json.RawMessage `json:"body" db:"-"` + Schema json.RawMessage `json:"schema" db:"-"` +} + +func createCallback(dbp zesty.DBProvider, task *task.Task, ctx *CallbackContext, schemaJSON string) (cb *callback, err error) { + defer errors.DeferredAnnotatef(&err, "Failed to create callback") + + resolution, err := resolution.LoadFromPublicID(dbp, *task.Resolution) + if err != nil { + return nil, err + } + + cb = &callback{ + PublicID: uuid.Must(uuid.NewV4()).String(), + TaskID: task.ID, + ResolutionID: resolution.ID, + Created: now.Get(), + Updated: now.Get(), + ResolverUsername: ctx.RequesterUsername, + Secret: uuid.Must(uuid.NewV4()).String(), + } + + if schemaJSON != "" { + cb.Schema, err = jsonschema.NormalizeAndCompile(ctx.StepName, json.RawMessage(schemaJSON)) + + if err != nil { + return nil, errors.NewBadRequest(fmt.Errorf("unable to parse provided schema: %s", err), "") + } + } + + cb.EncryptedSchema, err = models.EncryptionKey.Encrypt(cb.Schema, []byte(cb.PublicID)) + if err != nil { + return nil, err + } + + cb.EncryptedSecret, err = models.EncryptionKey.Encrypt([]byte(cb.Secret), []byte(cb.PublicID)) + if err != nil { + return nil, err + } + + cb.EncryptedBody, err = models.EncryptionKey.Encrypt([]byte{}, []byte(cb.PublicID)) + if err != nil { + return nil, err + } + + err = dbp.DB().Insert(cb) + if err != nil { + return nil, pgjuju.Interpret(err) + } + + return cb, nil +} + +func (cb *callback) update(dbp zesty.DBProvider) (err error) { + defer errors.DeferredAnnotatef(&err, "failed to update callback") + + cb.EncryptedSchema, err = models.EncryptionKey.Encrypt(cb.Schema, []byte(cb.PublicID)) + if err != nil { + return err + } + + cb.EncryptedSecret, err = models.EncryptionKey.Encrypt([]byte(cb.Secret), []byte(cb.PublicID)) + if err != nil { + return err + } + + cb.EncryptedBody, err = models.EncryptionKey.Encrypt(cb.Body, []byte(cb.PublicID)) + if err != nil { + return err + } + + rows, err := dbp.DB().Update(&cb) + if err != nil { + return pgjuju.Interpret(err) + } else if rows == 0 { + return errors.NotFoundf("no such callback to update: %s", cb.PublicID) + } + + return nil +} + +func (cb *callback) SetCalled(dbp zesty.DBProvider, body interface{}) (err error) { + defer errors.DeferredAnnotatef(&err, "failed to update callback") + + bodyBytes, err := json.Marshal(body) + if err != nil { + return err + } + + cb.EncryptedBody, err = models.EncryptionKey.Encrypt(bodyBytes, []byte(cb.PublicID)) + if err != nil { + return err + } + + nowTime := now.Get() + cb.Called = &nowTime + + rows, err := dbp.DB().Update(cb) + if err != nil { + return pgjuju.Interpret(err) + } else if rows == 0 { + return errors.NotFoundf("no such task to update: %s", cb.PublicID) + } + + return nil +} + +func loadFromPublicID(dbp zesty.DBProvider, publicID string, forUpdate bool) (*callback, error) { + cb, err := load(dbp, publicID, forUpdate) + if err != nil { + return nil, err + } + + schemaBytes, err := models.EncryptionKey.Decrypt(cb.EncryptedSchema, []byte(cb.PublicID)) + if err != nil { + return nil, err + } + cb.Schema = schemaBytes + + secretBytes, err := models.EncryptionKey.Decrypt(cb.EncryptedSecret, []byte(cb.PublicID)) + if err != nil { + return nil, err + } + cb.Secret = string(secretBytes) + + bodyBytes, err := models.EncryptionKey.Decrypt(cb.EncryptedBody, []byte(cb.PublicID)) + if err != nil { + return nil, err + } + cb.Body = bodyBytes + + return cb, err +} + +func load(dbp zesty.DBProvider, publicID string, locked bool) (cb *callback, err error) { + defer errors.DeferredAnnotatef(&err, "failed to load callback from public id") + + sel := rSelector + + if locked { + sel = sel.Suffix(`FOR NO KEY UPDATE OF "callback"`) + } + + query, params, err := sel. + Where(squirrel.Eq{`"callback".public_id`: publicID}). + ToSql() + if err != nil { + return nil, err + } + + var rows []*callback + _, err = dbp.DB().Select(&rows, query, params...) + if err != nil { + return nil, pgjuju.Interpret(err) + } else if len(rows) != 1 { + return nil, errors.NotFoundf("callback") + } else { + cb = rows[0] + } + + return cb, nil +} + +func listCallbacks(dbp zesty.DBProvider, pageSize uint64, last *string, locked bool) (r []*callback, err error) { + defer errors.DeferredAnnotatef(&err, "failed to list callbacks") + + sel := rSelector.OrderBy( + `"resolution".id`, + ).Limit( + pageSize, + ) + + if locked { + sel = sel.Suffix(`FOR NO KEY UPDATE OF "callback"`) + } + + if last != nil { + lastR, err := loadFromPublicID(dbp, *last, locked) + if err != nil { + return nil, err + } + sel = sel.Where(`"callback".id > ?`, lastR.ID) + } + + query, params, err := sel.ToSql() + if err != nil { + return nil, err + } + + _, err = dbp.DB().Select(&r, query, params...) + if err != nil { + return nil, pgjuju.Interpret(err) + } + + return r, nil +} + +// RotateEncryptionKeys loads all callbacks stored in DB and makes sure +// that their cyphered content has been handled with the latest +// available storage key +func RotateEncryptionKeys(dbp zesty.DBProvider) (err error) { + defer errors.DeferredAnnotatef(&err, "Failed to rotate encrypted callbacks to new key") + + var last string + for { + var lastID *string + if last != "" { + lastID = &last + } + // load all callbacks + callbacks, err := listCallbacks(dbp, utask.MaxPageSize, lastID, false) + if err != nil { + return err + } + if len(callbacks) == 0 { + break + } + last = callbacks[len(callbacks)-1].PublicID + + for _, c := range callbacks { + sp, err := dbp.TxSavepoint() + if err != nil { + return err + } + // load callback locked + cb, err := loadFromPublicID(dbp, c.PublicID, true) + if err != nil { + dbp.RollbackTo(sp) + return err + } + // update callback (encrypt) + if err := cb.update(dbp); err != nil { + dbp.RollbackTo(sp) + return err + } + // commit + if err := dbp.Commit(); err != nil { + return err + } + } + } + + return nil +} + +var rSelector = sqlgenerator.PGsql.Select( + `"callback".id, "callback".public_id, "callback".id_task, "callback".id_resolution, "callback".resolver_username, "callback".created, "callback".updated, "callback".called, "callback".encrypted_schema, "callback".encrypted_body, "callback".encrypted_secret`, +).From( + `"callback"`, +).OrderBy( + `"callback".id`, +) diff --git a/pkg/plugins/plugins.go b/pkg/plugins/plugins.go index 8ff37852..16e72e06 100644 --- a/pkg/plugins/plugins.go +++ b/pkg/plugins/plugins.go @@ -62,15 +62,11 @@ func InitializersFromFolder(path string, service *Service) error { return fmt.Errorf("failed to load Plugin from %s: received a non-pointer object", fileName) } pluginInterface := reflectvalue.Elem().Interface() - plug, ok := pluginInterface.(InitializerPlugin) + plugin, ok := pluginInterface.(InitializerPlugin) if !ok { return fmt.Errorf("failed to assert type of plugin '%s': expected InitializerPlugin got %T", fileName, pluginInterface) } - if err := plug.Init(service); err != nil { - return fmt.Errorf("failed to run initialization plugin: %s", err) - } - logrus.Infof("Ran initialization plugin: %s", plug.Description()) - return nil + return RegisterInit(fileName, plugin, service) }) } @@ -98,3 +94,11 @@ func loadPlugins(path string, load func(string, plugin.Symbol) error) error { } return nil } + +func RegisterInit(pluginName string, plugin InitializerPlugin, service *Service) error { + if err := plugin.Init(service); err != nil { + return fmt.Errorf("failed to run initialization plugin: %s", err) + } + logrus.Infof("Ran initialization plugin: %s", plugin.Description()) + return nil +} diff --git a/sql/migrations/008_callbacks.sql b/sql/migrations/008_callbacks.sql new file mode 100644 index 00000000..b429d5bd --- /dev/null +++ b/sql/migrations/008_callbacks.sql @@ -0,0 +1,25 @@ +-- +migrate Up + +CREATE TABLE "callback" ( + id BIGSERIAL PRIMARY KEY, + created TIMESTAMP with time zone DEFAULT now() NOT NULL, + updated TIMESTAMP with time zone DEFAULT now() NOT NULL, + called TIMESTAMP with time zone, + public_id UUID UNIQUE NOT NULL, + id_task BIGINT NOT NULL REFERENCES "task"(id) ON DELETE CASCADE, + id_resolution BIGINT NOT NULL REFERENCES "resolution"(id) ON DELETE CASCADE, + resolver_username TEXT NOT NULL, + encrypted_schema BYTEA NOT NULL, + encrypted_body BYTEA NOT NULL, + encrypted_secret BYTEA NOT NULL +); +CREATE INDEX ON "callback"(id_task); +CREATE INDEX ON "callback"(id_resolution); + +INSERT INTO "utask_sql_migrations" VALUES ('v1.20.0-migration008'); + +-- +migrate Down + +DROP TABLE "callback" CASCADE; + +DELETE FROM "utask_sql_migrations" WHERE current_migration_applied = 'v1.20.0-migration008'; diff --git a/sql/schema.sql b/sql/schema.sql index e7c0d407..e64cef81 100644 --- a/sql/schema.sql +++ b/sql/schema.sql @@ -113,10 +113,26 @@ CREATE TABLE "runner_instance" ( heartbeat TIMESTAMP with time zone DEFAULT now() NOT NULL ); +CREATE TABLE "callback" ( + id BIGSERIAL PRIMARY KEY, + created TIMESTAMP with time zone DEFAULT now() NOT NULL, + updated TIMESTAMP with time zone DEFAULT now() NOT NULL, + called TIMESTAMP with time zone, + public_id UUID UNIQUE NOT NULL, + id_task BIGINT NOT NULL REFERENCES "task"(id) ON DELETE CASCADE, + id_resolution BIGINT NOT NULL REFERENCES "resolution"(id) ON DELETE CASCADE, + resolver_username TEXT NOT NULL, + encrypted_schema BYTEA NOT NULL, + encrypted_body BYTEA NOT NULL, + encrypted_secret BYTEA NOT NULL +); +CREATE INDEX ON "callback"(id_task); +CREATE INDEX ON "callback"(id_resolution); + CREATE TABLE "utask_sql_migrations" ( current_migration_applied TEXT PRIMARY KEY ); -INSERT INTO "utask_sql_migrations" VALUES ('v1.19.0-migration007'); +INSERT INTO "utask_sql_migrations" VALUES ('v1.20.0-migration008'); END;