Skip to content

Commit

Permalink
Serialize notifications (#11)
Browse files Browse the repository at this point in the history
* Serialize notifications

* Remove sleep from tests

* fix(fsm): add buffering and channel close handling

* fix(fsm): use mutex datastores in test

Fix error running --count 100 due to concurrent map writes

* feat(statemachine): add abiltiy for Init on statehandler

* Fix a typo

Co-authored-by: hannahhoward <hannah@hannahhoward.net>
  • Loading branch information
ingar and hannahhoward committed Jun 12, 2020
1 parent 9cf2bfb commit 4eb3d0c
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 14 deletions.
57 changes: 56 additions & 1 deletion fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@ type fsmHandler struct {
stateType reflect.Type
stateKeyField StateKeyField
notifier Notifier
notifications chan notification
eventProcessor EventProcessor
stateEntryFuncs StateEntryFuncs
environment Environment
finalityStates map[StateKey]struct{}
}

const NotificationQueueSize = 128

type notification struct {
eventName EventName
state StateType
}

// NewFSMHandler defines an StateHandler for go-statemachine that implements
// a traditional Finite State Machine model -- transitions, start states,
// end states, and callbacks
Expand Down Expand Up @@ -65,6 +73,10 @@ func NewFSMHandler(parameters Parameters) (statemachine.StateHandler, error) {
d.finalityStates[finalityState] = struct{}{}
}

if d.notifier != nil {
d.notifications = make(chan notification)
}

return d, nil
}

Expand All @@ -90,7 +102,7 @@ func (d fsmHandler) Plan(events []statemachine.Event, user interface{}) (interfa
}
currentState := userValue.Elem().FieldByName(string(d.stateKeyField)).Interface()
if d.notifier != nil {
go d.notifier(eventName, userValue.Elem().Interface())
d.notifications <- notification{eventName, userValue.Elem().Interface()}
}
_, final := d.finalityStates[currentState]
if final {
Expand All @@ -107,6 +119,49 @@ func (d fsmHandler) reachedFinalityState(user interface{}) bool {
return final
}

// Init will start up a goroutine which processes the notification queue
// in order
func (d fsmHandler) Init(closing <-chan struct{}) {
if d.notifier != nil {
queue := make([]notification, 0, NotificationQueueSize)
toProcess := make(chan notification)
go func() {
for {
select {
case n := <-toProcess:
d.notifier(n.eventName, n.state)
case <-closing:
return
}
}
}()
go func() {
outgoing := func() chan<- notification {
if len(queue) == 0 {
return nil
}
return toProcess
}
nextNotification := func() notification {
if len(queue) == 0 {
return notification{}
}
return queue[0]
}
for {
select {
case n := <-d.notifications:
queue = append(queue, n)
case outgoing() <- nextNotification():
queue = queue[1:]
case <-closing:
return
}
}
}()
}
}

// handler makes a state next step function from the given callback
func (d fsmHandler) handler(cb interface{}) interface{} {
if cb == nil {
Expand Down
80 changes: 68 additions & 12 deletions fsm/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package fsm_test

import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
logging "github.com/ipfs/go-log"
"github.com/stretchr/testify/require"
"gotest.tools/assert"
Expand Down Expand Up @@ -55,7 +59,7 @@ var stateEntryFuncs = fsm.StateEntryFuncs{
}

func TestTypeCheckingOnSetup(t *testing.T) {
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())
te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})}
t.Run("Bad state field", func(t *testing.T) {
smm, err := fsm.New(ds, fsm.Parameters{
Expand Down Expand Up @@ -348,7 +352,7 @@ func newFsm(ds datastore.Datastore, te *testEnvironment) (fsm.Group, error) {
}

func TestArgumentChecks(t *testing.T) {
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())
te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})}
smm, err := newFsm(ds, te)
close(te.proceed)
Expand All @@ -370,7 +374,7 @@ func TestArgumentChecks(t *testing.T) {

func TestBasic(t *testing.T) {
for i := 0; i < 1000; i++ { // run a few times to expose any races
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())

te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})}
close(te.proceed)
Expand All @@ -387,7 +391,7 @@ func TestBasic(t *testing.T) {

func TestPersist(t *testing.T) {
for i := 0; i < 1000; i++ { // run a few times to expose any races
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())

te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})}
smm, err := newFsm(ds, te)
Expand All @@ -414,7 +418,7 @@ func TestPersist(t *testing.T) {

func TestSyncEventHandling(t *testing.T) {
ctx := context.Background()
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())

te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})}
smm, err := newFsm(ds, te)
Expand All @@ -440,13 +444,13 @@ func TestSyncEventHandling(t *testing.T) {
}

