Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

service: update error types to be more logicless, simplify validations #402

Merged
merged 11 commits into from
Aug 29, 2018
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
- setup_remote_docker
- run: go get -t ./...
- run: docker swarm init
- run: docker pull nginx
- run: docker pull nginx:stable-alpine
- run: docker pull alpine
- run: env MESG_CORE_IMAGE=mesg/core:$CIRCLE_SHA1 go test -timeout 180s -p 1 -coverprofile=coverage.txt ./...
- run: bash <(curl -s https://codecov.io/bash)
Expand Down
4 changes: 2 additions & 2 deletions api/emit_event_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ func newEventEmitter(api *API) *eventEmitter {

// Emit emits a MESG event eventKey with eventData for service token.
func (e *eventEmitter) Emit(token, eventKey string, eventData map[string]interface{}) error {
service, err := services.Get(token)
s, err := services.Get(token)
if err != nil {
return err
}
event, err := event.Create(&service, eventKey, eventData)
event, err := event.Create(&s, eventKey, eventData)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions api/execute_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ func (e *taskExecutor) checkServiceStatus(s *service.Service) error {
}

// execute executes task.
func (e *taskExecutor) execute(s *service.Service, key string, inputs map[string]interface{},
func (e *taskExecutor) execute(s *service.Service, taskKey string, taskInputs map[string]interface{},
tags []string) (executionID string, err error) {
exc, err := execution.Create(s, key, inputs, tags)
exc, err := execution.Create(s, taskKey, taskInputs, tags)
if err != nil {
return "", err
}
Expand Down
2 changes: 1 addition & 1 deletion api/execute_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestCheckService(t *testing.T) {
Name: "TestCheckService",
Dependencies: map[string]*service.Dependency{
"test": {
Image: "nginx",
Image: "nginx:stable-alpine",
},
},
}
Expand Down
6 changes: 3 additions & 3 deletions container/service_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func startTestService(name []string) (string, error) {
return "", err
}
return c.StartService(ServiceOptions{
Image: "nginx",
Image: "nginx:stable-alpine",
Namespace: name,
})
}
Expand Down Expand Up @@ -137,14 +137,14 @@ func TestIntegrationListServices(t *testing.T) {
c, err := New()
require.Nil(t, err)
c.StartService(ServiceOptions{
Image: "nginx",
Image: "nginx:stable-alpine",
Namespace: []string{"TestListServices"},
Labels: map[string]string{
"label_name": "value_1",
},
})
c.StartService(ServiceOptions{
Image: "nginx",
Image: "nginx:stable-alpine",
Namespace: []string{"TestListServiceswithValue2"},
Labels: map[string]string{
"label_name_2": "value_2",
Expand Down
2 changes: 1 addition & 1 deletion container/service_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestServiceOptionNamespace(t *testing.T) {
}

func TestServiceOptionImage(t *testing.T) {
image := "nginx"
image := "nginx:stable-alpine"
options := &ServiceOptions{
Image: image,
}
Expand Down
2 changes: 1 addition & 1 deletion container/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestStartService(t *testing.T) {
namespace := []string{"namespace"}
containerID := "id"
options := ServiceOptions{
Image: "nginx",
Image: "nginx:stable-alpine",
Namespace: namespace,
}

Expand Down
2 changes: 1 addition & 1 deletion daemon/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func startForTest() {
}
_, err = defaultContainer.StartService(container.ServiceOptions{
Namespace: Namespace(),
Image: "nginx",
Image: "nginx:stable-alpine",
NetworksID: []string{sharedNetworkID},
})
if err != nil {
Expand Down
23 changes: 12 additions & 11 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,26 @@ type Event struct {
}

// Create creates an event.
func Create(serviceForEvent *service.Service, eventKey string, data map[string]interface{}) (*Event, error) {
serviceEvent, eventFound := serviceForEvent.Events[eventKey]
if !eventFound {
func Create(s *service.Service, eventKey string, eventData map[string]interface{}) (*Event, error) {
event, ok := s.Events[eventKey]
if !ok {
return nil, &service.EventNotFoundError{
Service: serviceForEvent,
EventKey: eventKey,
EventKey: eventKey,
ServiceName: s.Name,
}
}
if !serviceEvent.IsValid(data) {
warnings := s.ValidateParametersSchema(event.Data, eventData)
if len(warnings) > 0 {
return nil, &service.InvalidEventDataError{
Event: serviceEvent,
EventKey: eventKey,
EventData: data,
EventKey: eventKey,
ServiceName: s.Name,
Warnings: warnings,
}
}
return &Event{
Service: serviceForEvent,
Service: s,
Key: eventKey,
Data: data,
Data: eventData,
CreatedAt: time.Now(),
}, nil
}
Expand Down
35 changes: 24 additions & 11 deletions event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,35 @@ func TestCreate(t *testing.T) {
}

func TestCreateNotPresentEvent(t *testing.T) {
var (
serviceName = "TestCreateNotPresentEvent"
eventName = "test"
invalidEventName = "testInvalid"
)
s := service.Service{
Name: "TestCreateNotPresentEvent",
Name: serviceName,
Events: map[string]*service.Event{
"test": {},
eventName: {},
},
}
var data map[string]interface{}
_, err := Create(&s, "testinvalid", data)
require.NotNil(t, err)
_, notFound := err.(*service.EventNotFoundError)
require.True(t, notFound)
_, err := Create(&s, invalidEventName, data)
require.Error(t, err)
notFoundErr, ok := err.(*service.EventNotFoundError)
require.True(t, ok)
require.Equal(t, invalidEventName, notFoundErr.EventKey)
require.Equal(t, serviceName, notFoundErr.ServiceName)
}

func TestCreateInvalidData(t *testing.T) {
var (
eventName = "test"
serviceName = "TestCreateInvalidData"
)
s := service.Service{
Name: "TestCreateInvalidData",
Name: serviceName,
Events: map[string]*service.Event{
"test": {
eventName: {
Data: map[string]*service.Parameter{
"xxx": {},
},
Expand All @@ -50,7 +61,9 @@ func TestCreateInvalidData(t *testing.T) {
}
var data map[string]interface{}
_, err := Create(&s, "test", data)
require.NotNil(t, err)
_, invalid := err.(*service.InvalidEventDataError)
require.True(t, invalid)
require.Error(t, err)
invalidErr, ok := err.(*service.InvalidEventDataError)
require.True(t, ok)
require.Equal(t, eventName, invalidErr.EventKey)
require.Equal(t, serviceName, invalidErr.ServiceName)
}
35 changes: 18 additions & 17 deletions execution/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,31 @@ import (
)

// Complete marks an execution as complete and puts into the list of processed tasks.
func (execution *Execution) Complete(output string, data map[string]interface{}) error {
serviceOutput, outputFound := execution.Service.Tasks[execution.Task].Outputs[output]
if !outputFound {
return &service.OutputNotFoundError{
Service: execution.Service,
OutputKey: output,
TaskKey: execution.Task,
func (execution *Execution) Complete(outputKey string, outputData map[string]interface{}) error {
output, ok := execution.Service.Tasks[execution.Task].Outputs[outputKey]
if !ok {
return &service.TaskOutputNotFoundError{
TaskKey: execution.Task,
TaskOutputKey: outputKey,
ServiceName: execution.Service.Name,
}
}
if !serviceOutput.IsValid(data) {
return &service.InvalidOutputDataError{
Output: serviceOutput,
TaskKey: execution.Task,
OutputKey: output,
OutputData: data,
warnings := execution.Service.ValidateParametersSchema(output.Data, outputData)
if len(warnings) > 0 {
return &service.InvalidTaskOutputError{
TaskKey: execution.Task,
TaskOutputKey: outputKey,
ServiceName: execution.Service.Name,
Warnings: warnings,
}
}
err := execution.moveFromInProgressToProcessed()
if err != nil {

if err := execution.moveFromInProgressToProcessed(); err != nil {
return err
}
execution.ExecutionDuration = time.Since(execution.ExecutedAt)
execution.Output = output
execution.OutputData = data
execution.Output = outputKey
execution.OutputData = outputData

go pubsub.Publish(execution.Service.ResultSubscriptionChannel(), execution)
return nil
Expand Down
78 changes: 46 additions & 32 deletions execution/complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,67 +29,81 @@ func TestComplete(t *testing.T) {
require.True(t, execution.ExecutionDuration > 0)
}

func TestCompleteNotFound(t *testing.T) {
func TestCompleteNotProcessed(t *testing.T) {
s := service.Service{
Name: "TestCompleteNotFound",
Name: "TestCompleteNotProcessed",
Tasks: map[string]*service.Task{
"test": {},
"test": {
Outputs: map[string]*service.Output{
"output": {},
},
},
},
}
var inputs map[string]interface{}
execution, _ := Create(&s, "test", inputs, []string{})
execution.Execute()
var outputs map[string]interface{}
err := execution.Complete("output", outputs)
require.NotNil(t, err)
x, missingOutputError := err.(*service.OutputNotFoundError)
require.True(t, missingOutputError)
require.Equal(t, "output", x.OutputKey)
x, notInQueueError := err.(*NotInQueueError)
require.True(t, notInQueueError)
require.Equal(t, "inProgress", x.Queue)
}

func TestCompleteInvalidOutputs(t *testing.T) {
func TestCompleteNotFound(t *testing.T) {
var (
taskKey = "test"
outputKey = "output"
serviceName = "TestCompleteNotFound"
)
s := service.Service{
Name: "TestCompleteInvalidOutputs",
Name: serviceName,
Tasks: map[string]*service.Task{
"test": {
Outputs: map[string]*service.Output{
"output": {
Data: map[string]*service.Parameter{
"foo": {},
},
},
},
},
taskKey: {},
},
}
var inputs map[string]interface{}
execution, _ := Create(&s, "test", inputs, []string{})
execution, _ := Create(&s, taskKey, inputs, []string{})
execution.Execute()
var outputs map[string]interface{}
err := execution.Complete("output", outputs)
err := execution.Complete(outputKey, outputs)
require.NotNil(t, err)
x, invalidOutputError := err.(*service.InvalidOutputDataError)
require.True(t, invalidOutputError)
require.Equal(t, "output", x.OutputKey)
notFoundErr, ok := err.(*service.TaskOutputNotFoundError)
require.True(t, ok)
require.Equal(t, taskKey, notFoundErr.TaskKey)
require.Equal(t, outputKey, notFoundErr.TaskOutputKey)
require.Equal(t, serviceName, notFoundErr.ServiceName)
}

func TestCompleteNotProcessed(t *testing.T) {
func TestCompleteInvalidOutputs(t *testing.T) {
var (
taskKey = "test"
outputKey = "output"
serviceName = "TestCompleteInvalidOutputs"
)
s := service.Service{
Name: "TestCompleteNotProcessed",
Name: serviceName,
Tasks: map[string]*service.Task{
"test": {
taskKey: {
Outputs: map[string]*service.Output{
"output": {},
outputKey: {
Data: map[string]*service.Parameter{
"foo": {},
},
},
},
},
},
}
var inputs map[string]interface{}
execution, _ := Create(&s, "test", inputs, []string{})
execution, _ := Create(&s, taskKey, inputs, []string{})
execution.Execute()
var outputs map[string]interface{}
err := execution.Complete("output", outputs)
err := execution.Complete(outputKey, outputs)
require.NotNil(t, err)
x, notInQueueError := err.(*NotInQueueError)
require.True(t, notInQueueError)
require.Equal(t, "inProgress", x.Queue)
invalidErr, ok := err.(*service.InvalidTaskOutputError)
require.True(t, ok)
require.Equal(t, taskKey, invalidErr.TaskKey)
require.Equal(t, outputKey, invalidErr.TaskOutputKey)
require.Equal(t, serviceName, invalidErr.ServiceName)
}
25 changes: 13 additions & 12 deletions execution/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,26 @@ import (
)

// Create creates an execution with a unique ID and puts it in the pending list.
func Create(serviceForExecution *service.Service, task string, inputs map[string]interface{}, tags []string) (*Execution, error) {
serviceTask, taskFound := serviceForExecution.Tasks[task]
if !taskFound {
func Create(s *service.Service, taskKey string, taskInputs map[string]interface{}, tags []string) (*Execution, error) {
task, ok := s.Tasks[taskKey]
if !ok {
return nil, &service.TaskNotFoundError{
Service: serviceForExecution,
TaskKey: task,
TaskKey: taskKey,
ServiceName: s.Name,
}
}
if !serviceTask.IsValid(inputs) {
warnings := s.ValidateParametersSchema(task.Inputs, taskInputs)
if len(warnings) > 0 {
return nil, &service.InvalidTaskInputError{
Task: serviceTask,
TaskKey: task,
InputData: inputs,
TaskKey: taskKey,
ServiceName: s.Name,
Warnings: warnings,
}
}
execution := &Execution{
Service: serviceForExecution,
Inputs: inputs,
Task: task,
Service: s,
Inputs: taskInputs,
Task: taskKey,
Tags: tags,
CreatedAt: time.Now(),
}
Expand Down
Loading