Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Executions stored in Database #560

Merged
merged 38 commits into from
Nov 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
34faf32
store executions in database
antho1404 Oct 26, 2018
de51b7e
update create api to accept pointer
antho1404 Oct 26, 2018
d462de7
use string for execution id
antho1404 Oct 26, 2018
594d444
add close function for db
antho1404 Oct 26, 2018
875ee00
add execution db to database
antho1404 Oct 26, 2018
7fa4bd1
add config for execution database
antho1404 Oct 26, 2018
aacde6b
update grpc api
antho1404 Oct 26, 2018
10c7e18
create server with execution db
antho1404 Oct 26, 2018
7ef7267
fix task key when listening task
antho1404 Oct 26, 2018
fe1541c
use service + task key for execution
antho1404 Oct 26, 2018
060556e
fix db initialization
antho1404 Oct 26, 2018
4b172ad
fix execution id
antho1404 Oct 26, 2018
2995814
Merge branch 'dev' into feature/execution-databse
antho1404 Oct 26, 2018
c6945ff
add more context in errors
antho1404 Oct 27, 2018
9d7fed5
improve tmp directory
antho1404 Oct 27, 2018
227a7fc
simplify id creation
antho1404 Oct 27, 2018
9fa87c7
empty line
antho1404 Oct 27, 2018
9d4612e
test readability
antho1404 Oct 27, 2018
11b207a
return leveldb instance
antho1404 Oct 27, 2018
ba91278
Merge branch 'dev' into feature/execution-databse
antho1404 Oct 29, 2018
7fd8e27
add more context to the leveldb name
antho1404 Oct 29, 2018
6a16eee
update LevelDBExecutionDB comment
antho1404 Oct 29, 2018
5a1ac20
reorganize execution package
antho1404 Oct 30, 2018
531c830
use execution api from database/execution package
antho1404 Oct 30, 2018
739d870
fix test
antho1404 Oct 30, 2018
318ef70
Remove empty line from imports
krhubert Oct 30, 2018
26bf450
Remove database after tests
krhubert Oct 30, 2018
bd59bda
Merge branch 'dev' into feature/execution-databse
ilgooz Oct 31, 2018
ead99c5
Update docs + do not return execution from Save
krhubert Oct 31, 2018
a3e6d2a
Merge branch 'feature/execution-databse' of github.com:mesg-foundatio…
krhubert Oct 31, 2018
c28927a
do not generate id from database
antho1404 Oct 31, 2018
28eef69
generate ID when create a new execution
antho1404 Oct 31, 2018
68487f8
fix test
antho1404 Oct 31, 2018
572fe63
execution: small improvements & more verbose tests
ilgooz Nov 1, 2018
9ed97b2
Merge branch 'dev' into feature/execution-databse
ilgooz Nov 1, 2018
99eb632
improve tests
ilgooz Nov 1, 2018
05dff0a
interface/grpc/core: [fix] upgrade service on Find() of execution db
ilgooz Nov 1, 2018
76180b6
Merge branch 'dev' into feature/execution-databse
krhubert Nov 1, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ import (
// API exposes all functionalities of MESG core.
type API struct {
db database.ServiceDB
execDB database.ExecutionDB
container container.Container
}

// Option is a configuration func for MESG.
type Option func(*API)

// New creates a new API with given options.
func New(db database.ServiceDB, options ...Option) (*API, error) {
a := &API{db: db}
func New(db database.ServiceDB, execDB database.ExecutionDB, options ...Option) (*API, error) {
a := &API{db: db, execDB: execDB}
for _, option := range options {
option(a)
}
Expand Down
17 changes: 12 additions & 5 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,31 @@ import (
"github.com/stretchr/testify/require"
)

const testdbname = "db.test"
const (
servicedbname = "service.db.test"
execdbname = "exec.db.test"
)

func newAPIAndDockerTest(t *testing.T) (*API, *dockertest.Testing, func()) {

dt := dockertest.New()

container, err := container.New(container.ClientOption(dt.Client()))
require.NoError(t, err)

db, err := database.NewServiceDB(testdbname)
db, err := database.NewServiceDB(servicedbname)
require.NoError(t, err)

execDB, err := database.NewExecutionDB(execdbname)
require.NoError(t, err)

a, err := New(db, ContainerOption(container))
a, err := New(db, execDB, ContainerOption(container))
require.NoError(t, err)

closer := func() {
require.NoError(t, db.Close())
require.NoError(t, os.RemoveAll(testdbname))
require.NoError(t, execDB.Close())
require.NoError(t, os.RemoveAll(servicedbname))
require.NoError(t, os.RemoveAll(execdbname))
}
return a, dt, closer
}
18 changes: 13 additions & 5 deletions api/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"fmt"

"github.com/mesg-foundation/core/execution"
"github.com/mesg-foundation/core/pubsub"
"github.com/mesg-foundation/core/service"
uuid "github.com/satori/go.uuid"
)

// ExecuteTask executes a task tasKey with inputData and tags for service serviceID.
Expand Down Expand Up @@ -39,7 +41,7 @@ func (e *taskExecutor) Execute(serviceID, taskKey string, inputData map[string]i
if err := e.checkServiceStatus(s); err != nil {
return "", err
}
return e.execute(s, taskKey, inputData, tags)
return e.execute(s, uuid.NewV4().String(), taskKey, inputData, tags)
}

// checkServiceStatus checks service status. A task should be executed only if
Expand All @@ -56,13 +58,19 @@ func (e *taskExecutor) checkServiceStatus(s *service.Service) error {
}

// execute executes task.
func (e *taskExecutor) execute(s *service.Service, taskKey string, taskInputs map[string]interface{},
tags []string) (executionID string, err error) {
exc, err := execution.Create(s, taskKey, taskInputs, tags)
func (e *taskExecutor) execute(s *service.Service, eventID string, taskKey string, taskInputs map[string]interface{}, tags []string) (executionID string, err error) {
exec, err := execution.New(s, eventID, taskKey, taskInputs, tags)
if err != nil {
return "", err
}
return exc.ID, exc.Execute()
if err := exec.Execute(); err != nil {
return "", err
}
if err = e.api.execDB.Save(exec); err != nil {
return "", err
}
go pubsub.Publish(s.TaskSubscriptionChannel(), exec)
return exec.ID, nil
}

// NotRunningServiceError is an error returned when the service is not running that
Expand Down
4 changes: 2 additions & 2 deletions api/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestExecuteFunc(t *testing.T) {
},
},
}, service.ContainerOption(a.container))
id, err := executor.execute(s, "test", map[string]interface{}{}, []string{})
id, err := executor.execute(s, "xxx", "test", map[string]interface{}{}, []string{})
require.NoError(t, err)
require.NotNil(t, id)
}
Expand All @@ -34,7 +34,7 @@ func TestExecuteFuncInvalidTaskName(t *testing.T) {
defer closer()
executor := newTaskExecutor(a)
srv := &service.Service{}
_, err := executor.execute(srv, "test", map[string]interface{}{}, []string{})
_, err := executor.execute(srv, "xxx", "test", map[string]interface{}{}, []string{})
require.Error(t, err)
}

