From 5a1b0d5872cf357d42dfbd053971987b2e55dc0b Mon Sep 17 00:00:00 2001 From: Nicolas Mahe Date: Wed, 22 May 2019 15:53:57 +0700 Subject: [PATCH 1/4] Remove output from service package --- execution/execution.go | 6 +- execution/execution_test.go | 25 +-------- interface/grpc/service/service_test.go | 38 ------------- service/error.go | 22 ++------ service/error_test.go | 17 +----- service/task.go | 77 ++++---------------------- 6 files changed, 22 insertions(+), 163 deletions(-) diff --git a/execution/execution.go b/execution/execution.go index e9c70a36a..4056793e5 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -102,11 +102,7 @@ func (execution *Execution) Complete(outputKey string, outputData map[string]int if err != nil { return err } - output, err := task.GetOutput(outputKey) - if err != nil { - return err - } - if err := output.RequireData(outputData); err != nil { + if err := task.RequireOutputs(outputData); err != nil { return err } diff --git a/execution/execution_test.go b/execution/execution_test.go index 8cfc04648..233799f35 100644 --- a/execution/execution_test.go +++ b/execution/execution_test.go @@ -143,34 +143,13 @@ func TestComplete(t *testing.T) { data map[string]interface{} err error }{ - { - name: "task output not found because of empty output key", - key: "", - data: map[string]interface{}{}, - err: &service.TaskOutputNotFoundError{ - TaskKey: taskKey, - TaskOutputKey: "", - ServiceName: serviceName, - }, - }, - { - name: "task output not found because wrong output key", - key: "notfound", - data: map[string]interface{}{"foo": "bar"}, - err: &service.TaskOutputNotFoundError{ - TaskKey: taskKey, - TaskOutputKey: "notfound", - ServiceName: serviceName, - }, - }, { name: "invalid task output", key: "success", data: map[string]interface{}{}, err: &service.InvalidTaskOutputError{ - TaskKey: taskKey, - ServiceName: serviceName, - TaskOutputKey: "success", + TaskKey: taskKey, + ServiceName: serviceName, Warnings: []*service.ParameterWarning{ { Key: "foo", diff --git a/interface/grpc/service/service_test.go b/interface/grpc/service/service_test.go index 0e67fa0d9..61d850c39 100644 --- a/interface/grpc/service/service_test.go +++ b/interface/grpc/service/service_test.go @@ -231,43 +231,6 @@ func TestSubmitWithInvalidID(t *testing.T) { require.Error(t, err) } -func TestSubmitWithNonExistentOutputKey(t *testing.T) { - var ( - taskKey = "call" - taskData = map[string]interface{}{ - "url": "https://mesg.com", - "data": map[string]interface{}{}, - "headers": map[string]interface{}{}, - } - outputKey = "nonExistent" - outputData = `{"foo":{}}` - server, closer = newServer(t) - ) - defer closer() - - s, validationErr, err := server.api.DeployService(serviceTar(t, taskServicePath), nil) - require.Zero(t, validationErr) - require.NoError(t, err) - defer server.api.DeleteService(s.Hash, false) - - require.NoError(t, server.api.StartService(s.Hash)) - defer server.api.StopService(s.Hash) - - executionID, err := server.api.ExecuteTask(s.Hash, taskKey, taskData, nil) - require.NoError(t, err) - - _, err = server.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{ - ExecutionID: executionID, - OutputKey: outputKey, - OutputData: outputData, - }) - require.Error(t, err) - notFoundErr, ok := err.(*service.TaskOutputNotFoundError) - require.True(t, ok) - require.Equal(t, outputKey, notFoundErr.TaskOutputKey) - require.Equal(t, s.Name, notFoundErr.ServiceName) -} - func TestSubmitWithInvalidTaskOutputs(t *testing.T) { var ( taskKey = "call" @@ -302,6 +265,5 @@ func TestSubmitWithInvalidTaskOutputs(t *testing.T) { invalidErr, ok := err.(*service.InvalidTaskOutputError) require.True(t, ok) require.Equal(t, taskKey, invalidErr.TaskKey) - require.Equal(t, outputKey, invalidErr.TaskOutputKey) require.Equal(t, s.Name, invalidErr.ServiceName) } diff --git a/service/error.go b/service/error.go index edf0b39d8..ffdf1697b 100644 --- a/service/error.go +++ b/service/error.go @@ -22,18 +22,6 @@ func (e *TaskNotFoundError) Error() string { return fmt.Sprintf("Task %q not found in service %q", e.TaskKey, e.ServiceName) } -// TaskOutputNotFoundError is an error returned when service doesn't contain corresponding output. -type TaskOutputNotFoundError struct { - TaskKey string - TaskOutputKey string - ServiceName string -} - -func (e *TaskOutputNotFoundError) Error() string { - return fmt.Sprintf("Output %q of task %q not found in service %q", e.TaskOutputKey, e.TaskKey, - e.ServiceName) -} - // InvalidEventDataError is an error returned when the data of corresponding event is not valid. type InvalidEventDataError struct { EventKey string @@ -66,15 +54,13 @@ func (e *InvalidTaskInputError) Error() string { // InvalidTaskOutputError is an error returned when the outputs of corresponding task are not valid. type InvalidTaskOutputError struct { - TaskKey string - TaskOutputKey string - ServiceName string - Warnings []*ParameterWarning + TaskKey string + ServiceName string + Warnings []*ParameterWarning } func (e *InvalidTaskOutputError) Error() string { - s := fmt.Sprintf("Outputs %q of task %q are invalid in service %q", e.TaskOutputKey, e.TaskKey, - e.ServiceName) + s := fmt.Sprintf("Outputs of task %q are invalid in service %q", e.TaskKey, e.ServiceName) for _, warning := range e.Warnings { s = fmt.Sprintf("%s. %s", s, warning) } diff --git a/service/error_test.go b/service/error_test.go index 33d429eb0..9e2227ad1 100644 --- a/service/error_test.go +++ b/service/error_test.go @@ -97,26 +97,15 @@ func TestInvalidTaskInputError(t *testing.T) { tests.assert(t, err.Error()) } -// Test OutputNotFoundError -func TestOutputNotFoundError(t *testing.T) { - err := TaskOutputNotFoundError{ - TaskKey: "TaskKey", - TaskOutputKey: "OutputKey", - ServiceName: "TestOutputNotFoundError", - } - require.Equal(t, `Output "OutputKey" of task "TaskKey" not found in service "TestOutputNotFoundError"`, err.Error()) -} - // Test InvalidOutputDataError func TestInvalidOutputDataError(t *testing.T) { tests := newParameterTestCases() err := InvalidTaskOutputError{ - TaskKey: "TaskKey", - TaskOutputKey: "OutputKey", - ServiceName: "TestInvalidOutputDataError", + TaskKey: "TaskKey", + ServiceName: "TestInvalidOutputDataError", Warnings: validateParametersSchema(tests.parameterTestsToSliceParameters(), tests.parameterTestsToMapData()), } - require.Contains(t, err.Error(), `Outputs "OutputKey" of task "TaskKey" are invalid in service "TestInvalidOutputDataError"`) + require.Contains(t, err.Error(), `Outputs of task "TaskKey" are invalid in service "TestInvalidOutputDataError"`) tests.assert(t, err.Error()) } diff --git a/service/task.go b/service/task.go index e22d40f9f..cb5397902 100644 --- a/service/task.go +++ b/service/task.go @@ -21,27 +21,6 @@ type Task struct { serviceName string `hash:"-"` } -// Output describes task output. -type Output struct { - // Key is the key of output. - Key string `hash:"name:1"` - - // Name is the name of task output. - Name string `hash:"name:2"` - - // Description is the description of task output. - Description string `hash:"name:3"` - - // Data holds the output parameters of a task output. - Data []*Parameter `hash:"name:4"` - - // taskKey is the output's task's key. - taskKey string `hash:"-"` - - // serviceName is the output's service's name. - serviceName string `hash:"-"` -} - // GetTask returns task taskKey of service. func (s *Service) GetTask(taskKey string) (*Task, error) { for _, task := range s.Tasks { @@ -61,7 +40,12 @@ func (t *Task) ValidateInputs(taskInputs map[string]interface{}) []*ParameterWar return validateParametersSchema(t.Inputs, taskInputs) } -// RequireInputs requires task inputs to be matched with parameter schemas. +// ValidateOutputs produces warnings for task outputs that doesn't satisfy their parameter schemas. +func (t *Task) ValidateOutputs(taskOutputs map[string]interface{}) []*ParameterWarning { + return validateParametersSchema(t.Outputs, taskOutputs) +} + +// RequireInputs requires task inputs to match with parameter schemas. func (t *Task) RequireInputs(taskInputs map[string]interface{}) error { warnings := t.ValidateInputs(taskInputs) if len(warnings) > 0 { @@ -74,51 +58,14 @@ func (t *Task) RequireInputs(taskInputs map[string]interface{}) error { return nil } -// GetOutput returns output outputKey of task. -func (t *Task) GetOutput(outputKey string) (*Output, error) { - switch outputKey { - case "success": - return &Output{ - Key: "success", - Data: t.Outputs, - taskKey: t.Key, - serviceName: t.serviceName, - }, nil - case "error": - return &Output{ - Key: "error", - Data: []*Parameter{ - { - Key: "message", - Type: "String", - }, - }, - taskKey: t.Key, - serviceName: t.serviceName, - }, nil - default: - return nil, &TaskOutputNotFoundError{ - TaskKey: t.Key, - TaskOutputKey: outputKey, - ServiceName: t.serviceName, - } - } -} - -// ValidateData produces warnings for task outputs that doesn't satisfy their parameter schemas. -func (o *Output) ValidateData(outputData map[string]interface{}) []*ParameterWarning { - return validateParametersSchema(o.Data, outputData) -} - -// RequireData requires task outputs to be matched with parameter schemas. -func (o *Output) RequireData(outputData map[string]interface{}) error { - warnings := o.ValidateData(outputData) +// RequireOutputs requires task outputs to match with parameter schemas. +func (t *Task) RequireOutputs(taskOutputs map[string]interface{}) error { + warnings := t.ValidateOutputs(taskOutputs) if len(warnings) > 0 { return &InvalidTaskOutputError{ - TaskKey: o.taskKey, - TaskOutputKey: o.Key, - ServiceName: o.serviceName, - Warnings: warnings, + TaskKey: t.Key, + ServiceName: t.serviceName, + Warnings: warnings, } } return nil From 5f9b2113a2a246bf1572f394f0518a0a49d88999 Mon Sep 17 00:00:00 2001 From: krhubert Date: Wed, 22 May 2019 13:21:42 +0200 Subject: [PATCH 2/4] Remove output key from Execution --- api/api.go | 8 ++++---- execution/execution.go | 4 +--- execution/execution_test.go | 7 +------ interface/grpc/service/service.go | 2 +- interface/grpc/service/service_test.go | 9 --------- 5 files changed, 7 insertions(+), 23 deletions(-) diff --git a/api/api.go b/api/api.go index b93bb3572..fe3c01b11 100644 --- a/api/api.go +++ b/api/api.go @@ -176,8 +176,8 @@ func (a *API) ListenExecution(service string, f *ExecutionFilter) (*ExecutionLis } // SubmitResult submits results for executionID. -func (a *API) SubmitResult(executionID string, outputKey string, outputs []byte) error { - exec, stateChanged, err := a.processExecution(executionID, outputKey, outputs) +func (a *API) SubmitResult(executionID string, outputs []byte) error { + exec, stateChanged, err := a.processExecution(executionID, outputs) if stateChanged { // only publish to listeners when the execution's state changed. go a.ps.Pub(exec, executionSubTopic(exec.Service.Hash)) @@ -186,7 +186,7 @@ func (a *API) SubmitResult(executionID string, outputKey string, outputs []byte) } // processExecution processes execution and marks it as complated or failed. -func (a *API) processExecution(executionID string, outputKey string, outputData []byte) (exec *execution.Execution, stateChanged bool, err error) { +func (a *API) processExecution(executionID string, outputData []byte) (exec *execution.Execution, stateChanged bool, err error) { stateChanged = false tx, err := a.execDB.OpenTransaction() if err != nil { @@ -204,7 +204,7 @@ func (a *API) processExecution(executionID string, outputKey string, outputData return a.saveExecution(tx, exec, fmt.Errorf("invalid output data error: %s", err)) } - if err := exec.Complete(outputKey, outputDataMap); err != nil { + if err := exec.Complete(outputDataMap); err != nil { return a.saveExecution(tx, exec, err) } diff --git a/execution/execution.go b/execution/execution.go index 4056793e5..581beaff3 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -44,7 +44,6 @@ type Execution struct { TaskKey string `hash:"name:taskKey"` Tags []string `hash:"name:tags"` Inputs map[string]interface{} `hash:"name:inputs"` - OutputKey string `hash:"-"` OutputData map[string]interface{} `hash:"-"` Error string `hash:"-"` CreatedAt time.Time `hash:"-"` @@ -90,7 +89,7 @@ func (execution *Execution) Execute() error { // Complete changes execution status to completed. It verifies the output. // It returns an error if the status is different then InProgress or verification fails. -func (execution *Execution) Complete(outputKey string, outputData map[string]interface{}) error { +func (execution *Execution) Complete(outputData map[string]interface{}) error { if execution.Status != InProgress { return StatusError{ ExpectedStatus: InProgress, @@ -107,7 +106,6 @@ func (execution *Execution) Complete(outputKey string, outputData map[string]int } execution.ExecutionDuration = time.Since(execution.ExecutedAt) - execution.OutputKey = outputKey execution.OutputData = outputData execution.Status = Completed return nil diff --git a/execution/execution_test.go b/execution/execution_test.go index 233799f35..50c77e2b8 100644 --- a/execution/execution_test.go +++ b/execution/execution_test.go @@ -139,13 +139,11 @@ func TestComplete(t *testing.T) { e.Execute() tests := []struct { name string - key string data map[string]interface{} err error }{ { name: "invalid task output", - key: "success", data: map[string]interface{}{}, err: &service.InvalidTaskOutputError{ TaskKey: taskKey, @@ -161,13 +159,10 @@ func TestComplete(t *testing.T) { }, { name: "success", - key: "success", data: map[string]interface{}{"foo": "bar"}, - err: nil, }, { // this one is already proccessed name: "already executed", - key: "success", data: map[string]interface{}{"foo": "bar"}, err: StatusError{ ExpectedStatus: InProgress, @@ -176,7 +171,7 @@ func TestComplete(t *testing.T) { }, } for _, test := range tests { - err := e.Complete(test.key, test.data) + err := e.Complete(test.data) require.Equal(t, test.err, err, test.name) if test.err != nil { continue diff --git a/interface/grpc/service/service.go b/interface/grpc/service/service.go index e023a1321..753ba2a2f 100644 --- a/interface/grpc/service/service.go +++ b/interface/grpc/service/service.go @@ -69,5 +69,5 @@ func (s *Server) ListenTask(request *serviceapi.ListenTaskRequest, stream servic // SubmitResult submits results of an execution. func (s *Server) SubmitResult(context context.Context, request *serviceapi.SubmitResultRequest) (*serviceapi.SubmitResultReply, error) { - return &serviceapi.SubmitResultReply{}, s.api.SubmitResult(request.ExecutionID, request.OutputKey, []byte(request.OutputData)) + return &serviceapi.SubmitResultReply{}, s.api.SubmitResult(request.ExecutionID, []byte(request.OutputData)) } diff --git a/interface/grpc/service/service_test.go b/interface/grpc/service/service_test.go index 61d850c39..2258c9147 100644 --- a/interface/grpc/service/service_test.go +++ b/interface/grpc/service/service_test.go @@ -148,7 +148,6 @@ func TestSubmit(t *testing.T) { "data": map[string]interface{}{}, "headers": map[string]interface{}{}, } - outputKey = "success" outputData = `{"foo":{}}` server, closer = newServer(t) ) @@ -171,14 +170,12 @@ func TestSubmit(t *testing.T) { _, err = server.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{ ExecutionID: executionID, - OutputKey: outputKey, OutputData: outputData, }) require.NoError(t, err) execution := <-ln.C require.Equal(t, executionID, execution.ID) - require.Equal(t, outputKey, execution.OutputKey) require.Equal(t, outputData, jsonMarshal(t, execution.OutputData)) } @@ -190,7 +187,6 @@ func TestSubmitWithInvalidJSON(t *testing.T) { "data": map[string]interface{}{}, "headers": map[string]interface{}{}, } - outputKey = "result" server, closer = newServer(t) ) defer closer() @@ -208,7 +204,6 @@ func TestSubmitWithInvalidJSON(t *testing.T) { _, err = server.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{ ExecutionID: executionID, - OutputKey: outputKey, OutputData: "", }) require.Equal(t, "invalid output data error: unexpected end of JSON input", err.Error()) @@ -216,7 +211,6 @@ func TestSubmitWithInvalidJSON(t *testing.T) { func TestSubmitWithInvalidID(t *testing.T) { var ( - outputKey = "output" outputData = "{}" executionID = "1" server, closer = newServer(t) @@ -225,7 +219,6 @@ func TestSubmitWithInvalidID(t *testing.T) { _, err := server.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{ ExecutionID: executionID, - OutputKey: outputKey, OutputData: outputData, }) require.Error(t, err) @@ -239,7 +232,6 @@ func TestSubmitWithInvalidTaskOutputs(t *testing.T) { "data": map[string]interface{}{}, "headers": map[string]interface{}{}, } - outputKey = "success" outputData = `{"foo":1}` server, closer = newServer(t) ) @@ -258,7 +250,6 @@ func TestSubmitWithInvalidTaskOutputs(t *testing.T) { _, err = server.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{ ExecutionID: executionID, - OutputKey: outputKey, OutputData: outputData, }) require.Error(t, err) From 9a38d6fe70f681104ec564065128b0c5b7466398 Mon Sep 17 00:00:00 2001 From: krhubert Date: Wed, 22 May 2019 14:31:42 +0200 Subject: [PATCH 3/4] Pass return output error to api --- api/api.go | 10 ++++++---- interface/grpc/service/service.go | 6 +++++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/api/api.go b/api/api.go index fe3c01b11..6712640e4 100644 --- a/api/api.go +++ b/api/api.go @@ -176,8 +176,8 @@ func (a *API) ListenExecution(service string, f *ExecutionFilter) (*ExecutionLis } // SubmitResult submits results for executionID. -func (a *API) SubmitResult(executionID string, outputs []byte) error { - exec, stateChanged, err := a.processExecution(executionID, outputs) +func (a *API) SubmitResult(executionID string, outputs []byte, reterr error) error { + exec, stateChanged, err := a.processExecution(executionID, outputs, reterr) if stateChanged { // only publish to listeners when the execution's state changed. go a.ps.Pub(exec, executionSubTopic(exec.Service.Hash)) @@ -186,8 +186,7 @@ func (a *API) SubmitResult(executionID string, outputs []byte) error { } // processExecution processes execution and marks it as complated or failed. -func (a *API) processExecution(executionID string, outputData []byte) (exec *execution.Execution, stateChanged bool, err error) { - stateChanged = false +func (a *API) processExecution(executionID string, outputData []byte, reterr error) (exec *execution.Execution, stateChanged bool, err error) { tx, err := a.execDB.OpenTransaction() if err != nil { return nil, false, err @@ -198,6 +197,9 @@ func (a *API) processExecution(executionID string, outputData []byte) (exec *exe tx.Discard() return nil, false, err } + if reterr != nil { + return a.saveExecution(tx, exec, reterr) + } var outputDataMap map[string]interface{} if err := json.Unmarshal(outputData, &outputDataMap); err != nil { diff --git a/interface/grpc/service/service.go b/interface/grpc/service/service.go index 753ba2a2f..3623514fc 100644 --- a/interface/grpc/service/service.go +++ b/interface/grpc/service/service.go @@ -3,6 +3,7 @@ package service import ( "context" "encoding/json" + "errors" "github.com/mesg-foundation/core/api" "github.com/mesg-foundation/core/execution" @@ -69,5 +70,8 @@ func (s *Server) ListenTask(request *serviceapi.ListenTaskRequest, stream servic // SubmitResult submits results of an execution. func (s *Server) SubmitResult(context context.Context, request *serviceapi.SubmitResultRequest) (*serviceapi.SubmitResultReply, error) { - return &serviceapi.SubmitResultReply{}, s.api.SubmitResult(request.ExecutionID, []byte(request.OutputData)) + if request.OutputKey == "error" { + return &serviceapi.SubmitResultReply{}, s.api.SubmitResult(request.ExecutionID, nil, errors.New(request.OutputData)) + } + return &serviceapi.SubmitResultReply{}, s.api.SubmitResult(request.ExecutionID, []byte(request.OutputData), nil) } From 36f2e82879e0d890e75446dbb61057a8fe136a62 Mon Sep 17 00:00:00 2001 From: krhubert Date: Wed, 22 May 2019 14:43:46 +0200 Subject: [PATCH 4/4] Rename OutputData to Outputs in execution package --- execution/execution.go | 8 ++++---- interface/grpc/core/core.go | 2 +- interface/grpc/service/service_test.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/execution/execution.go b/execution/execution.go index 581beaff3..354502982 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -44,7 +44,7 @@ type Execution struct { TaskKey string `hash:"name:taskKey"` Tags []string `hash:"name:tags"` Inputs map[string]interface{} `hash:"name:inputs"` - OutputData map[string]interface{} `hash:"-"` + Outputs map[string]interface{} `hash:"-"` Error string `hash:"-"` CreatedAt time.Time `hash:"-"` ExecutedAt time.Time `hash:"-"` @@ -89,7 +89,7 @@ func (execution *Execution) Execute() error { // Complete changes execution status to completed. It verifies the output. // It returns an error if the status is different then InProgress or verification fails. -func (execution *Execution) Complete(outputData map[string]interface{}) error { +func (execution *Execution) Complete(outputs map[string]interface{}) error { if execution.Status != InProgress { return StatusError{ ExpectedStatus: InProgress, @@ -101,12 +101,12 @@ func (execution *Execution) Complete(outputData map[string]interface{}) error { if err != nil { return err } - if err := task.RequireOutputs(outputData); err != nil { + if err := task.RequireOutputs(outputs); err != nil { return err } execution.ExecutionDuration = time.Since(execution.ExecutedAt) - execution.OutputData = outputData + execution.Outputs = outputs execution.Status = Completed return nil } diff --git a/interface/grpc/core/core.go b/interface/grpc/core/core.go index 59c769e55..010935056 100644 --- a/interface/grpc/core/core.go +++ b/interface/grpc/core/core.go @@ -164,7 +164,7 @@ func (s *Server) ListenResult(request *coreapi.ListenResultRequest, stream corea return ctx.Err() case execution := <-ln.C: - outputs, err := json.Marshal(execution.OutputData) + outputs, err := json.Marshal(execution.Outputs) if err != nil { return err } diff --git a/interface/grpc/service/service_test.go b/interface/grpc/service/service_test.go index 2258c9147..1a1f5a15d 100644 --- a/interface/grpc/service/service_test.go +++ b/interface/grpc/service/service_test.go @@ -176,7 +176,7 @@ func TestSubmit(t *testing.T) { execution := <-ln.C require.Equal(t, executionID, execution.ID) - require.Equal(t, outputData, jsonMarshal(t, execution.OutputData)) + require.Equal(t, outputData, jsonMarshal(t, execution.Outputs)) } func TestSubmitWithInvalidJSON(t *testing.T) {