Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API filter on events and results #179

Merged
merged 21 commits into from
Jun 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
3151905
update proto build script
antho1404 May 31, 2018
538e4c4
Merge branch 'dev' into 23-filter-message-from-services
antho1404 May 31, 2018
24e9788
update test command to test existing service and keep it alive
antho1404 May 31, 2018
f9eff43
add api to filter service events
antho1404 May 31, 2018
38d808d
add test for parameter filter
antho1404 May 31, 2018
6202eb3
add filter on subscription for results
antho1404 Jun 1, 2018
adaa87b
update test command to listen resutl from task and/or output
antho1404 Jun 1, 2018
8e2b1a7
update name of filter parameters
antho1404 Jun 1, 2018
2405b3b
add changelog
antho1404 Jun 1, 2018
1c45d9c
refacto of the includedin method
antho1404 Jun 1, 2018
36d207b
Merge branch 'dev' into 23-filter-message-from-services
antho1404 Jun 1, 2018
c129726
test pgp signature
antho1404 Jun 1, 2018
09c1b88
test pgp signature
antho1404 Jun 1, 2018
71dafbe
Merge branch 'dev' into 23-filter-message-from-services
antho1404 Jun 2, 2018
40b9496
simplify validate output key function
antho1404 Jun 2, 2018
cf83ffd
Merge commit 'd313837c8e4029347980f67d76f3426b304c4d6c' into 23-filte…
NicolasMahe Jun 5, 2018
b0e36f7
Improve errors in api/core/listen_result.
NicolasMahe Jun 5, 2018
3b1c5e2
Rename service to serviceID in command test. Improve flag description.
NicolasMahe Jun 5, 2018
f055462
Improve errors in api/core/listen_event
NicolasMahe Jun 5, 2018
5f2b1a3
Rename service test flags event, result and output to event-filter, t…
NicolasMahe Jun 5, 2018
dacfc34
Fix CodeClimate issue
NicolasMahe Jun 5, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@

