Skip to content

Commit

Permalink
Merge pull request #1204 from mesg-foundation/feature/workflow-ressource
Browse files Browse the repository at this point in the history
Move workflow to its own database/api
  • Loading branch information
antho1404 authored Aug 9, 2019
2 parents 8934dfb + 665ef5c commit 8236363
Show file tree
Hide file tree
Showing 29 changed files with 1,708 additions and 819 deletions.
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
serviceDBVersion = "v3"
executionDBVersion = "v2"
instanceDBVersion = "v1"
workflowDBVersion = "v1"
)

var (
Expand All @@ -44,6 +45,7 @@ type Config struct {
ServiceRelativePath string
InstanceRelativePath string
ExecutionRelativePath string
WorkflowRelativePath string
}

SystemServices []*ServiceConfig
Expand All @@ -66,6 +68,7 @@ func New() (*Config, error) {
c.Database.ServiceRelativePath = filepath.Join("database", "services", serviceDBVersion)
c.Database.InstanceRelativePath = filepath.Join("database", "instance", instanceDBVersion)
c.Database.ExecutionRelativePath = filepath.Join("database", "executions", executionDBVersion)
c.Database.WorkflowRelativePath = filepath.Join("database", "workflows", workflowDBVersion)
c.setupServices()
return &c, nil
}
Expand Down
14 changes: 11 additions & 3 deletions core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"github.com/mesg-foundation/engine/config"
"github.com/mesg-foundation/engine/container"
"github.com/mesg-foundation/engine/database"
"github.com/mesg-foundation/engine/engine"
"github.com/mesg-foundation/engine/hash"
"github.com/mesg-foundation/engine/logger"
"github.com/mesg-foundation/engine/sdk"
instancesdk "github.com/mesg-foundation/engine/sdk/instance"
servicesdk "github.com/mesg-foundation/engine/sdk/service"
"github.com/mesg-foundation/engine/server/grpc"
"github.com/mesg-foundation/engine/version"
"github.com/mesg-foundation/engine/workflow"
"github.com/mesg-foundation/engine/x/xerrors"
"github.com/mesg-foundation/engine/x/xnet"
"github.com/mesg-foundation/engine/x/xos"
Expand All @@ -27,6 +27,7 @@ type dependencies struct {
config *config.Config
serviceDB database.ServiceDB
executionDB database.ExecutionDB
workflowDB database.WorkflowDB
container container.Container
sdk *sdk.SDK
}
Expand Down Expand Up @@ -56,6 +57,12 @@ func initDependencies() (*dependencies, error) {
return nil, err
}

// init workflow db.
workflowDB, err := database.NewWorkflowDB(filepath.Join(config.Path, config.Database.WorkflowRelativePath))
if err != nil {
return nil, err
}

// init container.
c, err := container.New(config.Name)
if err != nil {
Expand All @@ -65,13 +72,14 @@ func initDependencies() (*dependencies, error) {
_, port, _ := xnet.SplitHostPort(config.Server.Address)

// init sdk.
sdk := sdk.New(c, serviceDB, instanceDB, executionDB, config.Name, strconv.Itoa(port))
sdk := sdk.New(c, serviceDB, instanceDB, executionDB, workflowDB, config.Name, strconv.Itoa(port))

return &dependencies{
config: config,
container: c,
serviceDB: serviceDB,
executionDB: executionDB,
workflowDB: workflowDB,
sdk: sdk,
}, nil
}
Expand Down Expand Up @@ -165,7 +173,7 @@ func main() {
}()

logrus.Info("starting workflow engine")
wf := workflow.New(dep.sdk.Event, dep.sdk.Execution, dep.sdk.Service)
wf := engine.New(dep.sdk.Event, dep.sdk.Execution, dep.sdk.Workflow)
go func() {
if err := wf.Start(); err != nil {
logrus.Fatalln(err)
Expand Down
130 changes: 130 additions & 0 deletions database/workflow_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package database

import (
"encoding/json"
"fmt"

"github.com/mesg-foundation/engine/hash"
"github.com/mesg-foundation/engine/workflow"
"github.com/sirupsen/logrus"
"github.com/syndtr/goleveldb/leveldb"
)

// WorkflowDB describes the API of database package.
type WorkflowDB interface {
// Save saves a workflow to database.
Save(s *workflow.Workflow) error

// Get gets a workflow from database by its unique hash.
Get(hash hash.Hash) (*workflow.Workflow, error)

// Delete deletes a workflow from database by its unique hash.
Delete(hash hash.Hash) error

// All returns all workflows from database.
All() ([]*workflow.Workflow, error)

// Close closes underlying database connection.
Close() error
}

// LevelDBWorkflowDB is a database for storing workflows definition.
type LevelDBWorkflowDB struct {
db *leveldb.DB
}

// NewWorkflowDB returns the database which is located under given path.
func NewWorkflowDB(path string) (*LevelDBWorkflowDB, error) {
db, err := leveldb.OpenFile(path, nil)
if err != nil {
return nil, err
}
return &LevelDBWorkflowDB{db: db}, nil
}

// marshal returns the byte slice from workflow.
func (d *LevelDBWorkflowDB) marshal(s *workflow.Workflow) ([]byte, error) {
return json.Marshal(s)
}

// unmarshal returns the workflow from byte slice.
func (d *LevelDBWorkflowDB) unmarshal(hash hash.Hash, value []byte) (*workflow.Workflow, error) {
var s workflow.Workflow
if err := json.Unmarshal(value, &s); err != nil {
return nil, fmt.Errorf("database: could not decode workflow %q: %s", hash, err)
}
return &s, nil
}

// All returns every workflow in database.
func (d *LevelDBWorkflowDB) All() ([]*workflow.Workflow, error) {
var (
workflows []*workflow.Workflow
iter = d.db.NewIterator(nil, nil)
)
for iter.Next() {
hash := hash.Hash(iter.Key())
s, err := d.unmarshal(hash, iter.Value())
if err != nil {
// NOTE: Ignore all decode errors (possibly due to a workflow
// structure change or database corruption)
logrus.WithField("workflow", hash.String()).Warning(err.Error())
continue
}
workflows = append(workflows, s)
}
iter.Release()
return workflows, iter.Error()
}

// Delete deletes workflow from database.
func (d *LevelDBWorkflowDB) Delete(hash hash.Hash) error {
tx, err := d.db.OpenTransaction()
if err != nil {
return err
}
if _, err := tx.Get(hash, nil); err != nil {
tx.Discard()
if err == leveldb.ErrNotFound {
return &ErrNotFound{resource: "workflow", hash: hash}
}
return err
}
if err := tx.Delete(hash, nil); err != nil {
tx.Discard()
return err
}
return tx.Commit()

}

// Get retrives workflow from database.
func (d *LevelDBWorkflowDB) Get(hash hash.Hash) (*workflow.Workflow, error) {
b, err := d.db.Get(hash, nil)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, &ErrNotFound{resource: "workflow", hash: hash}
}
return nil, err
}
return d.unmarshal(hash, b)
}

// Save stores workflow in database.
// If there is an another workflow that uses the same sid, it'll be deleted.
func (d *LevelDBWorkflowDB) Save(s *workflow.Workflow) error {
if s.Hash.IsZero() {
return errCannotSaveWithoutHash
}

b, err := d.marshal(s)
if err != nil {
return err
}
return d.db.Put(s.Hash, b, nil)
}

// Close closes database.
func (d *LevelDBWorkflowDB) Close() error {
return d.db.Close()
}
127 changes: 127 additions & 0 deletions engine/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package engine

import (
"fmt"

"github.com/mesg-foundation/engine/execution"
"github.com/mesg-foundation/engine/hash"
eventsdk "github.com/mesg-foundation/engine/sdk/event"
executionsdk "github.com/mesg-foundation/engine/sdk/execution"
workflowsdk "github.com/mesg-foundation/engine/sdk/workflow"
"github.com/mesg-foundation/engine/workflow"
)

// Workflow exposes functions of the workflow
type Workflow struct {
event *eventsdk.Event
eventStream *eventsdk.Listener

execution *executionsdk.Execution
executionStream *executionsdk.Listener

workflow *workflowsdk.Workflow

ErrC chan error
}

// New creates a new Workflow instance
func New(event *eventsdk.Event, execution *executionsdk.Execution, workflow *workflowsdk.Workflow) *Workflow {
return &Workflow{
event: event,
execution: execution,
workflow: workflow,
ErrC: make(chan error),
}
}

// Start the workflow engine
func (w *Workflow) Start() error {
if w.eventStream != nil || w.executionStream != nil {
return fmt.Errorf("workflow engine already running")
}
w.eventStream = w.event.GetStream(nil)
w.executionStream = w.execution.GetStream(&executionsdk.Filter{
Statuses: []execution.Status{execution.Completed},
})
for {
select {
case event := <-w.eventStream.C:
go w.processTrigger(workflow.EVENT, event.InstanceHash, event.Key, event.Data, event.Hash, nil)
case execution := <-w.executionStream.C:
go w.processTrigger(workflow.RESULT, execution.InstanceHash, execution.TaskKey, execution.Outputs, nil, execution)
go w.processExecution(execution)
}
}
}

func (w *Workflow) processTrigger(trigger workflow.TriggerType, instanceHash hash.Hash, key string, data map[string]interface{}, eventHash hash.Hash, exec *execution.Execution) {
workflows, err := w.workflow.List()
if err != nil {
w.ErrC <- err
return
}
for _, wf := range workflows {
if wf.Trigger.Match(trigger, instanceHash, key, data) {
if err := w.triggerExecution(wf, exec, eventHash, data); err != nil {
w.ErrC <- err
}
}
}
}

func (w *Workflow) processExecution(exec *execution.Execution) error {
if exec.WorkflowHash.IsZero() {
return nil
}
wf, err := w.workflow.Get(exec.WorkflowHash)
if err != nil {
return err
}
return w.triggerExecution(wf, exec, nil, exec.Outputs)
}

func (w *Workflow) triggerExecution(wf *workflow.Workflow, prev *execution.Execution, eventHash hash.Hash, data map[string]interface{}) error {
height, err := w.getHeight(wf, prev)
if err != nil {
return err
}
if len(wf.Tasks) <= height {
// end of workflow
return nil
}
var parentHash hash.Hash
if prev != nil {
parentHash = prev.Hash
}
task := wf.Tasks[height]
if _, err := w.execution.Execute(wf.Hash, task.InstanceHash, eventHash, parentHash, task.TaskKey, data, []string{}); err != nil {
return err
}
return nil
}

func (w *Workflow) getHeight(wf *workflow.Workflow, exec *execution.Execution) (int, error) {
if exec == nil {
return 0, nil
}
// Result from other workflow
if !exec.WorkflowHash.Equal(wf.Hash) {
return 0, nil
}
// Execution triggered by an event
if !exec.EventHash.IsZero() {
return 1, nil
}
if exec.ParentHash.IsZero() {
panic("parent hash should be present if event is not")
}
if exec.ParentHash.Equal(exec.Hash) {
panic("parent hash cannot be equal to execution hash")
}
parent, err := w.execution.Get(exec.ParentHash)
if err != nil {
return 0, err
}
parentHeight, err := w.getHeight(wf, parent)
return parentHeight + 1, err
}
Loading

0 comments on commit 8236363

Please sign in to comment.