Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

7th step - Rename OutputData to Outputs in execution package #976

Merged
merged 4 commits into from
May 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ func (a *API) ListenExecution(service string, f *ExecutionFilter) (*ExecutionLis
}

// SubmitResult submits results for executionID.
func (a *API) SubmitResult(executionID string, outputKey string, outputs []byte) error {
exec, stateChanged, err := a.processExecution(executionID, outputKey, outputs)
func (a *API) SubmitResult(executionID string, outputs []byte, reterr error) error {
exec, stateChanged, err := a.processExecution(executionID, outputs, reterr)
if stateChanged {
// only publish to listeners when the execution's state changed.
go a.ps.Pub(exec, executionSubTopic(exec.Service.Hash))
Expand All @@ -186,8 +186,7 @@ func (a *API) SubmitResult(executionID string, outputKey string, outputs []byte)
}

// processExecution processes execution and marks it as complated or failed.
func (a *API) processExecution(executionID string, outputKey string, outputData []byte) (exec *execution.Execution, stateChanged bool, err error) {
stateChanged = false
func (a *API) processExecution(executionID string, outputData []byte, reterr error) (exec *execution.Execution, stateChanged bool, err error) {
tx, err := a.execDB.OpenTransaction()
if err != nil {
return nil, false, err
Expand All @@ -198,13 +197,16 @@ func (a *API) processExecution(executionID string, outputKey string, outputData
tx.Discard()
return nil, false, err
}
if reterr != nil {
return a.saveExecution(tx, exec, reterr)
}

var outputDataMap map[string]interface{}
if err := json.Unmarshal(outputData, &outputDataMap); err != nil {
return a.saveExecution(tx, exec, fmt.Errorf("invalid output data error: %s", err))
}

if err := exec.Complete(outputKey, outputDataMap); err != nil {
if err := exec.Complete(outputDataMap); err != nil {
return a.saveExecution(tx, exec, err)
}

Expand Down
14 changes: 4 additions & 10 deletions execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ type Execution struct {
TaskKey string `hash:"name:taskKey"`
Tags []string `hash:"name:tags"`
Inputs map[string]interface{} `hash:"name:inputs"`
OutputKey string `hash:"-"`
OutputData map[string]interface{} `hash:"-"`
Outputs map[string]interface{} `hash:"-"`
Error string `hash:"-"`
CreatedAt time.Time `hash:"-"`
ExecutedAt time.Time `hash:"-"`
Expand Down Expand Up @@ -90,7 +89,7 @@ func (execution *Execution) Execute() error {

// Complete changes execution status to completed. It verifies the output.
// It returns an error if the status is different then InProgress or verification fails.
func (execution *Execution) Complete(outputKey string, outputData map[string]interface{}) error {
func (execution *Execution) Complete(outputs map[string]interface{}) error {
if execution.Status != InProgress {
return StatusError{
ExpectedStatus: InProgress,
Expand All @@ -102,17 +101,12 @@ func (execution *Execution) Complete(outputKey string, outputData map[string]int
if err != nil {
return err
}
output, err := task.GetOutput(outputKey)
if err != nil {
return err
}
if err := output.RequireData(outputData); err != nil {
if err := task.RequireOutputs(outputs); err != nil {
return err
}

execution.ExecutionDuration = time.Since(execution.ExecutedAt)
execution.OutputKey = outputKey
execution.OutputData = outputData
execution.Outputs = outputs
execution.Status = Completed
return nil
}
Expand Down
32 changes: 3 additions & 29 deletions execution/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,38 +139,15 @@ func TestComplete(t *testing.T) {
e.Execute()
tests := []struct {
name string
key string
data map[string]interface{}
err error
}{
{
name: "task output not found because of empty output key",
key: "",
data: map[string]interface{}{},
err: &service.TaskOutputNotFoundError{
TaskKey: taskKey,
TaskOutputKey: "",
ServiceName: serviceName,
},
},
{
name: "task output not found because wrong output key",
key: "notfound",
data: map[string]interface{}{"foo": "bar"},
err: &service.TaskOutputNotFoundError{
TaskKey: taskKey,
TaskOutputKey: "notfound",
ServiceName: serviceName,
},
},
{
name: "invalid task output",
key: "success",
data: map[string]interface{}{},
err: &service.InvalidTaskOutputError{
TaskKey: taskKey,
ServiceName: serviceName,
TaskOutputKey: "success",
TaskKey: taskKey,
ServiceName: serviceName,
Warnings: []*service.ParameterWarning{
{
Key: "foo",
Expand All @@ -182,13 +159,10 @@ func TestComplete(t *testing.T) {
},
{
name: "success",
key: "success",
data: map[string]interface{}{"foo": "bar"},
err: nil,
},
{ // this one is already proccessed
name: "already executed",
key: "success",
data: map[string]interface{}{"foo": "bar"},
err: StatusError{
ExpectedStatus: InProgress,
Expand All @@ -197,7 +171,7 @@ func TestComplete(t *testing.T) {
},
}
for _, test := range tests {
err := e.Complete(test.key, test.data)
err := e.Complete(test.data)
require.Equal(t, test.err, err, test.name)
if test.err != nil {
continue
Expand Down
2 changes: 1 addition & 1 deletion interface/grpc/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (s *Server) ListenResult(request *coreapi.ListenResultRequest, stream corea
return ctx.Err()

case execution := <-ln.C:
outputs, err := json.Marshal(execution.OutputData)
outputs, err := json.Marshal(execution.Outputs)
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion interface/grpc/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package service
import (
"context"
"encoding/json"
"errors"

"github.com/mesg-foundation/core/api"
"github.com/mesg-foundation/core/execution"
Expand Down Expand Up @@ -69,5 +70,8 @@ func (s *Server) ListenTask(request *serviceapi.ListenTaskRequest, stream servic

// SubmitResult submits results of an execution.
func (s *Server) SubmitResult(context context.Context, request *serviceapi.SubmitResultRequest) (*serviceapi.SubmitResultReply, error) {
return &serviceapi.SubmitResultReply{}, s.api.SubmitResult(request.ExecutionID, request.OutputKey, []byte(request.OutputData))
if request.OutputKey == "error" {
return &serviceapi.SubmitResultReply{}, s.api.SubmitResult(request.ExecutionID, nil, errors.New(request.OutputData))
}
return &serviceapi.SubmitResultReply{}, s.api.SubmitResult(request.ExecutionID, []byte(request.OutputData), nil)
}
49 changes: 1 addition & 48 deletions interface/grpc/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func TestSubmit(t *testing.T) {
"data": map[string]interface{}{},
"headers": map[string]interface{}{},
}
outputKey = "success"
outputData = `{"foo":{}}`
server, closer = newServer(t)
)
Expand All @@ -171,15 +170,13 @@ func TestSubmit(t *testing.T) {

_, err = server.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{
ExecutionID: executionID,
OutputKey: outputKey,
OutputData: outputData,
})
require.NoError(t, err)

execution := <-ln.C
require.Equal(t, executionID, execution.ID)
require.Equal(t, outputKey, execution.OutputKey)
require.Equal(t, outputData, jsonMarshal(t, execution.OutputData))
require.Equal(t, outputData, jsonMarshal(t, execution.Outputs))
}

func TestSubmitWithInvalidJSON(t *testing.T) {
Expand All @@ -190,7 +187,6 @@ func TestSubmitWithInvalidJSON(t *testing.T) {
"data": map[string]interface{}{},
"headers": map[string]interface{}{},
}
outputKey = "result"
server, closer = newServer(t)
)
defer closer()
Expand All @@ -208,15 +204,13 @@ func TestSubmitWithInvalidJSON(t *testing.T) {

_, err = server.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{
ExecutionID: executionID,
OutputKey: outputKey,
OutputData: "",
})
require.Equal(t, "invalid output data error: unexpected end of JSON input", err.Error())
}

func TestSubmitWithInvalidID(t *testing.T) {
var (
outputKey = "output"
outputData = "{}"
executionID = "1"
server, closer = newServer(t)
Expand All @@ -225,49 +219,11 @@ func TestSubmitWithInvalidID(t *testing.T) {

_, err := server.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{
ExecutionID: executionID,
OutputKey: outputKey,
OutputData: outputData,
})
require.Error(t, err)
}

func TestSubmitWithNonExistentOutputKey(t *testing.T) {
var (
taskKey = "call"
taskData = map[string]interface{}{
"url": "https://mesg.com",
"data": map[string]interface{}{},
"headers": map[string]interface{}{},
}
outputKey = "nonExistent"
outputData = `{"foo":{}}`
server, closer = newServer(t)
)
defer closer()

s, validationErr, err := server.api.DeployService(serviceTar(t, taskServicePath), nil)
require.Zero(t, validationErr)
require.NoError(t, err)
defer server.api.DeleteService(s.Hash, false)

require.NoError(t, server.api.StartService(s.Hash))
defer server.api.StopService(s.Hash)

executionID, err := server.api.ExecuteTask(s.Hash, taskKey, taskData, nil)
require.NoError(t, err)

_, err = server.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{
ExecutionID: executionID,
OutputKey: outputKey,
OutputData: outputData,
})
require.Error(t, err)
notFoundErr, ok := err.(*service.TaskOutputNotFoundError)
require.True(t, ok)
require.Equal(t, outputKey, notFoundErr.TaskOutputKey)
require.Equal(t, s.Name, notFoundErr.ServiceName)
}

func TestSubmitWithInvalidTaskOutputs(t *testing.T) {
var (
taskKey = "call"
Expand All @@ -276,7 +232,6 @@ func TestSubmitWithInvalidTaskOutputs(t *testing.T) {
"data": map[string]interface{}{},
"headers": map[string]interface{}{},
}
outputKey = "success"
outputData = `{"foo":1}`
server, closer = newServer(t)
)
Expand All @@ -295,13 +250,11 @@ func TestSubmitWithInvalidTaskOutputs(t *testing.T) {

_, err = server.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{
ExecutionID: executionID,
OutputKey: outputKey,
OutputData: outputData,
})
require.Error(t, err)
invalidErr, ok := err.(*service.InvalidTaskOutputError)
require.True(t, ok)
require.Equal(t, taskKey, invalidErr.TaskKey)
require.Equal(t, outputKey, invalidErr.TaskOutputKey)
require.Equal(t, s.Name, invalidErr.ServiceName)
}
22 changes: 4 additions & 18 deletions service/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,6 @@ func (e *TaskNotFoundError) Error() string {
return fmt.Sprintf("Task %q not found in service %q", e.TaskKey, e.ServiceName)
}

// TaskOutputNotFoundError is an error returned when service doesn't contain corresponding output.
type TaskOutputNotFoundError struct {
TaskKey string
TaskOutputKey string
ServiceName string
}

func (e *TaskOutputNotFoundError) Error() string {
return fmt.Sprintf("Output %q of task %q not found in service %q", e.TaskOutputKey, e.TaskKey,
e.ServiceName)
}

// InvalidEventDataError is an error returned when the data of corresponding event is not valid.
type InvalidEventDataError struct {
EventKey string
Expand Down Expand Up @@ -66,15 +54,13 @@ func (e *InvalidTaskInputError) Error() string {

// InvalidTaskOutputError is an error returned when the outputs of corresponding task are not valid.
type InvalidTaskOutputError struct {
TaskKey string
TaskOutputKey string
ServiceName string
Warnings []*ParameterWarning
TaskKey string
ServiceName string
Warnings []*ParameterWarning
}

func (e *InvalidTaskOutputError) Error() string {
s := fmt.Sprintf("Outputs %q of task %q are invalid in service %q", e.TaskOutputKey, e.TaskKey,
e.ServiceName)
s := fmt.Sprintf("Outputs of task %q are invalid in service %q", e.TaskKey, e.ServiceName)
for _, warning := range e.Warnings {
s = fmt.Sprintf("%s. %s", s, warning)
}
Expand Down
17 changes: 3 additions & 14 deletions service/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,26 +97,15 @@ func TestInvalidTaskInputError(t *testing.T) {
tests.assert(t, err.Error())
}

// Test OutputNotFoundError
func TestOutputNotFoundError(t *testing.T) {
err := TaskOutputNotFoundError{
TaskKey: "TaskKey",
TaskOutputKey: "OutputKey",
ServiceName: "TestOutputNotFoundError",
}
require.Equal(t, `Output "OutputKey" of task "TaskKey" not found in service "TestOutputNotFoundError"`, err.Error())
}

// Test InvalidOutputDataError
func TestInvalidOutputDataError(t *testing.T) {
tests := newParameterTestCases()
err := InvalidTaskOutputError{
TaskKey: "TaskKey",
TaskOutputKey: "OutputKey",
ServiceName: "TestInvalidOutputDataError",
TaskKey: "TaskKey",
ServiceName: "TestInvalidOutputDataError",
Warnings: validateParametersSchema(tests.parameterTestsToSliceParameters(),
tests.parameterTestsToMapData()),
}
require.Contains(t, err.Error(), `Outputs "OutputKey" of task "TaskKey" are invalid in service "TestInvalidOutputDataError"`)
require.Contains(t, err.Error(), `Outputs of task "TaskKey" are invalid in service "TestInvalidOutputDataError"`)
tests.assert(t, err.Error())
}
Loading