From efba6b6b8e13f5262aaebbb2b0d227ab195458d5 Mon Sep 17 00:00:00 2001 From: Nicolas Mahe Date: Fri, 4 Jan 2019 18:38:04 +0700 Subject: [PATCH] Remove gRPC client that expose a worflow-like logic --- interface/grpc/client/client.go | 38 -------- interface/grpc/client/client_test.go | 20 ----- interface/grpc/client/service.go | 52 ----------- interface/grpc/client/service_test.go | 60 ------------- interface/grpc/client/task.go | 45 ---------- interface/grpc/client/task_test.go | 76 ---------------- interface/grpc/client/types.go | 40 --------- interface/grpc/client/workflow.go | 118 ------------------------- interface/grpc/client/workflow_test.go | 82 ----------------- 9 files changed, 531 deletions(-) delete mode 100644 interface/grpc/client/client.go delete mode 100644 interface/grpc/client/client_test.go delete mode 100644 interface/grpc/client/service.go delete mode 100644 interface/grpc/client/service_test.go delete mode 100644 interface/grpc/client/task.go delete mode 100644 interface/grpc/client/task_test.go delete mode 100644 interface/grpc/client/types.go delete mode 100644 interface/grpc/client/workflow.go delete mode 100644 interface/grpc/client/workflow_test.go diff --git a/interface/grpc/client/client.go b/interface/grpc/client/client.go deleted file mode 100644 index b6671028f..000000000 --- a/interface/grpc/client/client.go +++ /dev/null @@ -1,38 +0,0 @@ -package client - -import ( - "fmt" - "os" - "sync" - - "github.com/mesg-foundation/core/config" - "github.com/mesg-foundation/core/protobuf/coreapi" - "github.com/mesg-foundation/core/utils/clierrors" - "google.golang.org/grpc" -) - -var _client coreapi.CoreClient -var once sync.Once - -// API returns the client necessary to access the API -func API() (coreapi.CoreClient, error) { - return getClient() -} - -func getClient() (cli coreapi.CoreClient, err error) { - once.Do(func() { - c, err := config.Global() - if err != nil { - fmt.Fprintln(os.Stderr, clierrors.ErrorMessage(err)) - os.Exit(1) - } - var connection *grpc.ClientConn - connection, err = grpc.Dial(c.Client.Address, grpc.WithInsecure()) - if err != nil { - return - } - _client = coreapi.NewCoreClient(connection) - }) - cli = _client - return -} diff --git a/interface/grpc/client/client_test.go b/interface/grpc/client/client_test.go deleted file mode 100644 index 3bc8b6631..000000000 --- a/interface/grpc/client/client_test.go +++ /dev/null @@ -1,20 +0,0 @@ -package client - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestAPI(t *testing.T) { - api, err := API() - require.NoError(t, err) - require.NotNil(t, api) -} - -func TestGetClient(t *testing.T) { - c, err := getClient() - require.NoError(t, err) - require.NotNil(t, c) - require.NotNil(t, _client) -} diff --git a/interface/grpc/client/service.go b/interface/grpc/client/service.go deleted file mode 100644 index f51a8ed00..000000000 --- a/interface/grpc/client/service.go +++ /dev/null @@ -1,52 +0,0 @@ -package client - -import ( - "context" - - "github.com/mesg-foundation/core/protobuf/coreapi" -) - -func (wf *Workflow) services() (services []string) { - presence := make(map[string]bool) - if wf.OnEvent != nil && !presence[wf.OnEvent.ServiceID] { - services = append(services, wf.OnEvent.ServiceID) - presence[wf.OnEvent.ServiceID] = true - } - if wf.OnResult != nil && !presence[wf.OnResult.ServiceID] { - services = append(services, wf.OnResult.ServiceID) - presence[wf.OnResult.ServiceID] = true - } - if wf.Execute != nil && !presence[wf.Execute.ServiceID] { - services = append(services, wf.Execute.ServiceID) - presence[wf.Execute.ServiceID] = true - } - return -} - -func iterateService(wf *Workflow, action func(string) error) (err error) { - for _, ID := range wf.services() { - err = action(ID) - if err != nil { - break - } - } - return -} - -func startServices(wf *Workflow) error { - return iterateService(wf, func(ID string) (err error) { - _, err = wf.client.StartService(context.Background(), &coreapi.StartServiceRequest{ - ServiceID: ID, - }) - return - }) -} - -func stopServices(wf *Workflow) error { - return iterateService(wf, func(ID string) (err error) { - _, err = wf.client.StopService(context.Background(), &coreapi.StopServiceRequest{ - ServiceID: ID, - }) - return - }) -} diff --git a/interface/grpc/client/service_test.go b/interface/grpc/client/service_test.go deleted file mode 100644 index 19aa07ca6..000000000 --- a/interface/grpc/client/service_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package client - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestServices(t *testing.T) { - wf := &Workflow{ - OnEvent: &Event{ServiceID: "xxx"}, - OnResult: &Result{ServiceID: "yyy"}, - Execute: &Task{ServiceID: "zzz"}, - } - services := wf.services() - require.Equal(t, len(services), 3) - require.Equal(t, services[0], "xxx") - require.Equal(t, services[1], "yyy") - require.Equal(t, services[2], "zzz") -} - -func TestServicesDuplicate(t *testing.T) { - wf := &Workflow{ - OnEvent: &Event{ServiceID: "xxx"}, - OnResult: &Result{ServiceID: "yyy"}, - Execute: &Task{ServiceID: "xxx"}, - } - services := wf.services() - require.Equal(t, len(services), 2) - require.Equal(t, services[0], "xxx") - require.Equal(t, services[1], "yyy") -} - -func TestIterateService(t *testing.T) { - wf := &Workflow{ - OnEvent: &Event{ServiceID: "xxx"}, - OnResult: &Result{ServiceID: "yyy"}, - Execute: &Task{ServiceID: "zzz"}, - } - cpt := 0 - err := iterateService(wf, func(ID string) error { - cpt++ - return nil - }) - require.NoError(t, err) - require.Equal(t, cpt, 3) -} - -func TestIterateServiceWithError(t *testing.T) { - wf := &Workflow{ - OnEvent: &Event{ServiceID: "xxx"}, - OnResult: &Result{ServiceID: "yyy"}, - Execute: &Task{ServiceID: "zzz"}, - } - err := iterateService(wf, func(ID string) error { - return errors.New("test error") - }) - require.Error(t, err) -} diff --git a/interface/grpc/client/task.go b/interface/grpc/client/task.go deleted file mode 100644 index bb43c351a..000000000 --- a/interface/grpc/client/task.go +++ /dev/null @@ -1,45 +0,0 @@ -package client - -import ( - "context" - "encoding/json" - - "github.com/mesg-foundation/core/protobuf/coreapi" -) - -func (task *Task) processEvent(wf *Workflow, data *coreapi.EventData) (err error) { - return task.process(wf, data.EventData) -} - -func (task *Task) processResult(wf *Workflow, data *coreapi.ResultData) (err error) { - return task.process(wf, data.OutputData) -} - -func (task *Task) process(wf *Workflow, data string) (err error) { - var d interface{} - err = json.Unmarshal([]byte(data), &d) - if err != nil { - return - } - inputData, err := task.convertData(d) - if err != nil { - return - } - _, err = wf.client.ExecuteTask(context.Background(), &coreapi.ExecuteTaskRequest{ - ServiceID: task.ServiceID, - TaskKey: task.Name, - InputData: inputData, - }) - return -} - -func (task *Task) convertData(data interface{}) (res string, err error) { - inputData := task.Inputs(data) - var inputDataJSON []byte - inputDataJSON, err = json.Marshal(inputData) - if err != nil { - return - } - res = string(inputDataJSON) - return -} diff --git a/interface/grpc/client/task_test.go b/interface/grpc/client/task_test.go deleted file mode 100644 index 1249ea36b..000000000 --- a/interface/grpc/client/task_test.go +++ /dev/null @@ -1,76 +0,0 @@ -package client - -import ( - "testing" - - "github.com/mesg-foundation/core/protobuf/coreapi" - "github.com/stretchr/testify/require" -) - -func TestProcessEventWithInvalidEventData(t *testing.T) { - wf := &Workflow{ - Execute: &Task{ - Name: "TestProcessEventWithInvalidEventData", - ServiceID: "xxx", - }, - } - data := &coreapi.EventData{ - EventKey: "EventX", - EventData: "", - } - err := wf.Execute.processEvent(wf, data) - require.Equal(t, err.Error(), "unexpected end of JSON input") -} - -func TestProcessResulsWithInvalidEventData(t *testing.T) { - wf := &Workflow{ - Execute: &Task{ - Name: "TestProcessResulsWithInvalidEventData", - ServiceID: "xxx", - }, - } - data := &coreapi.ResultData{ - ExecutionID: "xxx", - OutputData: "", - OutputKey: "outputx", - TaskKey: "taskx", - } - err := wf.Execute.processResult(wf, data) - require.Equal(t, err.Error(), "unexpected end of JSON input") -} - -func TestConvertData(t *testing.T) { - task := &Task{ - Inputs: func(interface{}) interface{} { - return "bar" - }, - } - res, err := task.convertData("foo") - require.NoError(t, err) - require.Equal(t, res, "\"bar\"") -} - -func TestConvertDataObject(t *testing.T) { - task := &Task{ - Inputs: func(d interface{}) interface{} { - return d - }, - } - res, err := task.convertData(map[string]interface{}{ - "foo": "bar", - "number": 42, - }) - require.NoError(t, err) - require.Equal(t, res, "{\"foo\":\"bar\",\"number\":42}") -} - -func TestConvertDataWithNull(t *testing.T) { - task := &Task{ - Inputs: func(d interface{}) interface{} { - return nil - }, - } - res, err := task.convertData("xxx") - require.NoError(t, err) - require.Equal(t, res, "null") -} diff --git a/interface/grpc/client/types.go b/interface/grpc/client/types.go deleted file mode 100644 index 103ec3ff7..000000000 --- a/interface/grpc/client/types.go +++ /dev/null @@ -1,40 +0,0 @@ -package client - -import "github.com/mesg-foundation/core/protobuf/coreapi" - -// Workflow is a struct that contains all the details of -// a workflow. A workflow contains an event source and -// triggers one or multiple tasks. The workflow is what -// is created on the **when** -type Workflow struct { - OnEvent *Event - OnResult *Result - Execute *Task - client coreapi.CoreClient -} - -// Task is a struct that contains the details of a task -// a task should be associated to a workflow. -// A task is corresponding to the **then** in a workflow -type Task struct { - ServiceID string - Name string - Inputs func(interface{}) interface{} -} - -// Event is a struct that contains all the informations -// to start a workflow. This is the **when** in the -// workflow -type Event struct { - ServiceID string - Name string -} - -// Result is a struct that contains all the informations -// to start a workflow. This is the **when** in the -// workflow -type Result struct { - ServiceID string - Name string - Output string -} diff --git a/interface/grpc/client/workflow.go b/interface/grpc/client/workflow.go deleted file mode 100644 index de9a3e0c1..000000000 --- a/interface/grpc/client/workflow.go +++ /dev/null @@ -1,118 +0,0 @@ -package client - -import ( - "context" - "errors" - "strings" - - "github.com/mesg-foundation/core/protobuf/coreapi" -) - -// Start is the function to start the workflow -func (wf *Workflow) Start() (err error) { - switch { - case wf.Execute == nil: - err = errors.New("a workflow needs a task") - case wf.OnEvent == nil && wf.OnResult == nil: - err = errors.New("a workflow needs an event OnEvent or OnResult") - } - if err != nil { - return - } - - wf.client, err = getClient() - if err != nil { - return - } - err = startServices(wf) - if err != nil { - return - } - if wf.OnEvent != nil { - err = listenEvents(wf) - } else { - err = listenResults(wf) - } - return -} - -// Stop will stop all the services in your workflow -func (wf *Workflow) Stop() (err error) { - err = stopServices(wf) - return -} - -func listenEvents(wf *Workflow) (err error) { - if wf.OnEvent.Name == "" { - err = errors.New("event's Name should be defined (you can use * to react to any event)") - return - } - stream, err := wf.client.ListenEvent(context.Background(), &coreapi.ListenEventRequest{ - ServiceID: wf.OnEvent.ServiceID, - }) - if err != nil { - return - } - - for { - var data *coreapi.EventData - data, err = stream.Recv() - if err != nil { - break - } - if wf.validEvent(data) { - err = wf.Execute.processEvent(wf, data) - if err != nil { - break - } - } - } - return -} - -func (wf *Workflow) validEvent(data *coreapi.EventData) bool { - if strings.Compare(wf.OnEvent.Name, "*") == 0 { - return true - } - return strings.Compare(wf.OnEvent.Name, data.EventKey) == 0 -} - -func listenResults(wf *Workflow) (err error) { - if wf.OnResult.Name == "" || wf.OnResult.Output == "" { - err = errors.New("result's Name and Output should be defined (you can use * to react to any result)") - return - } - stream, err := wf.client.ListenResult(context.Background(), &coreapi.ListenResultRequest{ - ServiceID: wf.OnResult.ServiceID, - }) - if err != nil { - return - } - - for { - var data *coreapi.ResultData - data, err = stream.Recv() - if err != nil { - break - } - if wf.validResult(data) { - err = wf.Execute.processResult(wf, data) - if err != nil { - break - } - } - } - return -} - -func (wf *Workflow) validResult(data *coreapi.ResultData) bool { - validName := strings.Compare(wf.OnResult.Name, "*") == 0 - if !validName { - validName = strings.Compare(wf.OnResult.Name, data.TaskKey) == 0 - } - validOutput := strings.Compare(wf.OnResult.Output, "*") == 0 - if !validOutput { - validOutput = strings.Compare(wf.OnResult.Output, data.OutputKey) == 0 - } - return validName && validOutput -} diff --git a/interface/grpc/client/workflow_test.go b/interface/grpc/client/workflow_test.go deleted file mode 100644 index ac381506c..000000000 --- a/interface/grpc/client/workflow_test.go +++ /dev/null @@ -1,82 +0,0 @@ -package client - -import ( - "testing" - - "github.com/mesg-foundation/core/protobuf/coreapi" - "github.com/stretchr/testify/require" -) - -func TestValidEventFromAny(t *testing.T) { - wf := &Workflow{ - OnEvent: &Event{Name: "*"}, - } - require.True(t, wf.validEvent(&coreapi.EventData{EventKey: "xxx"})) -} - -func TestValidEventFromValue(t *testing.T) { - wf := &Workflow{ - OnEvent: &Event{Name: "xxx"}, - } - require.True(t, wf.validEvent(&coreapi.EventData{EventKey: "xxx"})) - require.False(t, wf.validEvent(&coreapi.EventData{EventKey: "yyy"})) -} - -func TestValidResultFromAnyNameAndAnyOutput(t *testing.T) { - wf := &Workflow{ - OnResult: &Result{Name: "*", Output: "*"}, - } - require.True(t, wf.validResult(&coreapi.ResultData{TaskKey: "xxx", OutputKey: "xxx"})) -} - -func TestValidResultFromAnyNameAndNotAnyOutput(t *testing.T) { - wf := &Workflow{ - OnResult: &Result{Name: "*", Output: "xxx"}, - } - require.True(t, wf.validResult(&coreapi.ResultData{TaskKey: "xxx", OutputKey: "xxx"})) - require.False(t, wf.validResult(&coreapi.ResultData{TaskKey: "yyy", OutputKey: "yyy"})) -} - -func TestValidResultFromNotAnyNameAndAnyOutput(t *testing.T) { - wf := &Workflow{ - OnResult: &Result{Name: "xxx", Output: "*"}, - } - require.True(t, wf.validResult(&coreapi.ResultData{TaskKey: "xxx", OutputKey: "xxx"})) - require.False(t, wf.validResult(&coreapi.ResultData{TaskKey: "yyy", OutputKey: "yyy"})) -} - -func TestValidResultFromNotAnyNameAndNotAnyOutput(t *testing.T) { - wf := &Workflow{ - OnResult: &Result{Name: "xxx", Output: "yyy"}, - } - require.True(t, wf.validResult(&coreapi.ResultData{TaskKey: "xxx", OutputKey: "yyy"})) - require.False(t, wf.validResult(&coreapi.ResultData{TaskKey: "yyy", OutputKey: "yyy"})) - require.False(t, wf.validResult(&coreapi.ResultData{TaskKey: "xxx", OutputKey: "xxx"})) - require.False(t, wf.validResult(&coreapi.ResultData{TaskKey: "yyy", OutputKey: "xxx"})) -} - -func TestInvalidListenResult(t *testing.T) { - wf := &Workflow{ - OnResult: &Result{}, - } - require.NotNil(t, listenResults(wf)) -} - -func TestInvalidListenEvent(t *testing.T) { - wf := &Workflow{ - OnEvent: &Event{}, - } - require.NotNil(t, listenEvents(wf)) -} - -func TestInvalidWorkflowWithNoExecute(t *testing.T) { - wf := Workflow{OnEvent: &Event{}} - err := wf.Start() - require.Equal(t, err.Error(), "a workflow needs a task") -} - -func TestInvalidWorkflowWithNoEvent(t *testing.T) { - wf := Workflow{Execute: &Task{}} - err := wf.Start() - require.Equal(t, err.Error(), "a workflow needs an event OnEvent or OnResult") -}