Skip to content

Commit

Permalink
Merge pull request #994 from mesg-foundation/feature/execurion-db
Browse files Browse the repository at this point in the history
Change Execution Attributes
  • Loading branch information
antho1404 authored May 30, 2019
2 parents 9cf2d0b + f20bddb commit addd2e9
Show file tree
Hide file tree
Showing 22 changed files with 354 additions and 517 deletions.
12 changes: 6 additions & 6 deletions client/service/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

// Execution holds information about a Task execution.
type Execution struct {
// ID is the execution id of task.
ID, id string
// Hash is the execution id of task.
Hash, hash string

// Key is the name of task.
Key string
Expand All @@ -23,9 +23,9 @@ type Execution struct {

func newExecution(service *Service, data *serviceapi.TaskData) *Execution {
return &Execution{
ID: data.ExecutionID,
Hash: data.ExecutionHash,
Key: data.TaskKey,
id: data.ExecutionID,
hash: data.ExecutionHash,
inputs: data.InputData,
service: service,
}
Expand All @@ -42,7 +42,7 @@ func (e *Execution) reply(data interface{}, reterr error) (err error) {
defer cancel()
if reterr != nil {
_, err = e.service.client.SubmitResult(ctx, &serviceapi.SubmitResultRequest{
ExecutionID: e.id,
ExecutionHash: e.hash,
Result: &serviceapi.SubmitResultRequest_Error{
Error: reterr.Error(),
},
Expand All @@ -53,7 +53,7 @@ func (e *Execution) reply(data interface{}, reterr error) (err error) {
return err1
}
_, err = e.service.client.SubmitResult(ctx, &serviceapi.SubmitResultRequest{
ExecutionID: e.id,
ExecutionHash: e.hash,
Result: &serviceapi.SubmitResultRequest_OutputData{
OutputData: string(resp),
},
Expand Down
4 changes: 2 additions & 2 deletions client/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ func TestListen(t *testing.T) {
require.True(t, err == nil || err == context.Canceled)
}()

id, execution, err := server.Execute(task, reqData)
hash, execution, err := server.Execute(task, reqData)
require.NoError(t, err)
require.Equal(t, id, execution.ID())
require.Equal(t, hash, execution.Hash())
require.Equal(t, token, server.ListenToken())

var data1 taskResponse
Expand Down
8 changes: 4 additions & 4 deletions client/service/servicetest/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (

// Execution is a testing task execution result.
type Execution struct {
id string
hash string
data string
key string
}

// ID returns the execution id of task.
func (e *Execution) ID() string {
return e.id
// Hash returns the execution id of task.
func (e *Execution) Hash() string {
return e.hash
}

// Key returns the output key of task.
Expand Down
10 changes: 5 additions & 5 deletions client/service/servicetest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ func (s *Server) Execute(task string, data interface{}) (string, *Execution, err
}

uuidV4 := uuid.NewV4()
id := uuidV4.String()
hash := uuidV4.String()

taskData := &serviceapi.TaskData{
ExecutionID: id,
TaskKey: task,
InputData: string(bytes),
ExecutionHash: hash,
TaskKey: task,
InputData: string(bytes),
}

select {
Expand All @@ -64,7 +64,7 @@ func (s *Server) Execute(task string, data interface{}) (string, *Execution, err
case <-s.service.closingC:
return "", nil, ErrConnectionClosed{}
case resp := <-s.service.submitC:
return id, resp, nil
return hash, resp, nil
}
}

Expand Down
12 changes: 6 additions & 6 deletions client/service/servicetest/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,17 @@ func TestExecute(t *testing.T) {
server := NewServer()
require.NotNil(t, server)

var executionID string
var executionHash string
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()

executionID1, execution, err := server.Execute(task, reqData)
executionHash1, execution, err := server.Execute(task, reqData)
require.NoError(t, err)
require.Equal(t, executionID, execution.ID())
require.Equal(t, executionID, executionID1)
require.Equal(t, executionHash, execution.Hash())
require.Equal(t, executionHash, executionHash1)

var data taskResponse
require.Nil(t, execution.Data(&data))
Expand All @@ -101,12 +101,12 @@ func TestExecute(t *testing.T) {
}()

taskData := <-stream.taskC
executionID = taskData.ExecutionID
executionHash = taskData.ExecutionHash
require.Equal(t, task, taskData.TaskKey)
require.Equal(t, reqDataStr, taskData.InputData)

_, err := server.service.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{
ExecutionID: executionID,
ExecutionHash: executionHash,
Result: &serviceapi.SubmitResultRequest_OutputData{
OutputData: resDataStr,
},
Expand Down
4 changes: 2 additions & 2 deletions client/service/servicetest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func (s *serviceServer) SubmitResult(context context.Context,
switch res := request.Result.(type) {
case *serviceapi.SubmitResultRequest_OutputData:
s.submitC <- &Execution{
id: request.ExecutionID,
hash: request.ExecutionHash,
data: res.OutputData,
}
case *serviceapi.SubmitResultRequest_Error:
s.submitC <- &Execution{
id: request.ExecutionID,
hash: request.ExecutionHash,
data: res.Error,
}
}
Expand Down
28 changes: 14 additions & 14 deletions database/execution_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (

// ExecutionDB exposes all the functionalities
type ExecutionDB interface {
Find(executionID string) (*execution.Execution, error)
Find(executionHash []byte) (*execution.Execution, error)
Save(execution *execution.Execution) error
OpenTransaction() (ExecutionTransaction, error)
io.Closer
}

// ExecutionTransaction is the transaction handle.
type ExecutionTransaction interface {
Find(executionID string) (*execution.Execution, error)
Find(executionHash []byte) (*execution.Execution, error)
Save(execution *execution.Execution) error
Commit() error
Discard()
Expand All @@ -40,9 +40,9 @@ func NewExecutionDB(path string) (*LevelDBExecutionDB, error) {
return &LevelDBExecutionDB{db: db}, nil
}

// Find the execution based on an executionID, returns an error if not found
func (db *LevelDBExecutionDB) Find(executionID string) (*execution.Execution, error) {
return executionFind(db.db, executionID)
// Find the execution based on an executionHash, returns an error if not found
func (db *LevelDBExecutionDB) Find(executionHash []byte) (*execution.Execution, error) {
return executionFind(db.db, executionHash)
}

// Save an instance of executable in the database
Expand Down Expand Up @@ -71,9 +71,9 @@ type LevelDBExecutionTransaction struct {
tx *leveldb.Transaction
}

// Find the execution based on an executionID, returns an error if not found
func (tx *LevelDBExecutionTransaction) Find(executionID string) (*execution.Execution, error) {
return executionFind(tx.tx, executionID)
// Find the execution based on an executionHash, returns an error if not found
func (tx *LevelDBExecutionTransaction) Find(executionHash []byte) (*execution.Execution, error) {
return executionFind(tx.tx, executionHash)
}

// Save an instance of executable in the database
Expand All @@ -92,9 +92,9 @@ func (tx *LevelDBExecutionTransaction) Discard() {
tx.tx.Discard()
}

// Find the execution based on an executionID, returns an error if not found
func executionFind(db leveldbTxDB, executionID string) (*execution.Execution, error) {
data, err := db.Get([]byte(executionID), nil)
// Find the execution based on an executionHash, returns an error if not found
func executionFind(db leveldbTxDB, executionHash []byte) (*execution.Execution, error) {
data, err := db.Get(executionHash, nil)
if err != nil {
return nil, err
}
Expand All @@ -108,12 +108,12 @@ func executionFind(db leveldbTxDB, executionID string) (*execution.Execution, er
// Save an instance of executable in the database
// Returns an error if anything from marshaling to database saving goes wrong
func executionSave(db leveldbTxDB, execution *execution.Execution) error {
if execution.ID == "" {
return errors.New("database: can't save service without id")
if len(execution.Hash) == 0 {
return errors.New("database: can't save execution without hash")
}
data, err := json.Marshal(execution)
if err != nil {
return err
}
return db.Put([]byte(execution.ID), data, nil)
return db.Put(execution.Hash, data, nil)
}
14 changes: 7 additions & 7 deletions database/execution_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,24 @@ func TestFind(t *testing.T) {
defer os.RemoveAll(dir)
db := db(t, dir)
defer db.Close()
e := &execution.Execution{ID: "xxx"}
e := &execution.Execution{Hash: []byte{'1'}}
db.Save(e)
tests := []struct {
id string
hash []byte
hasError bool
}{
{id: e.ID, hasError: false},
{id: "doesn't exists", hasError: true},
{hash: e.Hash, hasError: false},
{hash: []byte{1}, hasError: true},
}
for _, test := range tests {
execution, err := db.Find(test.id)
execution, err := db.Find(test.hash)
if test.hasError {
require.Error(t, err)
continue
}
require.NoError(t, err)
require.NotNil(t, execution)
e, err := db.Find(execution.ID)
e, err := db.Find(execution.Hash)
require.NoError(t, err)
require.NotNil(t, e)
}
Expand All @@ -52,7 +52,7 @@ func TestSave(t *testing.T) {
execution *execution.Execution
hasError bool
}{
{&execution.Execution{ID: "xxx"}, false},
{&execution.Execution{Hash: []byte{'1'}}, false},
{&execution.Execution{}, true},
}
for _, test := range tests {
Expand Down
63 changes: 20 additions & 43 deletions execution/execution.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package execution

import (
"time"

"github.com/mesg-foundation/core/service"
"github.com/mesg-foundation/core/x/xstructhash"
)

Expand Down Expand Up @@ -37,40 +34,31 @@ func (s Status) String() (r string) {

// Execution stores all informations about executions.
type Execution struct {
ID string `hash:"-"`
EventID string `hash:"name:eventID"`
Status Status `hash:"-"`
Service *service.Service `hash:"name:service"`
TaskKey string `hash:"name:taskKey"`
Tags []string `hash:"name:tags"`
Inputs map[string]interface{} `hash:"name:inputs"`
Outputs map[string]interface{} `hash:"-"`
Error string `hash:"-"`
CreatedAt time.Time `hash:"-"`
ExecutedAt time.Time `hash:"-"`
ExecutionDuration time.Duration `hash:"-"`
Hash []byte `hash:"-"`
ParentHash []byte `hash:"name:parentHash"`
EventID string `hash:"name:eventID"`
Status Status `hash:"-"`
ServiceHash string `hash:"name:serviceHash"`
TaskKey string `hash:"name:taskKey"`
Tags []string `hash:"name:tags"`
Inputs map[string]interface{} `hash:"name:inputs"`
Outputs map[string]interface{} `hash:"-"`
Error string `hash:"-"`
}

// New returns a new execution. It returns an error if inputs are invalid.
func New(service *service.Service, eventID string, taskKey string, inputs map[string]interface{}, tags []string) (*Execution, error) {
task, err := service.GetTask(taskKey)
if err != nil {
return nil, err
}
if err := task.RequireInputs(inputs); err != nil {
return nil, err
}
func New(service string, parentHash []byte, eventID, taskKey string, inputs map[string]interface{}, tags []string) *Execution {
exec := &Execution{
EventID: eventID,
Service: service,
Inputs: inputs,
TaskKey: taskKey,
Tags: tags,
CreatedAt: time.Now(),
Status: Created,
EventID: eventID,
ServiceHash: service,
ParentHash: parentHash,
Inputs: inputs,
TaskKey: taskKey,
Tags: tags,
Status: Created,
}
exec.ID = xstructhash.Hash(exec, 1)
return exec, nil
exec.Hash = xstructhash.Hash(exec, 1)
return exec
}

// Execute changes executions status to in progres and update its execute time.
Expand All @@ -82,7 +70,6 @@ func (execution *Execution) Execute() error {
ActualStatus: execution.Status,
}
}
execution.ExecutedAt = time.Now()
execution.Status = InProgress
return nil
}
Expand All @@ -97,15 +84,6 @@ func (execution *Execution) Complete(outputs map[string]interface{}) error {
}
}

task, err := execution.Service.GetTask(execution.TaskKey)
if err != nil {
return err
}
if err := task.RequireOutputs(outputs); err != nil {
return err
}

execution.ExecutionDuration = time.Since(execution.ExecutedAt)
execution.Outputs = outputs
execution.Status = Completed
return nil
Expand All @@ -122,7 +100,6 @@ func (execution *Execution) Failed(err error) error {
}

execution.Error = err.Error()
execution.ExecutionDuration = time.Since(execution.ExecutedAt)
execution.Status = Failed
return nil
}
Loading

0 comments on commit addd2e9

Please sign in to comment.