Skip to content

Commit

Permalink
wip: execution plan and workflow driver
Browse files Browse the repository at this point in the history
Signed-off-by: Carolyn Van Slyck <me@carolynvanslyck.com>
  • Loading branch information
carolynvs committed May 31, 2022
1 parent 8559078 commit a0c3603
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 6 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ replace (

require (
get.porter.sh/magefiles v0.1.3
github.com/Masterminds/semver v1.5.0
github.com/Masterminds/semver/v3 v3.1.1
github.com/PaesslerAG/jsonpath v0.1.1
github.com/carolynvs/aferox v0.3.0
Expand Down Expand Up @@ -89,6 +88,7 @@ require (
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/PaesslerAG/gval v1.0.0 // indirect
github.com/PuerkitoBio/goquery v1.5.0 // indirect
Expand Down
34 changes: 29 additions & 5 deletions pkg/workflow/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (

// Engine handles executing a workflow of bundles to execute.
type Engine struct {
driver WorkflowDriver
resolver DependencyResolver
rootInstallation storage.Installation
}

// TODO: do we need both a dep graph made up of just bundles (i.e. the unresolved representation) and other with everything resolved (execution plan half filled out)?
func (t Engine) GetDependencyGraph(ctx context.Context, bun cnab.ExtendedBundle) (*BundleGraph, error) {
g := NewBundleGraph()

Expand All @@ -26,11 +28,11 @@ func (t Engine) GetDependencyGraph(ctx context.Context, bun cnab.ExtendedBundle)
Reference: cnab.BundleReference{Definition: bun},
}

err := t.AddBundleToGraph(ctx, g, root)
err := t.addBundleToGraph(ctx, g, root)
return g, err
}

func (t Engine) AddBundleToGraph(ctx context.Context, g *BundleGraph, node BundleNode) error {
func (t Engine) addBundleToGraph(ctx context.Context, g *BundleGraph, node BundleNode) error {
if exists := g.RegisterNode(node); exists {
// We have already processed this bundle, return to avoid an infinite loop
return nil
Expand All @@ -50,7 +52,7 @@ func (t Engine) AddBundleToGraph(ctx context.Context, g *BundleGraph, node Bundl
for depName, dep := range deps.Requires {
depKey := fmt.Sprintf("%s.%s", node.Key, depName)

resolved, err := t.ResolveDependency(ctx, depKey, dep)
resolved, err := t.resolveDependency(ctx, depKey, dep)
if err != nil {
return err
}
Expand Down Expand Up @@ -81,13 +83,13 @@ func (t Engine) AddBundleToGraph(ctx context.Context, g *BundleGraph, node Bundl
for _, source := range dep.Credentials {
requireOutput(source)
}
t.AddBundleToGraph(ctx, g, depNode)
t.addBundleToGraph(ctx, g, depNode)
}

return nil
}

func (t Engine) ResolveDependency(ctx context.Context, name string, dep depsv2.Dependency) (Node, error) {
func (t Engine) resolveDependency(ctx context.Context, name string, dep depsv2.Dependency) (Node, error) {
unresolved := Dependency{Key: name}
if dep.Bundle != "" {
ref, err := cnab.ParseOCIReference(dep.Bundle)
Expand Down Expand Up @@ -143,3 +145,25 @@ func (t Engine) ResolveDependency(ctx context.Context, name string, dep depsv2.D

return depNode, nil
}

func (t Engine) BuildExecutionPlan(ctx context.Context, g *BundleGraph) (ExecutionPlan, error) {
nodes, ok := g.Sort()
if !ok {
return ExecutionPlan{}, fmt.Errorf("could not generate an execution plan, the bundle graph has a cyle")
}

opts := ExecutionOptions{}
return NewExecutionPlan(nodes, opts), nil
}

func (t Engine) Execute(ctx context.Context, plan ExecutionPlan) error {
// TODO: for a workflow managed by something external, do we need porter to run the entire time? Can we add a task at the end to update the installation status?
w, err := t.driver.CreateWorkflow(ctx, plan)
if err != nil {
return err
}

if err = t.driver.StartWorkflow(ctx, w); err != nil {
return err
}
}
67 changes: 67 additions & 0 deletions pkg/workflow/execution_plan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package workflow

// ExecutionPlan outlines the set of tasks required to execute a bundle
// and indicates when tasks may run in parallel.
type ExecutionPlan struct {
// Ordered list of tasks
Tasks TaskSet

// debugMode indicates that Porter is going to step through the workflow a task at a time
// This indicates that the workflow driver should generate a workflow definition that supports debugging.
DebugMode bool
}

type ExecutionOptions struct {
// DebugMode indicates that Porter is going to step through the workflow a task at a time
// This indicates that the workflow driver should generate a workflow definition that supports debugging.
DebugMode bool
}

func NewExecutionPlan(nodes []Node, opts ExecutionOptions) ExecutionPlan {
return ExecutionPlan{
Tasks: nil,
DebugMode: opts.DebugMode,
}
}

// TaskList is an ordered list of tasks.
type TaskList []Task

// TaskSet contains groups of tasks that can be run in parallel.
type TaskSet []TaskList

type Task struct {
// Name of the task. Used to refer to a task output
Name string

// InstallerType defines the type of the installer: docker image, webassembly module, etc.
InstallerType string

// InstallerReference fully qualified reference to the definition of the installer.
InstallerReference string

// Inputs given to the task
Inputs []TaskInput

// Outputs that were generated by the task
Outputs map[string]TaskOutput
}

type TaskInput struct {
// Env is the name of the environment variable to inject
Env string

// Path is the full path of the file to inject
Path string

// Contents of the input value.
Contents string

// Source where the contents can be resolved. Guaranteed that the source is resolvable when the task is run.
Source string
}

type TaskOutput struct {
// Path is the full path of the file to collect.
Path string
}
24 changes: 24 additions & 0 deletions pkg/workflow/workflow_driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package workflow

import "context"

// WorkflowDriver is how Porter interacts with workflow drivers, e.g. argo, cadence, etc.
type WorkflowDriver interface {
// CreateWorkflow converts the ExecutionPlan into a definition that the driver understands.
CreateWorkflow(ctx context.Context, plan ExecutionPlan) (WorkflowDefinition, error)

// StartWorkflow begins the specified workflow.
StartWorkflow(ctx context.Context, workflow WorkflowDefinition) error

// CancelWorkflow stops the specified workflow.
CancelWorkflow(ctx context.Context, workflow WorkflowDefinition) error

// RetryWorkflow starts the workflow over at the last failed job(s).
RetryWorkflow(ctx context.Context, workflow WorkflowDefinition) error

// StepThrough runs only the specified task in the workflow, pausing afterwards so that the workflow can be debugged.
StepThrough(ctx context.Context, workflow WorkflowDefinition, taskName string) error
}

// WorkflowDefinition is the representation of the ExecutionPlan against a specific workflow driver.
type WorkflowDefinition map[string]interface{}

0 comments on commit a0c3603

Please sign in to comment.