Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialize notifications #11

Merged
merged 7 commits into from
Jun 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@ type fsmHandler struct {
stateType reflect.Type
stateKeyField StateKeyField
notifier Notifier
notifications chan notification
eventProcessor EventProcessor
stateEntryFuncs StateEntryFuncs
environment Environment
finalityStates map[StateKey]struct{}
}

const NotificationQueueSize = 128

type notification struct {
eventName EventName
state StateType
}

// NewFSMHandler defines an StateHandler for go-statemachine that implements
// a traditional Finite State Machine model -- transitions, start states,
// end states, and callbacks
Expand Down Expand Up @@ -65,6 +73,10 @@ func NewFSMHandler(parameters Parameters) (statemachine.StateHandler, error) {
d.finalityStates[finalityState] = struct{}{}
}

if d.notifier != nil {
d.notifications = make(chan notification)
}

return d, nil
}

Expand All @@ -90,7 +102,7 @@ func (d fsmHandler) Plan(events []statemachine.Event, user interface{}) (interfa
}
currentState := userValue.Elem().FieldByName(string(d.stateKeyField)).Interface()
if d.notifier != nil {
go d.notifier(eventName, userValue.Elem().Interface())
d.notifications <- notification{eventName, userValue.Elem().Interface()}
}
_, final := d.finalityStates[currentState]
if final {
Expand All @@ -107,6 +119,49 @@ func (d fsmHandler) reachedFinalityState(user interface{}) bool {
return final
}

// Init will start up a goroutine which processes the notification queue
// in order
func (d fsmHandler) Init(closing <-chan struct{}) {
if d.notifier != nil {
queue := make([]notification, 0, NotificationQueueSize)
toProcess := make(chan notification)
go func() {
for {
select {
case n := <-toProcess:
d.notifier(n.eventName, n.state)
case <-closing:
return
}
}
}()
go func() {
outgoing := func() chan<- notification {
if len(queue) == 0 {
return nil
}
return toProcess
}
nextNotification := func() notification {
if len(queue) == 0 {
return notification{}
}
return queue[0]
}
for {
select {
case n := <-d.notifications:
queue = append(queue, n)
case outgoing() <- nextNotification():
queue = queue[1:]
case <-closing:
return
}
}
}()
}
}

// handler makes a state next step function from the given callback
func (d fsmHandler) handler(cb interface{}) interface{} {
if cb == nil {
Expand Down
80 changes: 68 additions & 12 deletions fsm/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package fsm_test

import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
logging "github.com/ipfs/go-log"
"github.com/stretchr/testify/require"
"gotest.tools/assert"
Expand Down Expand Up @@ -55,7 +59,7 @@ var stateEntryFuncs = fsm.StateEntryFuncs{
}

func TestTypeCheckingOnSetup(t *testing.T) {
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())
te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})}
t.Run("Bad state field", func(t *testing.T) {
smm, err := fsm.New(ds, fsm.Parameters{
Expand Down Expand Up @@ -348,7 +352,7 @@ func newFsm(ds datastore.Datastore, te *testEnvironment) (fsm.Group, error) {
}

func TestArgumentChecks(t *testing.T) {
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())
te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})}
smm, err := newFsm(ds, te)
close(te.proceed)
Expand All @@ -370,7 +374,7 @@ func TestArgumentChecks(t *testing.T) {

func TestBasic(t *testing.T) {
for i := 0; i < 1000; i++ { // run a few times to expose any races
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())

te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})}
close(te.proceed)
Expand All @@ -387,7 +391,7 @@ func TestBasic(t *testing.T) {

func TestPersist(t *testing.T) {
for i := 0; i < 1000; i++ { // run a few times to expose any races
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())

te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})}
smm, err := newFsm(ds, te)
Expand All @@ -414,7 +418,7 @@ func TestPersist(t *testing.T) {

func TestSyncEventHandling(t *testing.T) {
ctx := context.Background()
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())

te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})}
smm, err := newFsm(ds, te)
Expand All @@ -440,13 +444,13 @@ func TestSyncEventHandling(t *testing.T) {
}