#### Added
- (#174) Add CHANGELOG.md file
- (#179) [API] Add `eventFilter` on `ListenEvent` API to get notification when an event with a specific name occurs
[API] Add `taskFilter` on `ListenResult` API to get notification when a result from a specific task occurs
[API] Add `outputFilter` on `ListenResult` API to get notification when a result returns a specific output
- (#183) Add a `configuration` attribute in the `mesg.yml` file to accept docker configuration for your service
- (#187) Stop all services when the daemon stops

#### Removed

#### Fixed
- (#185) Fix logs with extra characters when `mesg-core daemon logs`
- (#179) [Doc] Outdated documentation for the CLI
- (#185) Fix logs with extra characters when `mesg-core daemon logs`
98 changes: 62 additions & 36 deletions api/core/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions api/core/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ service Core {

message ListenEventRequest {
string serviceID = 1;
string eventFilter = 2;
}

message ExecuteTaskRequest {
Expand All @@ -27,6 +28,8 @@ message ExecuteTaskRequest {

message ListenResultRequest {
string serviceID = 1;
string taskFilter = 2;
string outputFilter = 3;
}

message StartServiceRequest {
Expand Down
34 changes: 29 additions & 5 deletions api/core/listen_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package core

import (
"encoding/json"
"errors"

"github.com/mesg-foundation/core/database/services"
service "github.com/mesg-foundation/core/service"
"github.com/mesg-foundation/core/utils/array"

"github.com/mesg-foundation/core/event"
"github.com/mesg-foundation/core/pubsub"
Expand All @@ -15,14 +18,35 @@ func (s *Server) ListenEvent(request *ListenEventRequest, stream Core_ListenEven
if err != nil {
return
}
if err = validateEventKey(&service, request.EventFilter); err != nil {
return
}
subscription := pubsub.Subscribe(service.EventSubscriptionChannel())
for data := range subscription {
event := data.(*event.Event)
eventData, _ := json.Marshal(event.Data)
stream.Send(&EventData{
EventKey: event.Key,
EventData: string(eventData),
})
if isSubscribedEvent(request, event) {
eventData, _ := json.Marshal(event.Data)
stream.Send(&EventData{
EventKey: event.Key,
EventData: string(eventData),
})
}
}
return
}

func validateEventKey(service *service.Service, eventKey string) (err error) {
if eventKey == "" || eventKey == "*" {
return
}
_, ok := service.Events[eventKey]
if ok {
return
}
err = errors.New("Event '" + eventKey + "' doesn't exist in this service")
return
}

func isSubscribedEvent(request *ListenEventRequest, e *event.Event) bool {
return array.IncludedIn([]string{"", "*", e.Key}, request.EventFilter)
}
39 changes: 39 additions & 0 deletions api/core/listen_event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package core

import (
"testing"

"github.com/mesg-foundation/core/event"
"github.com/mesg-foundation/core/service"
"github.com/stvp/assert"
)

func TestValidateEventKey(t *testing.T) {
s := &service.Service{
Events: map[string]*service.Event{
"test": &service.Event{},
},
}
assert.Nil(t, validateEventKey(s, ""))
assert.Nil(t, validateEventKey(s, "*"))
assert.Nil(t, validateEventKey(s, "test"))
assert.NotNil(t, validateEventKey(s, "xxx"))
}

func TestIsSubscribedEvent(t *testing.T) {
e := &event.Event{Key: "test"}
r := &ListenEventRequest{}
assert.True(t, isSubscribedEvent(r, e))

r = &ListenEventRequest{EventFilter: ""}
assert.True(t, isSubscribedEvent(r, e))

r = &ListenEventRequest{EventFilter: "*"}
assert.True(t, isSubscribedEvent(r, e))

r = &ListenEventRequest{EventFilter: "test"}
assert.True(t, isSubscribedEvent(r, e))

r = &ListenEventRequest{EventFilter: "xxx"}
assert.False(t, isSubscribedEvent(r, e))
}
67 changes: 59 additions & 8 deletions api/core/listen_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,80 @@ package core

import (
"encoding/json"
"errors"

"github.com/mesg-foundation/core/database/services"
service "github.com/mesg-foundation/core/service"
"github.com/mesg-foundation/core/utils/array"

"github.com/mesg-foundation/core/execution"
"github.com/mesg-foundation/core/pubsub"
)

// ListenResult will listne for results from a services
// ListenResult will listen for results from a services
func (s *Server) ListenResult(request *ListenResultRequest, stream Core_ListenResultServer) (err error) {
service, err := services.Get(request.ServiceID)
if err != nil {
return
}
if err = validateTaskKey(&service, request.TaskFilter); err != nil {
return
}
if err = validateOutputKey(&service, request.TaskFilter, request.OutputFilter); err != nil {
return
}
subscription := pubsub.Subscribe(service.ResultSubscriptionChannel())
for data := range subscription {
execution := data.(*execution.Execution)
outputs, _ := json.Marshal(execution.OutputData)
stream.Send(&ResultData{
ExecutionID: execution.ID,
TaskKey: execution.Task,
OutputKey: execution.Output,
OutputData: string(outputs),
})
if isSubscribedTask(request, execution) && isSubscribedOutput(request, execution) {
outputs, _ := json.Marshal(execution.OutputData)
stream.Send(&ResultData{
ExecutionID: execution.ID,
TaskKey: execution.Task,
OutputKey: execution.Output,
OutputData: string(outputs),
})
}
}
return
}

func validateTaskKey(service *service.Service, taskKey string) (err error) {
if taskKey == "" || taskKey == "*" {
return
}
_, ok := service.Tasks[taskKey]
if ok {
return
}
err = errors.New("Task '" + taskKey + "' doesn't exist in this service")
return
}

func validateOutputKey(service *service.Service, taskKey string, outputFilter string) (err error) {
if outputFilter == "" || outputFilter == "*" {
return
}
if taskKey == "" {
err = errors.New("Cannot filter output without specifying a task")
return
}
task, ok := service.Tasks[taskKey]
if !ok {
err = errors.New("Task '" + taskKey + "' doesn't exist in this service")
return
}
_, ok = task.Outputs[outputFilter]
if !ok {
err = errors.New("Output '" + outputFilter + "' doesn't exist in the task '" + taskKey + "' of this service")
}
return
}

func isSubscribedTask(request *ListenResultRequest, e *execution.Execution) bool {
return array.IncludedIn([]string{"", "*", e.Task}, request.TaskFilter)
}

func isSubscribedOutput(request *ListenResultRequest, e *execution.Execution) bool {
return array.IncludedIn([]string{"", "*", e.Output}, request.OutputFilter)
}
Loading