Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Execution Attributes #994

Merged
merged 16 commits into from
May 30, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 45 additions & 19 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,35 +108,39 @@ func (a *API) EmitEvent(token, eventKey string, eventData map[string]interface{}
}

// ExecuteTask executes a task tasKey with inputData and tags for service serviceID.
func (a *API) ExecuteTask(serviceID, taskKey string, inputData map[string]interface{},
tags []string) (executionID string, err error) {
func (a *API) ExecuteTask(serviceID, taskKey string, inputData map[string]interface{}, tags []string) (executionHash []byte, err error) {
s, err := a.db.Get(serviceID)
if err != nil {
return "", err
return nil, err
}
// a task should be executed only if task's service is running.
status, err := a.m.Status(s)
if err != nil {
return "", err
return nil, err
}
if status != service.RUNNING {
return "", &NotRunningServiceError{ServiceID: s.Sid}
return nil, &NotRunningServiceError{ServiceID: s.Sid}
}

// execute the task.
eventID := uuid.NewV4().String()
exec, err := execution.New(s, eventID, taskKey, inputData, tags)
task, err := s.GetTask(taskKey)
if err != nil {
return "", err
return nil, err
}
if err := task.RequireInputs(inputData); err != nil {
return nil, err
}

// execute the task.
eventID := uuid.NewV4().String()
exec := execution.New(s.Hash, nil, eventID, taskKey, inputData, tags)
if err := exec.Execute(); err != nil {
return "", err
return nil, err
}
if err = a.execDB.Save(exec); err != nil {
return "", err
return nil, err
}
go a.ps.Pub(exec, executionSubTopic(s.Hash))
return exec.ID, nil
return exec.Hash, nil
}

// ListenEvent listens events matches with eventFilter on serviceID.
Expand Down Expand Up @@ -176,24 +180,24 @@ func (a *API) ListenExecution(service string, f *ExecutionFilter) (*ExecutionLis
}

// SubmitResult submits results for executionID.
func (a *API) SubmitResult(executionID string, outputs []byte, reterr error) error {
exec, err := a.processExecution(executionID, outputs, reterr)
func (a *API) SubmitResult(executionHash []byte, outputs []byte, reterr error) error {
exec, err := a.processExecution(executionHash, outputs, reterr)
if err != nil {
return err
}

go a.ps.Pub(exec, executionSubTopic(exec.Service.Hash))
go a.ps.Pub(exec, executionSubTopic(exec.ServiceHash))
return nil
}

// processExecution processes execution and marks it as complated or failed.
func (a *API) processExecution(executionID string, outputData []byte, reterr error) (*execution.Execution, error) {
func (a *API) processExecution(executionHash []byte, outputData []byte, reterr error) (*execution.Execution, error) {
tx, err := a.execDB.OpenTransaction()
if err != nil {
return nil, err
}

exec, err := tx.Find(executionID)
exec, err := tx.Find(executionHash)
if err != nil {
tx.Discard()
return nil, err
Expand All @@ -205,8 +209,8 @@ func (a *API) processExecution(executionID string, outputData []byte, reterr err
return nil, err
}
} else {
var o map[string]interface{}
if err := json.Unmarshal(outputData, &o); err != nil {
o, err := a.validateExecutionOutput(exec.ServiceHash, exec.TaskKey, outputData)
if err != nil {
return nil, err
}

Expand All @@ -228,6 +232,28 @@ func (a *API) processExecution(executionID string, outputData []byte, reterr err
return exec, nil
}

func (a *API) validateExecutionOutput(service, taskKey string, jsonout []byte) (map[string]interface{}, error) {
var output map[string]interface{}
if err := json.Unmarshal(jsonout, &output); err != nil {
return nil, fmt.Errorf("invalid output: %s", err)
}

s, err := a.db.Get(service)
if err != nil {
return nil, err
}

task, err := s.GetTask(taskKey)
if err != nil {
return nil, err
}

if err := task.RequireOutputs(output); err != nil {
return nil, err
}
return output, nil
}

// NotRunningServiceError is an error returned when the service is not running that
// a task needed to be executed on.
type NotRunningServiceError struct {
Expand Down
12 changes: 6 additions & 6 deletions client/service/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

// Execution holds information about a Task execution.
type Execution struct {
// ID is the execution id of task.
ID, id string
// Hash is the execution id of task.
Hash, hash string

// Key is the name of task.
Key string
Expand All @@ -23,9 +23,9 @@ type Execution struct {

func newExecution(service *Service, data *serviceapi.TaskData) *Execution {
return &Execution{
ID: data.ExecutionID,
Hash: data.ExecutionHash,
Key: data.TaskKey,
id: data.ExecutionID,
hash: data.ExecutionHash,
inputs: data.InputData,
service: service,
}
Expand All @@ -42,7 +42,7 @@ func (e *Execution) reply(data interface{}, reterr error) (err error) {
defer cancel()
if reterr != nil {
_, err = e.service.client.SubmitResult(ctx, &serviceapi.SubmitResultRequest{
ExecutionID: e.id,
ExecutionHash: e.hash,
Result: &serviceapi.SubmitResultRequest_Error{
Error: reterr.Error(),
},
Expand All @@ -53,7 +53,7 @@ func (e *Execution) reply(data interface{}, reterr error) (err error) {
return err1
}
_, err = e.service.client.SubmitResult(ctx, &serviceapi.SubmitResultRequest{
ExecutionID: e.id,
ExecutionHash: e.hash,
Result: &serviceapi.SubmitResultRequest_OutputData{
OutputData: string(resp),
},
Expand Down
4 changes: 2 additions & 2 deletions client/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ func TestListen(t *testing.T) {
require.True(t, err == nil || err == context.Canceled)
}()

id, execution, err := server.Execute(task, reqData)
hash, execution, err := server.Execute(task, reqData)
require.NoError(t, err)
require.Equal(t, id, execution.ID())
require.Equal(t, hash, execution.Hash())
require.Equal(t, token, server.ListenToken())

var data1 taskResponse
Expand Down
8 changes: 4 additions & 4 deletions client/service/servicetest/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (

// Execution is a testing task execution result.
type Execution struct {
id string
hash string
data string
key string
}

// ID returns the execution id of task.
func (e *Execution) ID() string {
return e.id
// Hash returns the execution id of task.
func (e *Execution) Hash() string {
return e.hash
}

// Key returns the output key of task.
Expand Down
10 changes: 5 additions & 5 deletions client/service/servicetest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ func (s *Server) Execute(task string, data interface{}) (string, *Execution, err
}

uuidV4 := uuid.NewV4()
id := uuidV4.String()
hash := uuidV4.String()

taskData := &serviceapi.TaskData{
ExecutionID: id,
TaskKey: task,
InputData: string(bytes),
ExecutionHash: hash,
TaskKey: task,
InputData: string(bytes),
}

select {
Expand All @@ -64,7 +64,7 @@ func (s *Server) Execute(task string, data interface{}) (string, *Execution, err
case <-s.service.closingC:
return "", nil, ErrConnectionClosed{}
case resp := <-s.service.submitC:
return id, resp, nil
return hash, resp, nil
}
}

Expand Down
12 changes: 6 additions & 6 deletions client/service/servicetest/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,17 @@ func TestExecute(t *testing.T) {
server := NewServer()
require.NotNil(t, server)

var executionID string
var executionHash string
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()

executionID1, execution, err := server.Execute(task, reqData)
executionHash1, execution, err := server.Execute(task, reqData)
require.NoError(t, err)
require.Equal(t, executionID, execution.ID())
require.Equal(t, executionID, executionID1)
require.Equal(t, executionHash, execution.Hash())
require.Equal(t, executionHash, executionHash1)

var data taskResponse
require.Nil(t, execution.Data(&data))
Expand All @@ -101,12 +101,12 @@ func TestExecute(t *testing.T) {
}()

taskData := <-stream.taskC
executionID = taskData.ExecutionID
executionHash = taskData.ExecutionHash
require.Equal(t, task, taskData.TaskKey)
require.Equal(t, reqDataStr, taskData.InputData)

_, err := server.service.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{
ExecutionID: executionID,
ExecutionHash: executionHash,
Result: &serviceapi.SubmitResultRequest_OutputData{
OutputData: resDataStr,
},
Expand Down
4 changes: 2 additions & 2 deletions client/service/servicetest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func (s *serviceServer) SubmitResult(context context.Context,
switch res := request.Result.(type) {
case *serviceapi.SubmitResultRequest_OutputData:
s.submitC <- &Execution{
id: request.ExecutionID,
hash: request.ExecutionHash,
data: res.OutputData,
}
case *serviceapi.SubmitResultRequest_Error:
s.submitC <- &Execution{
id: request.ExecutionID,
hash: request.ExecutionHash,
data: res.Error,
}
}
Expand Down
28 changes: 14 additions & 14 deletions database/execution_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (

// ExecutionDB exposes all the functionalities
type ExecutionDB interface {
Find(executionID string) (*execution.Execution, error)
Find(executionHash []byte) (*execution.Execution, error)
Save(execution *execution.Execution) error
OpenTransaction() (ExecutionTransaction, error)
io.Closer
}

// ExecutionTransaction is the transaction handle.
type ExecutionTransaction interface {
Find(executionID string) (*execution.Execution, error)
Find(executionHash []byte) (*execution.Execution, error)
Save(execution *execution.Execution) error
Commit() error
Discard()
Expand All @@ -40,9 +40,9 @@ func NewExecutionDB(path string) (*LevelDBExecutionDB, error) {
return &LevelDBExecutionDB{db: db}, nil
}

// Find the execution based on an executionID, returns an error if not found
func (db *LevelDBExecutionDB) Find(executionID string) (*execution.Execution, error) {
return executionFind(db.db, executionID)
// Find the execution based on an executionHash, returns an error if not found
func (db *LevelDBExecutionDB) Find(executionHash []byte) (*execution.Execution, error) {
return executionFind(db.db, executionHash)
}

// Save an instance of executable in the database
Expand Down Expand Up @@ -71,9 +71,9 @@ type LevelDBExecutionTransaction struct {
tx *leveldb.Transaction
}

// Find the execution based on an executionID, returns an error if not found
func (tx *LevelDBExecutionTransaction) Find(executionID string) (*execution.Execution, error) {
return executionFind(tx.tx, executionID)
// Find the execution based on an executionHash, returns an error if not found
func (tx *LevelDBExecutionTransaction) Find(executionHash []byte) (*execution.Execution, error) {
return executionFind(tx.tx, executionHash)
}

// Save an instance of executable in the database
Expand All @@ -92,9 +92,9 @@ func (tx *LevelDBExecutionTransaction) Discard() {
tx.tx.Discard()
}

// Find the execution based on an executionID, returns an error if not found
func executionFind(db leveldbTxDB, executionID string) (*execution.Execution, error) {
data, err := db.Get([]byte(executionID), nil)
// Find the execution based on an executionHash, returns an error if not found
func executionFind(db leveldbTxDB, executionHash []byte) (*execution.Execution, error) {
data, err := db.Get(executionHash, nil)
if err != nil {
return nil, err
}
Expand All @@ -108,12 +108,12 @@ func executionFind(db leveldbTxDB, executionID string) (*execution.Execution, er
// Save an instance of executable in the database
// Returns an error if anything from marshaling to database saving goes wrong
func executionSave(db leveldbTxDB, execution *execution.Execution) error {
if execution.ID == "" {
return errors.New("database: can't save service without id")
if len(execution.Hash) == 0 {
return errors.New("database: can't save execution without hash")
}
data, err := json.Marshal(execution)
if err != nil {
return err
}
return db.Put([]byte(execution.ID), data, nil)
return db.Put(execution.Hash, data, nil)
}
14 changes: 7 additions & 7 deletions database/execution_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,24 @@ func TestFind(t *testing.T) {
defer os.RemoveAll(dir)
db := db(t, dir)
defer db.Close()
e := &execution.Execution{ID: "xxx"}
e := &execution.Execution{Hash: []byte{'1'}}
db.Save(e)
tests := []struct {
id string
hash []byte
hasError bool
}{
{id: e.ID, hasError: false},
{id: "doesn't exists", hasError: true},
{hash: e.Hash, hasError: false},
{hash: []byte{1}, hasError: true},
}
for _, test := range tests {
execution, err := db.Find(test.id)
execution, err := db.Find(test.hash)
if test.hasError {
require.Error(t, err)
continue
}
require.NoError(t, err)
require.NotNil(t, execution)
e, err := db.Find(execution.ID)
e, err := db.Find(execution.Hash)
require.NoError(t, err)
require.NotNil(t, e)
}
Expand All @@ -52,7 +52,7 @@ func TestSave(t *testing.T) {
execution *execution.Execution
hasError bool
}{
{&execution.Execution{ID: "xxx"}, false},
{&execution.Execution{Hash: []byte{'1'}}, false},
{&execution.Execution{}, true},
}
for _, test := range tests {
Expand Down
Loading