Skip to content

Commit

Permalink
Merge branch 'dev' into refactor/hash-string-to-bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
krhubert committed Aug 30, 2019
2 parents 2fe1dbc + 888e2c0 commit dae744c
Show file tree
Hide file tree
Showing 16 changed files with 989 additions and 600 deletions.
14 changes: 8 additions & 6 deletions database/store/cosmos.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,25 +95,27 @@ func (s *CosmosStore) Close() error {

// CosmosIterator is a Cosmos KVStore's iterator implementation of Iterator.
type CosmosIterator struct {
iter types.Iterator
err error
iter types.Iterator
err error
valid bool // HACK for next function. Iterator of cosmos already contains the first element at its creation.
}

// NewCosmosIterator returns a new Cosmos KVStore Iterator wrapper.
func NewCosmosIterator(iter types.Iterator) *CosmosIterator {
return &CosmosIterator{
iter: iter,
iter: iter,
valid: false,
}
}

// Next moves the iterator to the next sequential key in the store.
func (i *CosmosIterator) Next() bool {
defer i.handleError()
valid := i.iter.Valid()
if valid {
if i.valid {
i.iter.Next()
}
return valid
i.valid = i.iter.Valid()
return i.valid
}

// Key returns the key of the cursor.
Expand Down
65 changes: 28 additions & 37 deletions protobuf/api/workflow.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion protobuf/api/workflow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ service Workflow {
// The request's data for the `Create` API.
message CreateWorkflowRequest {
string key = 2; // Workflow's key
types.Workflow.Trigger trigger = 3; // Trigger for the workflow.
repeated types.Workflow.Node nodes = 4; // List of nodes of the workflow.
repeated types.Workflow.Edge edges = 5; // List of edges of the workflow.
}
Expand Down
159 changes: 93 additions & 66 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
executionsdk "github.com/mesg-foundation/engine/sdk/execution"
workflowsdk "github.com/mesg-foundation/engine/sdk/workflow"
"github.com/mesg-foundation/engine/workflow"
"github.com/sirupsen/logrus"
)

// Scheduler manages the executions based on the definition of the workflows
Expand Down Expand Up @@ -47,111 +48,125 @@ func (s *Scheduler) Start() error {
for {
select {
case event := <-s.eventStream.C:
go s.processTriggerFromEvent(event)
go s.process(s.eventFilter(event), nil, event, event.Data)
case execution := <-s.executionStream.C:
go s.processTriggerFromResult(execution)
go s.processExecution(execution)
go s.process(s.resultFilter(execution), execution, nil, execution.Outputs)
go s.process(s.dependencyFilter(execution), execution, nil, execution.Outputs)
}
}
}

func (s *Scheduler) processTriggerFromEvent(event *event.Event) {
workflows, err := s.workflowsMatchingFilter(func(wf *workflow.Workflow) bool {
return wf.Trigger.InstanceHash.Equal(event.InstanceHash) &&
wf.Trigger.EventKey == event.Key &&
wf.Trigger.Filters.Match(event.Data)
})
if err != nil {
s.ErrC <- err
return
}
for _, wf := range workflows {
nextStep, err := wf.FindNode(wf.Trigger.NodeKey)
if err != nil {
s.ErrC <- err
continue
func (s *Scheduler) eventFilter(event *event.Event) func(wf *workflow.Workflow, node workflow.Node) (bool, error) {
return func(wf *workflow.Workflow, node workflow.Node) (bool, error) {
switch n := node.(type) {
case *workflow.Event:
return n.InstanceHash.Equal(event.InstanceHash) && n.EventKey == event.Key, nil
default:
return false, nil
}
if _, err := s.execution.Execute(wf.Hash, nextStep.InstanceHash, event.Hash, nil, wf.Trigger.NodeKey, nextStep.TaskKey, event.Data, []string{}); err != nil {
s.ErrC <- err
}
}

func (s *Scheduler) resultFilter(exec *execution.Execution) func(wf *workflow.Workflow, node workflow.Node) (bool, error) {
return func(wf *workflow.Workflow, node workflow.Node) (bool, error) {
switch n := node.(type) {
case *workflow.Result:
return n.InstanceHash.Equal(exec.InstanceHash) && n.TaskKey == exec.TaskKey, nil
default:
return false, nil
}
}
}

func (s *Scheduler) processTriggerFromResult(result *execution.Execution) {
workflows, err := s.workflowsMatchingFilter(func(wf *workflow.Workflow) bool {
return wf.Trigger.InstanceHash.Equal(result.InstanceHash) &&
wf.Trigger.TaskKey == result.TaskKey &&
wf.Trigger.Filters.Match(result.Outputs)
})
if err != nil {
s.ErrC <- err
return
func (s *Scheduler) dependencyFilter(exec *execution.Execution) func(wf *workflow.Workflow, node workflow.Node) (bool, error) {
return func(wf *workflow.Workflow, node workflow.Node) (bool, error) {
if !exec.WorkflowHash.Equal(wf.Hash) {
return false, nil
}
parents := wf.ParentIDs(node.ID())
if len(parents) == 0 {
return false, nil
}
if len(parents) > 1 {
return false, fmt.Errorf("multi parents not supported")
}
return parents[0] == exec.StepID, nil
}
for _, wf := range workflows {
nextStep, err := wf.FindNode(wf.Trigger.NodeKey)
}

func (s *Scheduler) findNodes(wf *workflow.Workflow, filter func(wf *workflow.Workflow, n workflow.Node) (bool, error)) []workflow.Node {
return wf.FindNodes(func(n workflow.Node) bool {
res, err := filter(wf, n)
if err != nil {
s.ErrC <- err
continue
}
if _, err := s.execution.Execute(wf.Hash, nextStep.InstanceHash, nil, result.Hash, wf.Trigger.NodeKey, nextStep.TaskKey, result.Outputs, []string{}); err != nil {
s.ErrC <- err
}
}
return res
})
}

func (s *Scheduler) workflowsMatchingFilter(filter func(wf *workflow.Workflow) bool) ([]*workflow.Workflow, error) {
func (s *Scheduler) process(filter func(wf *workflow.Workflow, node workflow.Node) (bool, error), exec *execution.Execution, event *event.Event, data map[string]interface{}) {
workflows, err := s.workflow.List()
if err != nil {
return nil, err
s.ErrC <- err
return
}
wfs := make([]*workflow.Workflow, 0)
for _, wf := range workflows {
if filter(wf) {
wfs = append(wfs, wf)
for _, node := range s.findNodes(wf, filter) {
if err := s.processNode(wf, node, exec, event, data); err != nil {
s.ErrC <- err
}
}
}
return wfs, nil
}

func (s *Scheduler) processExecution(exec *execution.Execution) {
if exec.WorkflowHash.IsZero() {
return
}
wf, err := s.workflow.Get(exec.WorkflowHash)
if err != nil {
s.ErrC <- err
return
}
for _, edge := range wf.EdgesFrom(exec.StepID) {
inputs, err := s.mapInputs(wf.Hash, exec, edge)
func (s *Scheduler) processNode(wf *workflow.Workflow, n workflow.Node, exec *execution.Execution, event *event.Event, data map[string]interface{}) error {
logrus.WithField("module", "orchestrator").WithField("nodeID", n.ID()).WithField("type", fmt.Sprintf("%T", n)).Debug("process workflow")
if node, ok := n.(*workflow.Task); ok {
// This returns directly because a task cannot process its children.
// Children will be processed only when the execution is done and the dependencies are resolved
return s.processTask(node, wf, exec, event, data)
}
if node, ok := n.(*workflow.Map); ok {
var err error
data, err = s.processMap(node, wf, exec, data)
if err != nil {
s.ErrC <- err
return
return err
}
nextStep, err := wf.FindNode(edge.Dst)
}
for _, childrenID := range wf.ChildrenIDs(n.ID()) {
children, err := wf.FindNode(childrenID)
if err != nil {
// does not return an error to continue to process other tasks if needed
s.ErrC <- err
continue
}
if _, err := s.execution.Execute(wf.Hash, nextStep.InstanceHash, nil, exec.Hash, edge.Dst, nextStep.TaskKey, inputs, []string{}); err != nil {
if err := s.processNode(wf, children, exec, event, data); err != nil {
// does not return an error to continue to process other tasks if needed
s.ErrC <- err
}
}
return nil
}

func (s *Scheduler) mapInputs(wfHash hash.Hash, prevExec *execution.Execution, edge workflow.Edge) (map[string]interface{}, error) {
if len(edge.Inputs) == 0 {
return prevExec.Outputs, nil
}
inputs := make(map[string]interface{})
for _, input := range edge.Inputs {
value, err := s.resolveInput(wfHash, prevExec, input.Ref.NodeKey, input.Ref.Key)
func (s *Scheduler) processMap(mapping *workflow.Map, wf *workflow.Workflow, exec *execution.Execution, data map[string]interface{}) (map[string]interface{}, error) {
result := make(map[string]interface{})
for _, output := range mapping.Outputs {
node, err := wf.FindNode(output.Ref.NodeKey)
if err != nil {
return nil, err
}
inputs[input.Key] = value
_, isTask := node.(*workflow.Task)
if isTask {
value, err := s.resolveInput(wf.Hash, exec, output.Ref.NodeKey, output.Ref.Key)
if err != nil {
return nil, err
}
result[output.Key] = value
} else {
result[output.Key] = data[output.Ref.Key]
}
}
return inputs, nil
return result, nil
}

func (s *Scheduler) resolveInput(wfHash hash.Hash, exec *execution.Execution, nodeKey string, outputKey string) (interface{}, error) {
Expand All @@ -167,3 +182,15 @@ func (s *Scheduler) resolveInput(wfHash hash.Hash, exec *execution.Execution, no
}
return exec.Outputs[outputKey], nil
}

func (s *Scheduler) processTask(task *workflow.Task, wf *workflow.Workflow, exec *execution.Execution, event *event.Event, data map[string]interface{}) error {
var eventHash, execHash hash.Hash
if event != nil {
eventHash = event.Hash
}
if exec != nil {
execHash = exec.Hash
}
_, err := s.execution.Execute(wf.Hash, task.InstanceHash, eventHash, execHash, task.ID(), task.TaskKey, data, nil)
return err
}
18 changes: 13 additions & 5 deletions sdk/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@ func New(instance *instancesdk.Instance, workflowDB database.WorkflowDB) *Workfl
func (w *Workflow) Create(wf *workflow.Workflow) (*workflow.Workflow, error) {
wf.Hash = hash.Dump(wf)

if _, err := w.instance.Get(wf.Trigger.InstanceHash); err != nil {
return nil, err
}
for _, node := range wf.Nodes {
if _, err := w.instance.Get(node.InstanceHash); err != nil {
return nil, err
switch n := node.(type) {
case workflow.Result:
if _, err := w.instance.Get(n.InstanceHash); err != nil {
return nil, err
}
case workflow.Event:
if _, err := w.instance.Get(n.InstanceHash); err != nil {
return nil, err
}
case workflow.Task:
if _, err := w.instance.Get(n.InstanceHash); err != nil {
return nil, err
}
}
}

Expand Down
Loading

0 comments on commit dae744c

Please sign in to comment.