Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Execution Attributes #994

Merged
merged 16 commits into from
May 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions client/service/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

// Execution holds information about a Task execution.
type Execution struct {
// ID is the execution id of task.
ID, id string
// Hash is the execution id of task.
Hash, hash string

// Key is the name of task.
Key string
Expand All @@ -23,9 +23,9 @@ type Execution struct {

func newExecution(service *Service, data *serviceapi.TaskData) *Execution {
return &Execution{
ID: data.ExecutionID,
Hash: data.ExecutionHash,
Key: data.TaskKey,
id: data.ExecutionID,
hash: data.ExecutionHash,
inputs: data.InputData,
service: service,
}
Expand All @@ -42,7 +42,7 @@ func (e *Execution) reply(data interface{}, reterr error) (err error) {
defer cancel()
if reterr != nil {
_, err = e.service.client.SubmitResult(ctx, &serviceapi.SubmitResultRequest{
ExecutionID: e.id,
ExecutionHash: e.hash,
Result: &serviceapi.SubmitResultRequest_Error{
Error: reterr.Error(),
},
Expand All @@ -53,7 +53,7 @@ func (e *Execution) reply(data interface{}, reterr error) (err error) {
return err1
}
_, err = e.service.client.SubmitResult(ctx, &serviceapi.SubmitResultRequest{
ExecutionID: e.id,
ExecutionHash: e.hash,
Result: &serviceapi.SubmitResultRequest_OutputData{
OutputData: string(resp),
},
Expand Down
4 changes: 2 additions & 2 deletions client/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ func TestListen(t *testing.T) {
require.True(t, err == nil || err == context.Canceled)
}()

id, execution, err := server.Execute(task, reqData)
hash, execution, err := server.Execute(task, reqData)
require.NoError(t, err)
require.Equal(t, id, execution.ID())
require.Equal(t, hash, execution.Hash())
require.Equal(t, token, server.ListenToken())

var data1 taskResponse
Expand Down
8 changes: 4 additions & 4 deletions client/service/servicetest/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (

// Execution is a testing task execution result.
type Execution struct {
id string
hash string
data string
key string
}

// ID returns the execution id of task.
func (e *Execution) ID() string {
return e.id
// Hash returns the execution id of task.
func (e *Execution) Hash() string {
return e.hash
}

// Key returns the output key of task.
Expand Down
10 changes: 5 additions & 5 deletions client/service/servicetest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ func (s *Server) Execute(task string, data interface{}) (string, *Execution, err
}

uuidV4 := uuid.NewV4()
id := uuidV4.String()
hash := uuidV4.String()

taskData := &serviceapi.TaskData{
ExecutionID: id,
TaskKey: task,
InputData: string(bytes),
ExecutionHash: hash,
TaskKey: task,
InputData: string(bytes),
}

select {
Expand All @@ -64,7 +64,7 @@ func (s *Server) Execute(task string, data interface{}) (string, *Execution, err
case <-s.service.closingC:
return "", nil, ErrConnectionClosed{}
case resp := <-s.service.submitC:
return id, resp, nil
return hash, resp, nil
}
}

Expand Down
12 changes: 6 additions & 6 deletions client/service/servicetest/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,17 @@ func TestExecute(t *testing.T) {
server := NewServer()
require.NotNil(t, server)

var executionID string
var executionHash string
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()

executionID1, execution, err := server.Execute(task, reqData)
executionHash1, execution, err := server.Execute(task, reqData)
require.NoError(t, err)
require.Equal(t, executionID, execution.ID())
require.Equal(t, executionID, executionID1)
require.Equal(t, executionHash, execution.Hash())
require.Equal(t, executionHash, executionHash1)

var data taskResponse
require.Nil(t, execution.Data(&data))
Expand All @@ -101,12 +101,12 @@ func TestExecute(t *testing.T) {
}()

taskData := <-stream.taskC
executionID = taskData.ExecutionID
executionHash = taskData.ExecutionHash
require.Equal(t, task, taskData.TaskKey)
require.Equal(t, reqDataStr, taskData.InputData)

_, err := server.service.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{
ExecutionID: executionID,
ExecutionHash: executionHash,
Result: &serviceapi.SubmitResultRequest_OutputData{
OutputData: resDataStr,
},
Expand Down
4 changes: 2 additions & 2 deletions client/service/servicetest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func (s *serviceServer) SubmitResult(context context.Context,
switch res := request.Result.(type) {
case *serviceapi.SubmitResultRequest_OutputData:
s.submitC <- &Execution{
id: request.ExecutionID,
hash: request.ExecutionHash,
data: res.OutputData,
}
case *serviceapi.SubmitResultRequest_Error:
s.submitC <- &Execution{
id: request.ExecutionID,
hash: request.ExecutionHash,
data: res.Error,
}
}
Expand Down
28 changes: 14 additions & 14 deletions database/execution_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (

// ExecutionDB exposes all the functionalities
type ExecutionDB interface {
Find(executionID string) (*execution.Execution, error)
Find(executionHash []byte) (*execution.Execution, error)
Save(execution *execution.Execution) error
OpenTransaction() (ExecutionTransaction, error)
io.Closer
}

// ExecutionTransaction is the transaction handle.
type ExecutionTransaction interface {
Find(executionID string) (*execution.Execution, error)
Find(executionHash []byte) (*execution.Execution, error)
Save(execution *execution.Execution) error
Commit() error
Discard()
Expand All @@ -40,9 +40,9 @@ func NewExecutionDB(path string) (*LevelDBExecutionDB, error) {
return &LevelDBExecutionDB{db: db}, nil
}

// Find the execution based on an executionID, returns an error if not found
func (db *LevelDBExecutionDB) Find(executionID string) (*execution.Execution, error) {
return executionFind(db.db, executionID)
// Find the execution based on an executionHash, returns an error if not found
func (db *LevelDBExecutionDB) Find(executionHash []byte) (*execution.Execution, error) {
return executionFind(db.db, executionHash)
}

// Save an instance of executable in the database
Expand Down Expand Up @@ -71,9 +71,9 @@ type LevelDBExecutionTransaction struct {
tx *leveldb.Transaction
}

// Find the execution based on an executionID, returns an error if not found
func (tx *LevelDBExecutionTransaction) Find(executionID string) (*execution.Execution, error) {
return executionFind(tx.tx, executionID)
// Find the execution based on an executionHash, returns an error if not found
func (tx *LevelDBExecutionTransaction) Find(executionHash []byte) (*execution.Execution, error) {
return executionFind(tx.tx, executionHash)
}

// Save an instance of executable in the database
Expand All @@ -92,9 +92,9 @@ func (tx *LevelDBExecutionTransaction) Discard() {
tx.tx.Discard()
}

// Find the execution based on an executionID, returns an error if not found
func executionFind(db leveldbTxDB, executionID string) (*execution.Execution, error) {
data, err := db.Get([]byte(executionID), nil)
// Find the execution based on an executionHash, returns an error if not found
func executionFind(db leveldbTxDB, executionHash []byte) (*execution.Execution, error) {
data, err := db.Get(executionHash, nil)
if err != nil {
return nil, err
}
Expand All @@ -108,12 +108,12 @@ func executionFind(db leveldbTxDB, executionID string) (*execution.Execution, er
// Save an instance of executable in the database
// Returns an error if anything from marshaling to database saving goes wrong
func executionSave(db leveldbTxDB, execution *execution.Execution) error {
if execution.ID == "" {
return errors.New("database: can't save service without id")
if len(execution.Hash) == 0 {
return errors.New("database: can't save execution without hash")
}
data, err := json.Marshal(execution)
if err != nil {
return err
}
return db.Put([]byte(execution.ID), data, nil)
return db.Put(execution.Hash, data, nil)
}
14 changes: 7 additions & 7 deletions database/execution_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,24 @@ func TestFind(t *testing.T) {
defer os.RemoveAll(dir)
db := db(t, dir)
defer db.Close()
e := &execution.Execution{ID: "xxx"}
e := &execution.Execution{Hash: []byte{'1'}}
db.Save(e)
tests := []struct {
id string
hash []byte
hasError bool
}{
{id: e.ID, hasError: false},
{id: "doesn't exists", hasError: true},
{hash: e.Hash, hasError: false},
{hash: []byte{1}, hasError: true},
}
for _, test := range tests {
execution, err := db.Find(test.id)
execution, err := db.Find(test.hash)
if test.hasError {
require.Error(t, err)
continue
}
require.NoError(t, err)
require.NotNil(t, execution)
e, err := db.Find(execution.ID)
e, err := db.Find(execution.Hash)
require.NoError(t, err)
require.NotNil(t, e)
}
Expand All @@ -52,7 +52,7 @@ func TestSave(t *testing.T) {
execution *execution.Execution
hasError bool
}{
{&execution.Execution{ID: "xxx"}, false},
{&execution.Execution{Hash: []byte{'1'}}, false},
{&execution.Execution{}, true},
}
for _, test := range tests {
Expand Down
63 changes: 20 additions & 43 deletions execution/execution.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package execution

import (
"time"

"github.com/mesg-foundation/core/service"
"github.com/mesg-foundation/core/x/xstructhash"
)

Expand Down Expand Up @@ -37,40 +34,31 @@ func (s Status) String() (r string) {

// Execution stores all informations about executions.
type Execution struct {
ID string `hash:"-"`
EventID string `hash:"name:eventID"`
Status Status `hash:"-"`
Service *service.Service `hash:"name:service"`
TaskKey string `hash:"name:taskKey"`
Tags []string `hash:"name:tags"`
Inputs map[string]interface{} `hash:"name:inputs"`
Outputs map[string]interface{} `hash:"-"`
Error string `hash:"-"`
CreatedAt time.Time `hash:"-"`
ExecutedAt time.Time `hash:"-"`
ExecutionDuration time.Duration `hash:"-"`
Hash []byte `hash:"-"`
ParentHash []byte `hash:"name:parentHash"`
EventID string `hash:"name:eventID"`
Status Status `hash:"-"`
ServiceHash string `hash:"name:serviceHash"`
TaskKey string `hash:"name:taskKey"`
Tags []string `hash:"name:tags"`
Inputs map[string]interface{} `hash:"name:inputs"`
Outputs map[string]interface{} `hash:"-"`
Error string `hash:"-"`
}

// New returns a new execution. It returns an error if inputs are invalid.
func New(service *service.Service, eventID string, taskKey string, inputs map[string]interface{}, tags []string) (*Execution, error) {
task, err := service.GetTask(taskKey)
if err != nil {
return nil, err
}
if err := task.RequireInputs(inputs); err != nil {
return nil, err
}
func New(service string, parentHash []byte, eventID, taskKey string, inputs map[string]interface{}, tags []string) *Execution {
exec := &Execution{
EventID: eventID,
Service: service,
Inputs: inputs,
TaskKey: taskKey,
Tags: tags,
CreatedAt: time.Now(),
Status: Created,
EventID: eventID,
ServiceHash: service,
ParentHash: parentHash,
Inputs: inputs,
TaskKey: taskKey,
Tags: tags,
Status: Created,
}
exec.ID = xstructhash.Hash(exec, 1)
return exec, nil
exec.Hash = xstructhash.Hash(exec, 1)
return exec
}

// Execute changes executions status to in progres and update its execute time.
Expand All @@ -82,7 +70,6 @@ func (execution *Execution) Execute() error {
ActualStatus: execution.Status,
}
}
execution.ExecutedAt = time.Now()
execution.Status = InProgress
return nil
}
Expand All @@ -97,15 +84,6 @@ func (execution *Execution) Complete(outputs map[string]interface{}) error {
}
}

task, err := execution.Service.GetTask(execution.TaskKey)
if err != nil {
return err
}
if err := task.RequireOutputs(outputs); err != nil {
return err
}

execution.ExecutionDuration = time.Since(execution.ExecutedAt)
execution.Outputs = outputs
execution.Status = Completed
return nil
Expand All @@ -122,7 +100,6 @@ func (execution *Execution) Failed(err error) error {
}

execution.Error = err.Error()
execution.ExecutionDuration = time.Since(execution.ExecutedAt)
execution.Status = Failed
return nil
}
Loading