Skip to content

Commit

Permalink
split event api from sdk pkg to eventsdk pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
ilgooz committed Jun 14, 2019
1 parent 3ce7b99 commit 1dbcb32
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 70 deletions.
64 changes: 64 additions & 0 deletions sdk/event/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package eventsdk

import (
"github.com/cskr/pubsub"
"github.com/mesg-foundation/core/database"
"github.com/mesg-foundation/core/event"
"github.com/mesg-foundation/core/utils/hash"
)

const (
topic = "Event"
)

// Event exposes event APIs of MESG.
type Event struct {
ps *pubsub.PubSub
db database.ServiceDB
}

// New creates a new Event SDK with given options.
func New(ps *pubsub.PubSub, db database.ServiceDB) *Event {
return &Event{
ps: ps,
db: db,
}
}

// Emit emits a MESG event eventKey with eventData for service token.
func (e *Event) Emit(token, eventKey string, eventData map[string]interface{}) error {
s, err := e.db.Get(token)
if err != nil {
return err
}
ev, err := event.Create(s, eventKey, eventData)
if err != nil {
return err
}

go e.ps.Pub(ev, subTopic(s.Hash))
return nil
}

// Listen listens events matches with eventFilter on serviceID.
func (e *Event) Listen(service string, f *Filter) (*Listener, error) {
s, err := e.db.Get(service)
if err != nil {
return nil, err
}

if f.HasKey() {
if _, err := s.GetEvent(f.Key); err != nil {
return nil, err
}
}

l := NewListener(e.ps, subTopic(s.Hash), f)
go l.Listen()
return l, nil
}

// subTopic returns the topic to listen for events from this service.
func subTopic(serviceHash string) string {
return hash.Calculate([]string{serviceHash, topic})
}
26 changes: 13 additions & 13 deletions sdk/event_listener.go → sdk/event/event_listener.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,39 @@
package sdk
package eventsdk

import (
"github.com/cskr/pubsub"
"github.com/mesg-foundation/core/event"
)

