Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

orchestrator #19

Merged
merged 4 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.22
go-version: 1.23

- name: Build
run: go build -v ./...
Expand Down
23 changes: 23 additions & 0 deletions orchestrator/executors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package orchestrator

import (
"fmt"
"time"
)

type execFn func() error

// ExecuteWithTimeout - executes a function with a timeout
func ExecuteWithTimeout(fn execFn, name string, timeOutSeconds int) error {
res := make(chan error, 1)
go func() {
res <- fn()
}()

select {
case <-time.After(time.Duration(timeOutSeconds) * time.Second):
return fmt.Errorf("timeout occurred after %d seconds, while executing function %q", timeOutSeconds, name)
case result := <-res:
return result
}
}
66 changes: 66 additions & 0 deletions orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package orchestrator

import (
"errors"
"fmt"
)

// Orchestrator - struct that will contain an array of steps to be executed
type Orchestrator struct {
steps []*Step
}

// NewOrchestrator - creates an Orchestrator
func NewOrchestrator(opts ...BuildOrchestratorOption) *Orchestrator {
o := &Orchestrator{}

// process each option
for _, opt := range opts {
opt(o)
}

// fall back
if o.steps == nil {
o.steps = []*Step{}
}
return o
}

type BuildOrchestratorOption func(*Orchestrator)

// AddStep - adds a new execution step
func (o *Orchestrator) AddStep(step *Step) {
if step == nil {
return
}

if step.stepFn == nil {
panic(errors.New("execFn cannot be nil"))
}

o.steps = append(o.steps, step)
}

// Run - starts running all the steps in order
func (o *Orchestrator) Run() error {
var errStepIdx = 0
var lastErr error = nil

// executing steps one by one
for idx, s := range o.steps {
if err := s.ExecStep(); err != nil {
lastErr = fmt.Errorf("error executing step %q: %w", s.name, err)
errStepIdx = idx
break
}
}

// if there was an error, we need to rollback previous steps
for idx := errStepIdx; idx >= 0; idx-- {
if err := o.steps[idx].ExecRollback(); err != nil {
return errors.Join(lastErr, fmt.Errorf("error rolling back step %q: %w", o.steps[idx].name, err))
}
}

return lastErr
}
114 changes: 114 additions & 0 deletions orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package orchestrator_test

import (
"fmt"
"github.com/MickStanciu/go-fn/orchestrator"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
)

func TestOrchestrator_StartExecution_ShouldExecuteTheStepsWithoutRollback(t *testing.T) {
o := orchestrator.NewOrchestrator()

step1 := func() error {
fmt.Println("executing step 1")
return nil
}

step2 := func() error {
fmt.Println("executing step 2")
return nil
}

step2RollBack := func() error {
fmt.Println("executing rollback for step 2 ")
return nil
}

o.AddStep(orchestrator.NewStep("step 1", step1))
o.AddStep(orchestrator.NewStep("step 2", step2, orchestrator.WithRollbackStepFn(step2RollBack)))
err := o.Run()
require.NoError(t, err)
}

func TestOrchestrator_StartExecution_ShouldExecuteTheStepsWithRollback(t *testing.T) {
o := orchestrator.NewOrchestrator()

step1 := func() error {
fmt.Println("executing step 1")
return nil
}

step2 := func() error {
fmt.Println("executing step 2")
return nil
}

step2RollBack := func() error {
fmt.Println("executing rollback for step 2 ")
return nil
}

step3 := func() error {
fmt.Println("executing step 3")
return fmt.Errorf("error in step3")
}

o.AddStep(orchestrator.NewStep("step 1", step1))
o.AddStep(orchestrator.NewStep("step 2", step2, orchestrator.WithRollbackStepFn(step2RollBack)))
o.AddStep(orchestrator.NewStep("step 3", step3))
err := o.Run()
require.Error(t, err)
assert.Equal(t, err.Error(), `error executing step "step 3": error in step3`)
}

func TestOrchestrator_StartExecution_WhenRollbackFails(t *testing.T) {
o := orchestrator.NewOrchestrator()

step1 := func() error {
fmt.Println("executing step 1")
return nil
}

step1RollBack := func() error {
return fmt.Errorf("error rollback in step1")
}

step2 := func() error {
fmt.Println("executing step 2")
return nil
}

step2RollBack := func() error {
fmt.Println("executing rollback for step 2 ")
return nil
}

step3 := func() error {
fmt.Println("executing step 3")
return fmt.Errorf("error in step3")
}

o.AddStep(orchestrator.NewStep("step 1", step1, orchestrator.WithRollbackStepFn(step1RollBack)))
o.AddStep(orchestrator.NewStep("step 2", step2, orchestrator.WithRollbackStepFn(step2RollBack)))
o.AddStep(orchestrator.NewStep("step 3", step3))
err := o.Run()
require.Error(t, err)
assert.Equal(t, err.Error(), "error executing step \"step 3\": error in step3\nerror rolling back step \"step 1\": error rollback in step1")
}

// Test disabled because of the execution time
//func TestOrchestrator_StartExecution_WithTimeOut(t *testing.T) {
// o := orchestrator.NewOrchestrator()
//
// step1 := func() error {
// fmt.Println("executing step 1")
// time.Sleep(3 * time.Second)
// return nil
// }
//
// o.AddStep(orchestrator.NewStep("step 1", step1, orchestrator.WithTimeout(2)))
// err := o.Run()
// require.NoError(t, err)
//}
68 changes: 68 additions & 0 deletions orchestrator/step.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package orchestrator

// Step - struct that will contain instructions of Step execution and rollback
type Step struct {
name string
stepFn execFn
rollbackStepFn execFn
//retryStrategy - nice to have
timeOutSeconds int
}

type BuildStepOption func(*Step)

// WithTimeout - adds a timeout to the function to be executed
func WithTimeout(timeOutSeconds int) BuildStepOption {
return func(s *Step) {
s.timeOutSeconds = timeOutSeconds
}
}

// WithRollbackStepFn - adds a rollback function
func WithRollbackStepFn(fn execFn) BuildStepOption {
return func(s *Step) {
if fn != nil {
s.rollbackStepFn = fn
}
}
}

// NewStep - builds a new step
func NewStep(name string, fn execFn, opts ...BuildStepOption) *Step {
s := &Step{
name: name,
stepFn: fn,
timeOutSeconds: 0,
}

// process each option
for _, opt := range opts {
opt(s)
}

return s
}

// ExecStep - executes the step
func (s *Step) ExecStep() error {
if s.timeOutSeconds == 0 {
return s.stepFn()
}

// executing with timeout
return ExecuteWithTimeout(s.stepFn, s.name, s.timeOutSeconds)
}

// ExecRollback - executes the rollback step
func (s *Step) ExecRollback() error {
if s.rollbackStepFn == nil {
return nil
}

if s.timeOutSeconds == 0 {
return s.rollbackStepFn()
}

// executing with timeout
return ExecuteWithTimeout(s.rollbackStepFn, s.name, s.timeOutSeconds)
}
Loading