func TestNotification(t *testing.T) {
notifications := 0
notifications := new(uint64)

var notifier fsm.Notifier = func(eventName fsm.EventName, state fsm.StateType) {
notifications++
atomic.AddUint64(notifications, 1)
}

ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())

te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{}), universalCalls: 0}
close(te.proceed)
Expand All @@ -465,11 +469,63 @@ func TestNotification(t *testing.T) {
require.NoError(t, err)
<-te.done

require.Equal(t, notifications, 2)
total := atomic.LoadUint64(notifications)
require.Equal(t, total, uint64(2))
}

func TestSerialNotification(t *testing.T) {
eventNames := []string{}

// Generate a slew of events that will occur in sequential order
for i := 0; i < 1000; i++ {
eventNames = append(eventNames, fmt.Sprintf("%04d", i))
}

events := fsm.Events{}
for _, eventName := range eventNames {
events = append(events, fsm.Event(eventName).FromAny().ToNoChange())
}

te := &testEnvironment{t: t}

var notifications []string

wg := sync.WaitGroup{}
handleNotifications := make(chan struct{})
wg.Add(len(events))

var notifier fsm.Notifier = func(eventName fsm.EventName, state fsm.StateType) {
<-handleNotifications
notifications = append(notifications, eventName.(string))
wg.Done()
}

ds := dss.MutexWrap(datastore.NewMapDatastore())
params := fsm.Parameters{
Environment: te,
StateType: statemachine.TestState{},
StateKeyField: "A",
Events: events,
StateEntryFuncs: fsm.StateEntryFuncs{},
Notifier: notifier,
}
smm, err := fsm.New(ds, params)
require.NoError(t, err)

// send all the events in order
for _, eventName := range eventNames {
err = smm.Send(uint64(2), eventName)
require.NoError(t, err)
}
close(handleNotifications)
wg.Wait()

// Expect that notifications happened in the order that the events happened
require.Equal(t, eventNames, notifications)
}

func TestNoChangeHandler(t *testing.T) {
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())

te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{}), universalCalls: 0}
close(te.proceed)
Expand All @@ -488,7 +544,7 @@ func TestNoChangeHandler(t *testing.T) {
}

func TestAllStateEvent(t *testing.T) {
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())

te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{}), universalCalls: 0}
close(te.proceed)
Expand All @@ -512,7 +568,7 @@ func TestFinalityStates(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())

te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})}
smm, err := newFsm(ds, te)
Expand Down
19 changes: 18 additions & 1 deletion group.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@ type StateHandler interface {
Plan(events []Event, user interface{}) (interface{}, uint64, error)
}

type StateHandlerWithInit interface {
StateHandler
Init(<-chan struct{})
}

// StateGroup manages a group of state machines sharing the same logic
type StateGroup struct {
sts *statestore.StateStore
hnd StateHandler
stateType reflect.Type

closing chan struct{}
initNotifier sync.Once

lk sync.Mutex
sms map[datastore.Key]*StateMachine
}
Expand All @@ -31,8 +39,15 @@ func New(ds datastore.Datastore, hnd StateHandler, stateType interface{}) *State
sts: statestore.New(ds),
hnd: hnd,
stateType: reflect.TypeOf(stateType),
closing: make(chan struct{}),
sms: map[datastore.Key]*StateMachine{},
}
}

sms: map[datastore.Key]*StateMachine{},
func (s *StateGroup) init() {
initter, ok := s.hnd.(StateHandlerWithInit)
if ok {
initter.Init(s.closing)
}
}

Expand Down Expand Up @@ -85,6 +100,7 @@ func (s *StateGroup) Send(id interface{}, evt interface{}) (err error) {
}

func (s *StateGroup) loadOrCreate(name interface{}, userState interface{}) (*StateMachine, error) {
s.initNotifier.Do(s.init)
exists, err := s.sts.Has(name)
if err != nil {
return nil, xerrors.Errorf("failed to check if state for %v exists: %w", name, err)
Expand Down Expand Up @@ -130,6 +146,7 @@ func (s *StateGroup) Stop(ctx context.Context) error {
}
}

close(s.closing)
return nil
}

Expand Down
Loading