// EventFilter store fileds for matching events.
type EventFilter struct {
// Filter store fileds for matching events.
type Filter struct {
Key string
}

// Match matches event.
func (f *EventFilter) Match(e *event.Event) bool {
func (f *Filter) Match(e *event.Event) bool {
return f == nil || f.Key == "" || f.Key == "*" || f.Key == e.Key
}

// HasKey returns true if key is set to specified value.
func (f *EventFilter) HasKey() bool {
func (f *Filter) HasKey() bool {
return f != nil && f.Key != "" && f.Key != "*"
}

// EventListener provides functionalities to listen MESG events.
type EventListener struct {
// Listener provides functionalities to listen MESG events.
type Listener struct {
C chan *event.Event

ps *pubsub.PubSub
topic string
c chan interface{}

filter *EventFilter
filter *Filter
}

// NewEventListener creates a new EventListener with given sdk and filters.
func NewEventListener(ps *pubsub.PubSub, topic string, f *EventFilter) *EventListener {
return &EventListener{
// NewListener creates a new Listener with given sdk and filters.
func NewListener(ps *pubsub.PubSub, topic string, f *Filter) *Listener {
return &Listener{
C: make(chan *event.Event, 1),
ps: ps,
topic: topic,
Expand All @@ -43,15 +43,15 @@ func NewEventListener(ps *pubsub.PubSub, topic string, f *EventFilter) *EventLis
}

// Close stops listening for events.
func (l *EventListener) Close() {
func (l *Listener) Close() {
go func() {
l.ps.Unsub(l.c, l.topic)
close(l.C)
}()
}

// Listen listens events that match filter.
func (l *EventListener) Listen() {
func (l *Listener) Listen() {
for v := range l.c {
if e := v.(*event.Event); l.filter.Match(e) {
l.C <- e
Expand Down
16 changes: 8 additions & 8 deletions sdk/event_listener_test.go → sdk/event/event_listener_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sdk
package eventsdk

import (
"testing"
Expand All @@ -8,9 +8,9 @@ import (
"github.com/stretchr/testify/assert"
)

func TestEventFilter(t *testing.T) {
func TestFilter(t *testing.T) {
var tests = []struct {
f *EventFilter
f *Filter
e *event.Event
match bool
}{
Expand All @@ -20,22 +20,22 @@ func TestEventFilter(t *testing.T) {
true,
},
{
&EventFilter{},
&Filter{},
&event.Event{},
true,
},
{
&EventFilter{Key: "0"},
&Filter{Key: "0"},
&event.Event{Key: "0"},
true,
},
{
&EventFilter{Key: "*"},
&Filter{Key: "*"},
&event.Event{Key: "0"},
true,
},
{
&EventFilter{Key: "0"},
&Filter{Key: "0"},
&event.Event{Key: "1"},
false,
},
Expand All @@ -50,7 +50,7 @@ func TestEventListener(t *testing.T) {
topic := "test-topic"
testEvent := &event.Event{Key: "0"}
ps := pubsub.New(0)
el := NewEventListener(ps, topic, &EventFilter{Key: "0"})
el := NewListener(ps, topic, &Filter{Key: "0"})

go func() {
ps.Pub(&event.Event{Key: "1"}, topic)
Expand Down
13 changes: 13 additions & 0 deletions sdk/event/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package eventsdk

import (
"testing"

"github.com/mesg-foundation/core/utils/hash"
"github.com/stretchr/testify/require"
)

func TestSubTopic(t *testing.T) {
serviceHash := "1"
require.Equal(t, subTopic(serviceHash), hash.Calculate([]string{serviceHash, topic}))
}
46 changes: 5 additions & 41 deletions sdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"github.com/cskr/pubsub"
"github.com/mesg-foundation/core/container"
"github.com/mesg-foundation/core/database"
"github.com/mesg-foundation/core/event"
"github.com/mesg-foundation/core/execution"
eventsdk "github.com/mesg-foundation/core/sdk/event"
servicesdk "github.com/mesg-foundation/core/sdk/service"
"github.com/mesg-foundation/core/service"
"github.com/mesg-foundation/core/service/manager"
Expand All @@ -22,6 +22,7 @@ const executionStreamTopic = "execution-stream"
// SDK exposes all functionalities of MESG core.
type SDK struct {
Service *servicesdk.Service
Event *eventsdk.Event

ps *pubsub.PubSub

Expand All @@ -33,9 +34,11 @@ type SDK struct {

// New creates a new SDK with given options.
func New(m manager.Manager, c container.Container, db database.ServiceDB, execDB database.ExecutionDB) *SDK {
ps := pubsub.New(0)
return &SDK{
Service: servicesdk.New(m, c, db, execDB),
ps: pubsub.New(0),
Event: eventsdk.New(ps, db),
ps: ps,
m: m,
container: c,
db: db,
Expand Down Expand Up @@ -116,21 +119,6 @@ func (sdk *SDK) DeleteService(serviceID string, deleteData bool) error {
return sdk.db.Delete(serviceID)
}

// EmitEvent emits a MESG event eventKey with eventData for service token.
func (sdk *SDK) EmitEvent(token, eventKey string, eventData map[string]interface{}) error {
s, err := sdk.db.Get(token)
if err != nil {
return err
}
e, err := event.Create(s, eventKey, eventData)
if err != nil {
return err
}

go sdk.ps.Pub(e, eventSubTopic(s.Hash))
return nil
}

// ExecuteTask executes a task tasKey with inputData and tags for service serviceID.
func (sdk *SDK) ExecuteTask(serviceID, taskKey string, inputData map[string]interface{}, tags []string) (executionHash []byte, err error) {
s, err := sdk.db.Get(serviceID)
Expand Down Expand Up @@ -165,24 +153,6 @@ func (sdk *SDK) ExecuteTask(serviceID, taskKey string, inputData map[string]inte
return exec.Hash, nil
}

// ListenEvent listens events matches with eventFilter on serviceID.
func (sdk *SDK) ListenEvent(service string, f *EventFilter) (*EventListener, error) {
s, err := sdk.db.Get(service)
if err != nil {
return nil, err
}

if f.HasKey() {
if _, err := s.GetEvent(f.Key); err != nil {
return nil, err
}
}

l := NewEventListener(sdk.ps, eventSubTopic(s.Hash), f)
go l.Listen()
return l, nil
}

// ListenExecution listens executions on service.
func (sdk *SDK) ListenExecution(service string, f *ExecutionFilter) (*ExecutionListener, error) {
s, err := sdk.db.Get(service)
Expand Down Expand Up @@ -285,15 +255,9 @@ func (e *NotRunningServiceError) Error() string {
}

const (
eventTopic = "Event"
executionTopic = "Execution"
)

// eventSubTopic returns the topic to listen for events from this service.
func eventSubTopic(serviceHash string) string {
return hash.Calculate([]string{serviceHash, eventTopic})
}

// executionSubTopic returns the topic to listen for tasks from this service.
func executionSubTopic(serviceHash string) string {
return hash.Calculate([]string{serviceHash, executionTopic})
Expand Down
5 changes: 0 additions & 5 deletions sdk/sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,6 @@ func TestExecuteTaskForNotRunningService(t *testing.T) {
require.True(t, notRunningError)
}

func TestEventSubTopic(t *testing.T) {
serviceHash := "1"
require.Equal(t, eventSubTopic(serviceHash), hash.Calculate([]string{serviceHash, eventTopic}))
}

func TestExecutionSubTopic(t *testing.T) {
serviceHash := "1"
require.Equal(t, executionSubTopic(serviceHash), hash.Calculate([]string{serviceHash, executionTopic}))
Expand Down
3 changes: 2 additions & 1 deletion server/grpc/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/mesg-foundation/core/protobuf/acknowledgement"
"github.com/mesg-foundation/core/protobuf/coreapi"
"github.com/mesg-foundation/core/sdk"
eventsdk "github.com/mesg-foundation/core/sdk/event"
"github.com/mesg-foundation/core/service"
"github.com/mesg-foundation/core/version"
"github.com/mesg-foundation/core/x/xerrors"
Expand Down Expand Up @@ -107,7 +108,7 @@ func (s *Server) DeleteService(ctx context.Context, request *coreapi.DeleteServi

// ListenEvent listens events matches with eventFilter on serviceID.
func (s *Server) ListenEvent(request *coreapi.ListenEventRequest, stream coreapi.Core_ListenEventServer) error {
ln, err := s.sdk.ListenEvent(request.ServiceID, &sdk.EventFilter{Key: request.EventFilter})
ln, err := s.sdk.Event.Listen(request.ServiceID, &eventsdk.Filter{Key: request.EventFilter})
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion server/grpc/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (s *Server) EmitEvent(context context.Context, request *serviceapi.EmitEven
if err := json.Unmarshal([]byte(request.EventData), &data); err != nil {
return nil, err
}
return &serviceapi.EmitEventReply{}, s.sdk.EmitEvent(request.Token, request.EventKey, data)
return &serviceapi.EmitEventReply{}, s.sdk.Event.Emit(request.Token, request.EventKey, data)
}

// ListenTask creates a stream that will send data for every task to execute.
Expand Down
2 changes: 1 addition & 1 deletion server/grpc/service/service_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestEmit(t *testing.T) {
require.NoError(t, err)
defer server.sdk.DeleteService(s.Hash, false)

ln, err := server.sdk.ListenEvent(s.Hash, nil)
ln, err := server.sdk.Event.Listen(s.Hash, nil)
require.NoError(t, err)
defer ln.Close()

Expand Down

0 comments on commit 1dbcb32

Please sign in to comment.