func TestNotification(t *testing.T) {
notifications := 0
notifications := new(uint64)

var notifier fsm.Notifier = func(eventName fsm.EventName, state fsm.StateType) {
notifications++
atomic.AddUint64(notifications, 1)
}

ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())

te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{}), universalCalls: 0}
close(te.proceed)
Expand All @@ -465,11 +469,63 @@ func TestNotification(t *testing.T) {
require.NoError(t, err)
<-te.done

require.Equal(t, notifications, 2)
total := atomic.LoadUint64(notifications)
require.Equal(t, total, uint64(2))
}

func TestSerialNotification(t *testing.T) {
eventNames := []string{}

// Generate a slew of events that will occur in sequential order
for i := 0; i < 1000; i++ {
eventNames = append(eventNames, fmt.Sprintf("%04d", i))
}

events := fsm.Events{}
for _, eventName := range eventNames {
events = append(events, fsm.Event(eventName).FromAny().ToNoChange())
}

te := &testEnvironment{t: t}

var notifications []string

wg := sync.WaitGroup{}
handleNotifications := make(chan struct{})
wg.Add(len(events))

var notifier fsm.Notifier = func(eventName fsm.EventName, state fsm.StateType) {
<-handleNotifications
notifications = append(notifications, eventName.(string))
wg.Done()
}

ds := dss.MutexWrap(datastore.NewMapDatastore())
params := fsm.Parameters{
Environment: te,
StateType: statemachine.TestState{},
StateKeyField: "A",
Events: events,
StateEntryFuncs: fsm.StateEntryFuncs{},
Notifier: notifier,
}
smm, err := fsm.New(ds, params)
require.NoError(t, err)

// send all the events in order
for _, eventName := range eventNames {
err = smm.Send(uint64(2), eventName)
require.NoError(t, err)
}
close(handleNotifications)
wg.Wait()

// Expect that notifications happened in the order that the events happened
require.Equal(t, eventNames, notifications)
}

func TestNoChangeHandler(t *testing.T) {
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())

te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{}), universalCalls: 0}
close(te.proceed)
Expand All @@ -488,7 +544,7 @@ func TestNoChangeHandler(t *testing.T) {
}

func TestAllStateEvent(t *testing.T) {
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())

te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{}), universalCalls: 0}
close(te.proceed)
Expand All @@ -512,7 +568,7 @@ func TestFinalityStates(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())

te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})}
smm, err := newFsm(ds, te)
Expand Down
19 changes: 18 additions & 1 deletion group.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@ type StateHandler interface {
Plan(events []Event, user interface{}) (interface{}, uint64, error)
}

type StateHandlerWithInit interface {
StateHandler
Init(<-chan struct{})
}

// StateGroup manages a group of state machines sharing the same logic
type StateGroup struct {
sts *statestore.StateStore
hnd StateHandler
stateType reflect.Type

closing chan struct{}
initNotifier sync.Once

lk sync.Mutex
sms map[datastore.Key]*StateMachine
}
Expand All @@ -31,8 +39,15 @@ func New(ds datastore.Datastore, hnd StateHandler, stateType interface{}) *State
sts: statestore.New(ds),
hnd: hnd,
stateType: reflect.TypeOf(stateType),
closing: make(chan struct{}),
sms: map[datastore.Key]*StateMachine{},
}
}

sms: map[datastore.Key]*StateMachine{},
func (s *StateGroup) init() {
initter, ok := s.hnd.(StateHandlerWithInit)
if ok {
initter.Init(s.closing)
}
}

Expand Down Expand Up @@ -85,6 +100,7 @@ func (s *StateGroup) Send(id interface{}, evt interface{}) (err error) {
}

func (s *StateGroup) loadOrCreate(name interface{}, userState interface{}) (*StateMachine, error) {
s.initNotifier.Do(s.init)
exists, err := s.sts.Has(name)
if err != nil {
return nil, xerrors.Errorf("failed to check if state for %v exists: %w", name, err)
Expand Down Expand Up @@ -130,6 +146,7 @@ func (s *StateGroup) Stop(ctx context.Context) error {
}
}

close(s.closing)
return nil
}

Expand Down
Loading

0 comments on commit 4eb3d0c

Please sign in to comment.