Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move workflow to its own database/api #1204

Merged
merged 10 commits into from
Aug 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
serviceDBVersion = "v3"
executionDBVersion = "v2"
instanceDBVersion = "v1"
workflowDBVersion = "v1"
)

var (
Expand All @@ -44,6 +45,7 @@ type Config struct {
ServiceRelativePath string
InstanceRelativePath string
ExecutionRelativePath string
WorkflowRelativePath string
}

SystemServices []*ServiceConfig
Expand All @@ -66,6 +68,7 @@ func New() (*Config, error) {
c.Database.ServiceRelativePath = filepath.Join("database", "services", serviceDBVersion)
c.Database.InstanceRelativePath = filepath.Join("database", "instance", instanceDBVersion)
c.Database.ExecutionRelativePath = filepath.Join("database", "executions", executionDBVersion)
c.Database.WorkflowRelativePath = filepath.Join("database", "workflows", workflowDBVersion)
c.setupServices()
return &c, nil
}
Expand Down
14 changes: 11 additions & 3 deletions core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"github.com/mesg-foundation/engine/config"
"github.com/mesg-foundation/engine/container"
"github.com/mesg-foundation/engine/database"
"github.com/mesg-foundation/engine/engine"
"github.com/mesg-foundation/engine/hash"
"github.com/mesg-foundation/engine/logger"
"github.com/mesg-foundation/engine/sdk"
instancesdk "github.com/mesg-foundation/engine/sdk/instance"
servicesdk "github.com/mesg-foundation/engine/sdk/service"
"github.com/mesg-foundation/engine/server/grpc"
"github.com/mesg-foundation/engine/version"
"github.com/mesg-foundation/engine/workflow"
"github.com/mesg-foundation/engine/x/xerrors"
"github.com/mesg-foundation/engine/x/xnet"
"github.com/mesg-foundation/engine/x/xos"
Expand All @@ -27,6 +27,7 @@ type dependencies struct {
config *config.Config
serviceDB database.ServiceDB
executionDB database.ExecutionDB
workflowDB database.WorkflowDB
container container.Container
sdk *sdk.SDK
}
Expand Down Expand Up @@ -56,6 +57,12 @@ func initDependencies() (*dependencies, error) {
return nil, err
}

// init workflow db.
workflowDB, err := database.NewWorkflowDB(filepath.Join(config.Path, config.Database.WorkflowRelativePath))
if err != nil {
return nil, err
}

// init container.
c, err := container.New(config.Name)
if err != nil {
Expand All @@ -65,13 +72,14 @@ func initDependencies() (*dependencies, error) {
_, port, _ := xnet.SplitHostPort(config.Server.Address)

// init sdk.
sdk := sdk.New(c, serviceDB, instanceDB, executionDB, config.Name, strconv.Itoa(port))
sdk := sdk.New(c, serviceDB, instanceDB, executionDB, workflowDB, config.Name, strconv.Itoa(port))

return &dependencies{
config: config,
container: c,
serviceDB: serviceDB,
executionDB: executionDB,
workflowDB: workflowDB,
sdk: sdk,
}, nil
}
Expand Down Expand Up @@ -165,7 +173,7 @@ func main() {
}()

logrus.Info("starting workflow engine")
wf := workflow.New(dep.sdk.Event, dep.sdk.Execution, dep.sdk.Service)
wf := engine.New(dep.sdk.Event, dep.sdk.Execution, dep.sdk.Workflow)
go func() {
if err := wf.Start(); err != nil {
logrus.Fatalln(err)
Expand Down
130 changes: 130 additions & 0 deletions database/workflow_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package database

import (
"encoding/json"
"fmt"

"github.com/mesg-foundation/engine/hash"
"github.com/mesg-foundation/engine/workflow"
"github.com/sirupsen/logrus"
"github.com/syndtr/goleveldb/leveldb"
)

// WorkflowDB describes the API of database package.
type WorkflowDB interface {
// Save saves a workflow to database.
Save(s *workflow.Workflow) error

// Get gets a workflow from database by its unique hash.
Get(hash hash.Hash) (*workflow.Workflow, error)

// Delete deletes a workflow from database by its unique hash.
Delete(hash hash.Hash) error

// All returns all workflows from database.
All() ([]*workflow.Workflow, error)

// Close closes underlying database connection.
Close() error
}

// LevelDBWorkflowDB is a database for storing workflows definition.
type LevelDBWorkflowDB struct {
db *leveldb.DB
}

// NewWorkflowDB returns the database which is located under given path.
func NewWorkflowDB(path string) (*LevelDBWorkflowDB, error) {
db, err := leveldb.OpenFile(path, nil)
if err != nil {
return nil, err
}
return &LevelDBWorkflowDB{db: db}, nil
}

// marshal returns the byte slice from workflow.
func (d *LevelDBWorkflowDB) marshal(s *workflow.Workflow) ([]byte, error) {
return json.Marshal(s)
}

// unmarshal returns the workflow from byte slice.
func (d *LevelDBWorkflowDB) unmarshal(hash hash.Hash, value []byte) (*workflow.Workflow, error) {
var s workflow.Workflow
if err := json.Unmarshal(value, &s); err != nil {
return nil, fmt.Errorf("database: could not decode workflow %q: %s", hash, err)
}
return &s, nil
}

// All returns every workflow in database.
func (d *LevelDBWorkflowDB) All() ([]*workflow.Workflow, error) {
var (
workflows []*workflow.Workflow
iter = d.db.NewIterator(nil, nil)
)
for iter.Next() {
hash := hash.Hash(iter.Key())
s, err := d.unmarshal(hash, iter.Value())
if err != nil {
// NOTE: Ignore all decode errors (possibly due to a workflow
// structure change or database corruption)
logrus.WithField("workflow", hash.String()).Warning(err.Error())
continue
}
workflows = append(workflows, s)
}
iter.Release()
return workflows, iter.Error()
}

// Delete deletes workflow from database.
func (d *LevelDBWorkflowDB) Delete(hash hash.Hash) error {
tx, err := d.db.OpenTransaction()
if err != nil {
return err
}
if _, err := tx.Get(hash, nil); err != nil {
tx.Discard()
if err == leveldb.ErrNotFound {
return &ErrNotFound{resource: "workflow", hash: hash}
}
return err
}
if err := tx.Delete(hash, nil); err != nil {
tx.Discard()
return err
}
return tx.Commit()

}

// Get retrives workflow from database.
func (d *LevelDBWorkflowDB) Get(hash hash.Hash) (*workflow.Workflow, error) {
b, err := d.db.Get(hash, nil)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, &ErrNotFound{resource: "workflow", hash: hash}
}
return nil, err
}
return d.unmarshal(hash, b)
}

// Save stores workflow in database.
// If there is an another workflow that uses the same sid, it'll be deleted.
func (d *LevelDBWorkflowDB) Save(s *workflow.Workflow) error {
if s.Hash.IsZero() {
return errCannotSaveWithoutHash
}

b, err := d.marshal(s)
if err != nil {
return err
}
return d.db.Put(s.Hash, b, nil)
}

// Close closes database.
func (d *LevelDBWorkflowDB) Close() error {
return d.db.Close()
}
127 changes: 127 additions & 0 deletions engine/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package engine

import (
"fmt"

"github.com/mesg-foundation/engine/execution"
"github.com/mesg-foundation/engine/hash"
eventsdk "github.com/mesg-foundation/engine/sdk/event"
executionsdk "github.com/mesg-foundation/engine/sdk/execution"
workflowsdk "github.com/mesg-foundation/engine/sdk/workflow"
"github.com/mesg-foundation/engine/workflow"
)

// Workflow exposes functions of the workflow
type Workflow struct {
event *eventsdk.Event
eventStream *eventsdk.Listener

execution *executionsdk.Execution
executionStream *executionsdk.Listener

workflow *workflowsdk.Workflow

ErrC chan error
}

// New creates a new Workflow instance
func New(event *eventsdk.Event, execution *executionsdk.Execution, workflow *workflowsdk.Workflow) *Workflow {
return &Workflow{
event: event,
execution: execution,
workflow: workflow,
ErrC: make(chan error),
}
}

// Start the workflow engine
func (w *Workflow) Start() error {
if w.eventStream != nil || w.executionStream != nil {
return fmt.Errorf("workflow engine already running")
}
w.eventStream = w.event.GetStream(nil)
w.executionStream = w.execution.GetStream(&executionsdk.Filter{
Statuses: []execution.Status{execution.Completed},
})
for {
select {
case event := <-w.eventStream.C:
go w.processTrigger(workflow.EVENT, event.InstanceHash, event.Key, event.Data, event.Hash, nil)
case execution := <-w.executionStream.C:
go w.processTrigger(workflow.RESULT, execution.InstanceHash, execution.TaskKey, execution.Outputs, nil, execution)
go w.processExecution(execution)
}
}
}

func (w *Workflow) processTrigger(trigger workflow.TriggerType, instanceHash hash.Hash, key string, data map[string]interface{}, eventHash hash.Hash, exec *execution.Execution) {
workflows, err := w.workflow.List()
if err != nil {
w.ErrC <- err
return
}
for _, wf := range workflows {
if wf.Trigger.Match(trigger, instanceHash, key, data) {
if err := w.triggerExecution(wf, exec, eventHash, data); err != nil {
w.ErrC <- err
}
}
}
}

func (w *Workflow) processExecution(exec *execution.Execution) error {
if exec.WorkflowHash.IsZero() {
return nil
}
wf, err := w.workflow.Get(exec.WorkflowHash)
if err != nil {
return err
}
return w.triggerExecution(wf, exec, nil, exec.Outputs)
}

func (w *Workflow) triggerExecution(wf *workflow.Workflow, prev *execution.Execution, eventHash hash.Hash, data map[string]interface{}) error {
height, err := w.getHeight(wf, prev)
if err != nil {
return err
}
if len(wf.Tasks) <= height {
// end of workflow
return nil
}
var parentHash hash.Hash
if prev != nil {
parentHash = prev.Hash
}
task := wf.Tasks[height]
if _, err := w.execution.Execute(wf.Hash, task.InstanceHash, eventHash, parentHash, task.TaskKey, data, []string{}); err != nil {
return err
}
return nil
}

func (w *Workflow) getHeight(wf *workflow.Workflow, exec *execution.Execution) (int, error) {
if exec == nil {
return 0, nil
}
// Result from other workflow
if !exec.WorkflowHash.Equal(wf.Hash) {
return 0, nil
}
// Execution triggered by an event
if !exec.EventHash.IsZero() {
return 1, nil
}
if exec.ParentHash.IsZero() {
panic("parent hash should be present if event is not")
}
if exec.ParentHash.Equal(exec.Hash) {
panic("parent hash cannot be equal to execution hash")
}
parent, err := w.execution.Get(exec.ParentHash)
if err != nil {
return 0, err
}
parentHeight, err := w.getHeight(wf, parent)
return parentHeight + 1, err
}
Loading