Skip to content

Commit

Permalink
feat: add compress package & the ability to compress steps in a resol…
Browse files Browse the repository at this point in the history
…ution

Signed-off-by: Simon Delberghe <open-source@orandin.fr>
  • Loading branch information
orandin authored and rclsilver committed Nov 14, 2022
1 parent 729e8ab commit bf23fbb
Show file tree
Hide file tree
Showing 16 changed files with 282 additions and 9 deletions.
5 changes: 5 additions & 0 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ovh/utask/models/task"
"github.com/ovh/utask/models/tasktemplate"
"github.com/ovh/utask/pkg/auth"
compress "github.com/ovh/utask/pkg/compress/init"
"github.com/ovh/utask/pkg/now"
"github.com/ovh/utask/pkg/plugins/builtin/echo"
"github.com/ovh/utask/pkg/plugins/builtin/script"
Expand Down Expand Up @@ -71,6 +72,10 @@ func TestMain(m *testing.M) {
panic(err)
}

if err := compress.Register(); err != nil {
panic(err)
}

ctx := context.Background()
var wg sync.WaitGroup
if err := engine.Init(ctx, &wg, store); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions cmd/utask/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
functionsrunner "github.com/ovh/utask/engine/functions/runner"
"github.com/ovh/utask/models/tasktemplate"
"github.com/ovh/utask/pkg/auth"
compress "github.com/ovh/utask/pkg/compress/init"
notify "github.com/ovh/utask/pkg/notify/init"
"github.com/ovh/utask/pkg/plugins"
"github.com/ovh/utask/pkg/plugins/builtin"
Expand Down Expand Up @@ -146,6 +147,8 @@ var rootCmd = &cobra.Command{
service := &plugins.Service{Store: store, Server: server}

for _, err := range []error{
// register compression algorithms
compress.Register(),
// run custom initialization code built as *.so plugins
plugins.InitializersFromFolder(utask.FInitializersFolder, service),
// register builtin initializers
Expand Down Expand Up @@ -178,6 +181,8 @@ var rootCmd = &cobra.Command{
server.SetDashboardSentryDSN(cfg.DashboardSentryDSN)
server.SetMaxBodyBytes(cfg.ServerOptions.MaxBodyBytes)

utask.StepsCompressionAlg = cfg.StepsCompression

if utask.FDebug {
log.SetLevel(log.DebugLevel)
}
Expand Down
5 changes: 4 additions & 1 deletion config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ postgres://user:pass@db/utask?sslmode=disable
// dashboard_sentry_dsn defines the Sentry DSN for the Dashboard UI. Used to retrieve Javascript execution errors inside a Sentry instance.
// default: empty, no SENTRY_DSN
"dashboard_sentry_dsn": "",
// server_options holds configuration to fine-tune DB connection
// steps_compression_algorithm defines the compression algorithm to use to compress the steps data in database.
// default: empty, no compression. Available compression algorithms: gzip
"steps_compression_algorithm": "",
// server_options holds configuration to fine-tune DB connection
"server_options": {
// max_body_bytes defines the maximum size that will be read when sending a body to the uTask server.
// value can't be smaller than 1KB (1024), and can't be bigger than 10MB (10*1024*1024)
Expand Down
5 changes: 5 additions & 0 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ovh/utask/models/resolution"
"github.com/ovh/utask/models/task"
"github.com/ovh/utask/models/tasktemplate"
compress "github.com/ovh/utask/pkg/compress/init"
"github.com/ovh/utask/pkg/now"
"github.com/ovh/utask/pkg/plugins"
plugincallback "github.com/ovh/utask/pkg/plugins/builtin/callback"
Expand Down Expand Up @@ -71,6 +72,10 @@ func TestMain(m *testing.M) {
panic(err)
}

if err := compress.Register(); err != nil {
panic(err)
}

var wg sync.WaitGroup

if err := engine.Init(context.Background(), &wg, store); err != nil {
Expand Down
48 changes: 41 additions & 7 deletions models/resolution/resolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ovh/utask/models"
"github.com/ovh/utask/models/task"
"github.com/ovh/utask/models/tasktemplate"
"github.com/ovh/utask/pkg/compress"
"github.com/ovh/utask/pkg/now"

"github.com/Masterminds/squirrel"
Expand Down Expand Up @@ -81,9 +82,10 @@ type DBModel struct {
RunCount int `json:"run_count" db:"run_count"`
RunMax int `json:"run_max" db:"run_max"`

CryptKey []byte `json:"-" db:"crypt_key"` // key for encrypting steps (itself encrypted with master key)
EncryptedInput []byte `json:"-" db:"encrypted_resolver_input"`
EncryptedSteps []byte `json:"-" db:"encrypted_steps"` // encrypted Steps map
CryptKey []byte `json:"-" db:"crypt_key"` // key for encrypting steps (itself encrypted with master key)
EncryptedInput []byte `json:"-" db:"encrypted_resolver_input"`
EncryptedSteps []byte `json:"-" db:"encrypted_steps"` // encrypted Steps map
StepsCompressionAlg string `json:"-" db:"steps_compression_alg"` // compression algorithm used

BaseConfigurations map[string]json.RawMessage `json:"base_configurations" db:"base_configurations"`
}
Expand Down Expand Up @@ -145,7 +147,18 @@ func Create(dbp zesty.DBProvider, t *task.Task, resolverInputs map[string]interf
if err != nil {
return nil, err
}
r.EncryptedSteps = []byte(encrSteps)

c, err := compress.Get(utask.StepsCompressionAlg)
if err != nil {
return nil, err
}

r.StepsCompressionAlg = utask.StepsCompressionAlg

r.EncryptedSteps, err = c.Compress([]byte(encrSteps))
if err != nil {
return nil, err
}

err = tt.ValidateResolverInputs(resolverInputs)
if err != nil {
Expand Down Expand Up @@ -221,8 +234,18 @@ func load(dbp zesty.DBProvider, publicID string, locked bool, lockNoWait bool) (

r.Values = values.NewValues()

c, err := compress.Get(r.StepsCompressionAlg)
if err != nil {
return nil, err
}

encryptSteps, err := c.Decompress(r.EncryptedSteps)
if err != nil {
return nil, err
}

st := make(map[string]*step.Step)
err = models.EncryptionKey.DecryptMarshal(string(r.EncryptedSteps), &st, []byte(r.PublicID))
err = models.EncryptionKey.DecryptMarshal(string(encryptSteps), &st, []byte(r.PublicID))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -331,7 +354,18 @@ func (r *Resolution) Update(dbp zesty.DBProvider) (err error) {
if err != nil {
return err
}
r.EncryptedSteps = []byte(encrSteps)

c, err := compress.Get(r.StepsCompressionAlg)
if err != nil {
return err
}

compressedSteps, err := c.Compress([]byte(encrSteps))
if err != nil {
return err
}

r.EncryptedSteps = compressedSteps

encrInput, err := models.EncryptionKey.EncryptMarshal(r.ResolverInput, []byte(r.PublicID))
if err != nil {
Expand Down Expand Up @@ -514,7 +548,7 @@ func (r *Resolution) SetInput(input map[string]interface{}) {
}

var rSelector = sqlgenerator.PGsql.Select(
`"resolution".id, "resolution".public_id, "resolution".id_task, "resolution".resolver_username, "resolution".state, "resolution".instance_id, "resolution".created, "resolution".last_start, "resolution".last_stop, "resolution".next_retry, "resolution".run_count, "resolution".run_max, "resolution".crypt_key, "resolution".encrypted_steps, "resolution".encrypted_resolver_input, "resolution".base_configurations, "task".public_id as task_public_id, "task".title as task_title`,
`"resolution".id, "resolution".public_id, "resolution".id_task, "resolution".resolver_username, "resolution".state, "resolution".instance_id, "resolution".created, "resolution".last_start, "resolution".last_stop, "resolution".next_retry, "resolution".run_count, "resolution".run_max, "resolution".crypt_key, "resolution".encrypted_steps, "resolution".steps_compression_alg, "resolution".encrypted_resolver_input, "resolution".base_configurations, "task".public_id as task_public_id, "task".title as task_title`,
).From(
`"resolution"`,
).OrderBy(
Expand Down
42 changes: 42 additions & 0 deletions pkg/compress/compress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package compress

import (
"fmt"
"sync"
)

var (
compressions = map[string]Compression{}
compressionsMut sync.Mutex
)

type Compression interface {
Compress([]byte) ([]byte, error)
Decompress([]byte) ([]byte, error)
}

// RegisterAlgorithm registers a custom compression algorithm.
func RegisterAlgorithm(name string, c Compression) error {
if c == nil {
return nil
}
compressionsMut.Lock()
defer compressionsMut.Unlock()
_, found := compressions[name]
if found {
return fmt.Errorf("conflicting compression key compressions: %s", name)
}
compressions[name] = c
return nil
}

func Get(name string) (Compression, error) {
compressionsMut.Lock()
defer compressionsMut.Unlock()

c, ok := compressions[name]
if !ok {
return nil, fmt.Errorf("%s compression algorithm not found", name)
}
return c, nil
}
44 changes: 44 additions & 0 deletions pkg/compress/gzip/gzip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package gzip

import (
"bytes"
"compress/gzip"
"io"

"github.com/ovh/utask/pkg/compress"
)

const AlgorithmName = "gzip"

type gzipCompression struct{}

// New returns a new compress.Compression with gzip as the compression algorithm.
func New() compress.Compression {
return &gzipCompression{}
}

// Compress transforms data into a compressed form.
func (c *gzipCompression) Compress(data []byte) ([]byte, error) {
var buf bytes.Buffer
zw := gzip.NewWriter(&buf)

if _, err := zw.Write(data); err != nil {
_ = zw.Close()
return nil, err
}

// Need to close the gzip writer before accessing the underlying bytes
_ = zw.Close()
return buf.Bytes(), nil
}

// Decompress transforms compressed form into an uncompressed form.
func (c *gzipCompression) Decompress(data []byte) ([]byte, error) {
zr, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
defer func() { _ = zr.Close() }()

return io.ReadAll(zr)
}
12 changes: 12 additions & 0 deletions pkg/compress/gzip/gzip_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package gzip_test

import (
"testing"

"github.com/ovh/utask/pkg/compress/gzip"
"github.com/ovh/utask/pkg/compress/tests"
)

func TestCompression(t *testing.T) {
tests.CompressionTests(t, gzip.New())
}
24 changes: 24 additions & 0 deletions pkg/compress/init/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package init

import (
"github.com/ovh/utask/pkg/compress"
"github.com/ovh/utask/pkg/compress/gzip"
"github.com/ovh/utask/pkg/compress/noop"
)

// Register registers default compression algorithms.
func Register() error {
noopCompress := noop.New()

for name, c := range map[string]compress.Compression{
"": noopCompress, // to ensure backwards compatibility
noop.AlgorithmName: noopCompress,
gzip.AlgorithmName: gzip.New(),
} {
if err := compress.RegisterAlgorithm(name, c); err != nil {
return err
}
}

return nil
}
20 changes: 20 additions & 0 deletions pkg/compress/noop/noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package noop

import "github.com/ovh/utask/pkg/compress"

const AlgorithmName = "noop"

type noneCompression struct{}

// New returns a new compress.Compression with no compression algorithm.
func New() compress.Compression {
return &noneCompression{}
}

func (c *noneCompression) Compress(data []byte) ([]byte, error) {
return data, nil
}

func (c *noneCompression) Decompress(data []byte) ([]byte, error) {
return data, nil
}
12 changes: 12 additions & 0 deletions pkg/compress/noop/noop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package noop_test

import (
"testing"

"github.com/ovh/utask/pkg/compress/noop"
"github.com/ovh/utask/pkg/compress/tests"
)

func TestCompression(t *testing.T) {
tests.CompressionTests(t, noop.New())
}
33 changes: 33 additions & 0 deletions pkg/compress/tests/tests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package tests

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ovh/utask/pkg/compress"
)

// CompressionTests executes a range of tests to test a `Compression` module.
func CompressionTests(t *testing.T, c compress.Compression) {
tests := []struct {
name string
want string
}{
{name: "Hello world", want: "Hello world!"},
{name: "Empty string", want: " "},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := c.Compress([]byte(tt.want))
require.Nilf(t, err, "Compress(): %s", err)

got, err = c.Decompress(got)

require.Nilf(t, err, "Decompress(): %s", err)
assert.Equal(t, tt.want, string(got))
})
}
}
8 changes: 7 additions & 1 deletion pkg/plugins/builtin/callback/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/gin-gonic/gin"
"github.com/loopfz/gadgeto/zesty"
"github.com/ovh/configstore"
"github.com/stretchr/testify/assert"

"github.com/ovh/utask"
"github.com/ovh/utask/api"
"github.com/ovh/utask/db"
Expand All @@ -21,9 +23,9 @@ import (
"github.com/ovh/utask/models/resolution"
"github.com/ovh/utask/models/task"
"github.com/ovh/utask/models/tasktemplate"
compress "github.com/ovh/utask/pkg/compress/init"
"github.com/ovh/utask/pkg/now"
"github.com/ovh/utask/pkg/plugins"
"github.com/stretchr/testify/assert"
)

func TestMain(m *testing.M) {
Expand All @@ -45,6 +47,10 @@ func TestMain(m *testing.M) {
panic(err)
}

if err := compress.Register(); err != nil {
panic(err)
}

var wg sync.WaitGroup

if err := engine.Init(context.Background(), &wg, store); err != nil {
Expand Down
Loading

0 comments on commit bf23fbb

Please sign in to comment.