Expand Down
4 changes: 2 additions & 2 deletions api/listen_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,11 @@ func (l *ResultListener) isSubscribed(e *execution.Execution) bool {
}

func (l *ResultListener) isSubscribedToTask(e *execution.Execution) bool {
return xstrings.SliceContains([]string{"", "*", e.Task}, l.taskKey)
return xstrings.SliceContains([]string{"", "*", e.TaskKey}, l.taskKey)
}

func (l *ResultListener) isSubscribedToOutput(e *execution.Execution) bool {
return xstrings.SliceContains([]string{"", "*", e.Output}, l.outputKey)
return xstrings.SliceContains([]string{"", "*", e.OutputKey}, l.outputKey)
}

func (l *ResultListener) isSubscribedToTags(e *execution.Execution) bool {
Expand Down
4 changes: 2 additions & 2 deletions api/listen_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestIsSubscribedToTask(t *testing.T) {
defer closer()
ln := newResultListener(a)

x := &execution.Execution{Task: "task"}
x := &execution.Execution{TaskKey: "task"}

ln.taskKey = ""
require.True(t, ln.isSubscribedToTask(x))
Expand All @@ -110,7 +110,7 @@ func TestIsSubscribedToOutput(t *testing.T) {
defer closer()
ln := newResultListener(a)

x := &execution.Execution{Output: "output"}
x := &execution.Execution{OutputKey: "output"}

ln.outputKey = ""
require.True(t, ln.isSubscribedToOutput(x))
Expand Down
39 changes: 19 additions & 20 deletions api/submit_result.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package api

import (
"fmt"

"github.com/mesg-foundation/core/execution"
"github.com/mesg-foundation/core/pubsub"
"github.com/mesg-foundation/core/service"
)

// SubmitResult submits results for executionID.
func (a *API) SubmitResult(executionID, outputKey string, outputData map[string]interface{}) error {
func (a *API) SubmitResult(executionID string, outputKey string, outputData map[string]interface{}) error {
return newResultSubmitter(a).Submit(executionID, outputKey, outputData)
}

Expand All @@ -24,21 +23,21 @@ func newResultSubmitter(api *API) *resultSubmitter {
}

// Submit submits results for executionID.
func (s *resultSubmitter) Submit(executionID, outputKey string, outputData map[string]interface{}) error {
execution := execution.InProgress(executionID)
if execution == nil {
return &MissingExecutionError{
ID: executionID,
}
func (s *resultSubmitter) Submit(executionID string, outputKey string, outputData map[string]interface{}) error {
exec, err := s.api.execDB.Find(executionID)
if err != nil {
return err
}
return execution.Complete(outputKey, outputData)
}

// MissingExecutionError is returned when corresponding execution doesn't exists.
type MissingExecutionError struct {
ID string
}

func (e *MissingExecutionError) Error() string {
return fmt.Sprintf("Execution %q doesn't exists", e.ID)
exec.Service, err = service.FromService(exec.Service, service.ContainerOption(s.api.container))
if err != nil {
return err
}
if err := exec.Complete(outputKey, outputData); err != nil {
return err
}
if err = s.api.execDB.Save(exec); err != nil {
return err
}
go pubsub.Publish(exec.Service.ResultSubscriptionChannel(), exec)
return nil
}
12 changes: 0 additions & 12 deletions api/submit_result_test.go

This file was deleted.

6 changes: 4 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type Config struct {
Path string

Database struct {
RelativePath string
ServiceRelativePath string
ExecutionRelativePath string
}
}

Expand All @@ -69,7 +70,8 @@ func New() (*Config, error) {
c.Core.Image = "mesg/core:" + strings.Split(version.Version, " ")[0]
c.Core.Name = "core"
c.Core.Path = filepath.Join(home, ".mesg")
c.Core.Database.RelativePath = filepath.Join("database", "services")
c.Core.Database.ServiceRelativePath = filepath.Join("database", "services")
c.Core.Database.ExecutionRelativePath = filepath.Join("database", "executions")
c.Docker.Core.Path = "/mesg"
c.Docker.Socket = "/var/run/docker.sock"
return &c, nil
Expand Down
3 changes: 2 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ func TestDefaultValue(t *testing.T) {
require.Equal(t, "text", c.Log.Format)
require.Equal(t, "info", c.Log.Level)
require.Equal(t, filepath.Join(home, ".mesg"), c.Core.Path)
require.Equal(t, filepath.Join("database", "services"), c.Core.Database.RelativePath)
require.Equal(t, filepath.Join("database", "services"), c.Core.Database.ServiceRelativePath)
require.Equal(t, filepath.Join("database", "executions"), c.Core.Database.ExecutionRelativePath)
require.Equal(t, "core", c.Core.Name)
require.Equal(t, "/mesg", c.Docker.Core.Path)
require.Equal(t, "/var/run/docker.sock", c.Docker.Socket)
Expand Down
10 changes: 8 additions & 2 deletions core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@ import (

func initGRPCServer(c *config.Config) (*grpc.Server, error) {
// init services db.
db, err := database.NewServiceDB(filepath.Join(c.Core.Path, c.Core.Database.RelativePath))
db, err := database.NewServiceDB(filepath.Join(c.Core.Path, c.Core.Database.ServiceRelativePath))
if err != nil {
return nil, err
}

// init execution db.
execDB, err := database.NewExecutionDB(filepath.Join(c.Core.Path, c.Core.Database.ExecutionRelativePath))
if err != nil {
return nil, err
}

// init api.
a, err := api.New(db)
a, err := api.New(db, execDB)
if err != nil {
return nil, err
}
Expand Down
62 changes: 62 additions & 0 deletions database/execution_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package database

import (
"encoding/json"
"errors"

"github.com/mesg-foundation/core/execution"
"github.com/syndtr/goleveldb/leveldb"
)

// ExecutionDB exposes all the functionalities
type ExecutionDB interface {
Find(executionID string) (*execution.Execution, error)
Save(execution *execution.Execution) error
Close() error
}

// LevelDBExecutionDB is a concrete implementation of the DB interface
type LevelDBExecutionDB struct {
db *leveldb.DB
}

// NewExecutionDB creates a new DB instance
func NewExecutionDB(path string) (*LevelDBExecutionDB, error) {
db, err := leveldb.OpenFile(path, nil)
if err != nil {
return nil, err
}

return &LevelDBExecutionDB{db: db}, nil
}

// Find the execution based on an executionID, returns an error if not found
func (db *LevelDBExecutionDB) Find(executionID string) (*execution.Execution, error) {
data, err := db.db.Get([]byte(executionID), nil)
if err != nil {
return nil, err
}
var execution execution.Execution
if err := json.Unmarshal(data, &execution); err != nil {
return nil, err
}
return &execution, nil
}

// Save an instance of executable in the database
// Returns an error if anything from marshaling to database saving goes wrong
func (db *LevelDBExecutionDB) Save(execution *execution.Execution) error {
if execution.ID == "" {
return errors.New("database: can't save service without id")
}
data, err := json.Marshal(execution)
if err != nil {
return err
}
return db.db.Put([]byte(execution.ID), data, nil)
}

// Close closes database.
func (db *LevelDBExecutionDB) Close() error {
return db.db.Close()
}
66 changes: 66 additions & 0 deletions database/execution_db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package database

import (
"io/ioutil"
"os"
"testing"

"github.com/mesg-foundation/core/execution"
"github.com/stretchr/testify/require"
)

func db(t *testing.T, dir string) ExecutionDB {
db, err := NewExecutionDB(dir)
require.NoError(t, err)
return db
}

func TestFind(t *testing.T) {
dir, _ := ioutil.TempDir("", "TestFind")
defer os.RemoveAll(dir)
db := db(t, dir)
defer db.Close()
e := &execution.Execution{ID: "xxx"}
db.Save(e)
tests := []struct {
id string
hasError bool
}{
{id: e.ID, hasError: false},
{id: "doesn't exists", hasError: true},
}
for _, test := range tests {
execution, err := db.Find(test.id)
if test.hasError {
require.Error(t, err)
continue
}
require.NoError(t, err)
require.NotNil(t, execution)
e, err := db.Find(execution.ID)
require.NoError(t, err)
require.NotNil(t, e)
}
}

func TestSave(t *testing.T) {
dir, _ := ioutil.TempDir("", "TestSave")
defer os.RemoveAll(dir)
db := db(t, dir)
defer db.Close()
tests := []struct {
execution *execution.Execution
hasError bool
}{
{&execution.Execution{ID: "xxx"}, false},
{&execution.Execution{}, true},
}
for _, test := range tests {
err := db.Save(test.execution)
if test.hasError {
require.Error(t, err)
continue
}
require.NoError(t, err)
}
}
Loading