Skip to content

Commit

Permalink
Merge pull request #179 from mesg-foundation/23-filter-message-from-s…
Browse files Browse the repository at this point in the history
…ervices

API filter on events and results
  • Loading branch information
NicolasMahe authored Jun 5, 2018
2 parents d313837 + dacfc34 commit e2cd65a
Show file tree
Hide file tree
Showing 26 changed files with 512 additions and 145 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@

#### Added
- (#174) Add CHANGELOG.md file
- (#179) [API] Add `eventFilter` on `ListenEvent` API to get notification when an event with a specific name occurs
[API] Add `taskFilter` on `ListenResult` API to get notification when a result from a specific task occurs
[API] Add `outputFilter` on `ListenResult` API to get notification when a result returns a specific output
- (#183) Add a `configuration` attribute in the `mesg.yml` file to accept docker configuration for your service
- (#187) Stop all services when the daemon stops

#### Removed

#### Fixed
- (#185) Fix logs with extra characters when `mesg-core daemon logs`
- (#179) [Doc] Outdated documentation for the CLI
- (#185) Fix logs with extra characters when `mesg-core daemon logs`
98 changes: 62 additions & 36 deletions api/core/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions api/core/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ service Core {

message ListenEventRequest {
string serviceID = 1;
string eventFilter = 2;
}

message ExecuteTaskRequest {
Expand All @@ -27,6 +28,8 @@ message ExecuteTaskRequest {

message ListenResultRequest {
string serviceID = 1;
string taskFilter = 2;
string outputFilter = 3;
}

message StartServiceRequest {
Expand Down
34 changes: 29 additions & 5 deletions api/core/listen_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package core

import (
"encoding/json"
"errors"

"github.com/mesg-foundation/core/database/services"
service "github.com/mesg-foundation/core/service"
"github.com/mesg-foundation/core/utils/array"

"github.com/mesg-foundation/core/event"
"github.com/mesg-foundation/core/pubsub"
Expand All @@ -15,14 +18,35 @@ func (s *Server) ListenEvent(request *ListenEventRequest, stream Core_ListenEven
if err != nil {
return
}
if err = validateEventKey(&service, request.EventFilter); err != nil {
return
}
subscription := pubsub.Subscribe(service.EventSubscriptionChannel())
for data := range subscription {
event := data.(*event.Event)
eventData, _ := json.Marshal(event.Data)
stream.Send(&EventData{
EventKey: event.Key,
EventData: string(eventData),
})
if isSubscribedEvent(request, event) {
eventData, _ := json.Marshal(event.Data)
stream.Send(&EventData{
EventKey: event.Key,
EventData: string(eventData),
})
}
}
return
}

func validateEventKey(service *service.Service, eventKey string) (err error) {
if eventKey == "" || eventKey == "*" {
return
}
_, ok := service.Events[eventKey]
if ok {
return
}
err = errors.New("Event '" + eventKey + "' doesn't exist in this service")
return
}

func isSubscribedEvent(request *ListenEventRequest, e *event.Event) bool {
return array.IncludedIn([]string{"", "*", e.Key}, request.EventFilter)
}
39 changes: 39 additions & 0 deletions api/core/listen_event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package core

import (
"testing"

"github.com/mesg-foundation/core/event"
"github.com/mesg-foundation/core/service"
"github.com/stvp/assert"
)

func TestValidateEventKey(t *testing.T) {
s := &service.Service{
Events: map[string]*service.Event{
"test": &service.Event{},
},
}
assert.Nil(t, validateEventKey(s, ""))
assert.Nil(t, validateEventKey(s, "*"))
assert.Nil(t, validateEventKey(s, "test"))
assert.NotNil(t, validateEventKey(s, "xxx"))
}

func TestIsSubscribedEvent(t *testing.T) {
e := &event.Event{Key: "test"}
r := &ListenEventRequest{}
assert.True(t, isSubscribedEvent(r, e))

r = &ListenEventRequest{EventFilter: ""}
assert.True(t, isSubscribedEvent(r, e))

r = &ListenEventRequest{EventFilter: "*"}
assert.True(t, isSubscribedEvent(r, e))

r = &ListenEventRequest{EventFilter: "test"}
assert.True(t, isSubscribedEvent(r, e))

r = &ListenEventRequest{EventFilter: "xxx"}
assert.False(t, isSubscribedEvent(r, e))
}
67 changes: 59 additions & 8 deletions api/core/listen_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,80 @@ package core

import (
"encoding/json"
"errors"

"github.com/mesg-foundation/core/database/services"
service "github.com/mesg-foundation/core/service"
"github.com/mesg-foundation/core/utils/array"

"github.com/mesg-foundation/core/execution"
"github.com/mesg-foundation/core/pubsub"
)

// ListenResult will listne for results from a services
// ListenResult will listen for results from a services
func (s *Server) ListenResult(request *ListenResultRequest, stream Core_ListenResultServer) (err error) {
service, err := services.Get(request.ServiceID)
if err != nil {
return
}
if err = validateTaskKey(&service, request.TaskFilter); err != nil {
return
}
if err = validateOutputKey(&service, request.TaskFilter, request.OutputFilter); err != nil {
return
}
subscription := pubsub.Subscribe(service.ResultSubscriptionChannel())
for data := range subscription {
execution := data.(*execution.Execution)
outputs, _ := json.Marshal(execution.OutputData)
stream.Send(&ResultData{
ExecutionID: execution.ID,
TaskKey: execution.Task,
OutputKey: execution.Output,
OutputData: string(outputs),
})
if isSubscribedTask(request, execution) && isSubscribedOutput(request, execution) {
outputs, _ := json.Marshal(execution.OutputData)
stream.Send(&ResultData{
ExecutionID: execution.ID,
TaskKey: execution.Task,
OutputKey: execution.Output,
OutputData: string(outputs),
})
}
}
return
}

func validateTaskKey(service *service.Service, taskKey string) (err error) {
if taskKey == "" || taskKey == "*" {
return
}
_, ok := service.Tasks[taskKey]
if ok {
return
}
err = errors.New("Task '" + taskKey + "' doesn't exist in this service")
return
}

func validateOutputKey(service *service.Service, taskKey string, outputFilter string) (err error) {
if outputFilter == "" || outputFilter == "*" {
return
}
if taskKey == "" {
err = errors.New("Cannot filter output without specifying a task")
return
}
task, ok := service.Tasks[taskKey]
if !ok {
err = errors.New("Task '" + taskKey + "' doesn't exist in this service")
return
}
_, ok = task.Outputs[outputFilter]
if !ok {
err = errors.New("Output '" + outputFilter + "' doesn't exist in the task '" + taskKey + "' of this service")
}
return
}

func isSubscribedTask(request *ListenResultRequest, e *execution.Execution) bool {
return array.IncludedIn([]string{"", "*", e.Task}, request.TaskFilter)
}

func isSubscribedOutput(request *ListenResultRequest, e *execution.Execution) bool {
return array.IncludedIn([]string{"", "*", e.Output}, request.OutputFilter)
}
Loading

0 comments on commit e2cd65a

Please sign in to comment.