From 1f13c81bb0cc62d06cbdad30af0954a3e66bf00f Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 9 Jun 2020 17:51:37 -0700 Subject: [PATCH 1/3] fix(fsm): add buffering and channel close handling --- fsm/fsm.go | 43 +++++++++++++++++++++++++++++++++++++------ fsm/fsm_group.go | 22 ++++++++++++++++++++-- fsm/fsm_test.go | 4 +++- 3 files changed, 60 insertions(+), 9 deletions(-) diff --git a/fsm/fsm.go b/fsm/fsm.go index 5fb8bdb..bdb69be 100644 --- a/fsm/fsm.go +++ b/fsm/fsm.go @@ -73,7 +73,9 @@ func NewFSMHandler(parameters Parameters) (statemachine.StateHandler, error) { d.finalityStates[finalityState] = struct{}{} } - d.initNotifier() + if d.notifier != nil { + d.notifications = make(chan notification) + } return d, nil } @@ -119,13 +121,42 @@ func (d fsmHandler) reachedFinalityState(user interface{}) bool { // initNotifier will start up a goroutine which processes the notification queue // in order -func (d *fsmHandler) initNotifier() { +func (d fsmHandler) initNotifier(closing <-chan struct{}) { if d.notifier != nil { - d.notifications = make(chan notification, NotificationQueueSize) - + queue := make([]notification, 0, NotificationQueueSize) + toProcess := make(chan notification) + go func() { + for { + select { + case n := <-toProcess: + d.notifier(n.eventName, n.state) + case <-closing: + return + } + } + }() go func() { - for n := range d.notifications { - d.notifier(n.eventName, n.state) + outgoing := func() chan<- notification { + if len(queue) == 0 { + return nil + } + return toProcess + } + nextNofication := func() notification { + if len(queue) == 0 { + return notification{} + } + return queue[0] + } + for { + select { + case n := <-d.notifications: + queue = append(queue, n) + case outgoing() <- nextNofication(): + queue = queue[1:] + case <-closing: + return + } } }() } diff --git a/fsm/fsm_group.go b/fsm/fsm_group.go index 9eff365..71e37ac 100644 --- a/fsm/fsm_group.go +++ b/fsm/fsm_group.go @@ -2,6 +2,7 @@ package fsm import ( "context" + "sync" "github.com/filecoin-project/go-statemachine" "github.com/ipfs/go-datastore" @@ -11,7 +12,9 @@ import ( // StateGroup manages a group of states with finite state machine logic type stateGroup struct { *statemachine.StateGroup - d fsmHandler + d fsmHandler + closing chan struct{} + initNotifier sync.Once } // New generates a new state group that operates like a finite state machine, @@ -24,13 +27,18 @@ func New(ds datastore.Datastore, parameters Parameters) (Group, error) { return nil, err } d := handler.(fsmHandler) - return &stateGroup{StateGroup: statemachine.New(ds, handler, parameters.StateType), d: d}, nil + return &stateGroup{StateGroup: statemachine.New(ds, handler, parameters.StateType), d: d, closing: make(chan struct{})}, nil +} + +func (s *stateGroup) init() { + s.d.initNotifier(s.closing) } // Send sends the given event name and parameters to the state specified by id // it will error if there are underlying state store errors or if the parameters // do not match what is expected for the event name func (s *stateGroup) Send(id interface{}, name EventName, args ...interface{}) (err error) { + s.initNotifier.Do(s.init) evt, err := s.d.eventProcessor.Generate(context.TODO(), name, nil, args...) if err != nil { return err @@ -47,6 +55,7 @@ func (s *stateGroup) IsTerminated(out StateType) bool { // it will error if there are underlying state store errors or if the parameters // do not match what is expected for the event name func (s *stateGroup) SendSync(ctx context.Context, id interface{}, name EventName, args ...interface{}) (err error) { + s.initNotifier.Do(s.init) returnChannel := make(chan error, 1) evt, err := s.d.eventProcessor.Generate(ctx, name, returnChannel, args...) if err != nil { @@ -65,3 +74,12 @@ func (s *stateGroup) SendSync(ctx context.Context, id interface{}, name EventNam return err } } + +func (s *stateGroup) Begin(id interface{}, userState interface{}) error { + s.initNotifier.Do(s.init) + return s.StateGroup.Begin(id, userState) +} +func (s *stateGroup) Stop(ctx context.Context) error { + close(s.closing) + return s.StateGroup.Stop(ctx) +} diff --git a/fsm/fsm_test.go b/fsm/fsm_test.go index ad63e32..743e784 100644 --- a/fsm/fsm_test.go +++ b/fsm/fsm_test.go @@ -488,9 +488,11 @@ func TestSerialNotification(t *testing.T) { var notifications []string wg := sync.WaitGroup{} + handleNotifications := make(chan struct{}) wg.Add(len(events)) var notifier fsm.Notifier = func(eventName fsm.EventName, state fsm.StateType) { + <-handleNotifications notifications = append(notifications, eventName.(string)) wg.Done() } @@ -512,7 +514,7 @@ func TestSerialNotification(t *testing.T) { err = smm.Send(uint64(2), eventName) require.NoError(t, err) } - + close(handleNotifications) wg.Wait() // Expect that notifications happened in the order that the events happened From 63f79b21e35aba1c42370079037e8a6b63fabb63 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 9 Jun 2020 17:03:26 -0700 Subject: [PATCH 2/3] fix(fsm): use mutex datastores in test Fix error running --count 100 due to concurrent map writes --- fsm/fsm_test.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/fsm/fsm_test.go b/fsm/fsm_test.go index 743e784..836e90c 100644 --- a/fsm/fsm_test.go +++ b/fsm/fsm_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/ipfs/go-datastore" + dss "github.com/ipfs/go-datastore/sync" logging "github.com/ipfs/go-log" "github.com/stretchr/testify/require" "gotest.tools/assert" @@ -57,7 +58,7 @@ var stateEntryFuncs = fsm.StateEntryFuncs{ } func TestTypeCheckingOnSetup(t *testing.T) { - ds := datastore.NewMapDatastore() + ds := dss.MutexWrap(datastore.NewMapDatastore()) te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})} t.Run("Bad state field", func(t *testing.T) { smm, err := fsm.New(ds, fsm.Parameters{ @@ -350,7 +351,7 @@ func newFsm(ds datastore.Datastore, te *testEnvironment) (fsm.Group, error) { } func TestArgumentChecks(t *testing.T) { - ds := datastore.NewMapDatastore() + ds := dss.MutexWrap(datastore.NewMapDatastore()) te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})} smm, err := newFsm(ds, te) close(te.proceed) @@ -372,7 +373,7 @@ func TestArgumentChecks(t *testing.T) { func TestBasic(t *testing.T) { for i := 0; i < 1000; i++ { // run a few times to expose any races - ds := datastore.NewMapDatastore() + ds := dss.MutexWrap(datastore.NewMapDatastore()) te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})} close(te.proceed) @@ -389,7 +390,7 @@ func TestBasic(t *testing.T) { func TestPersist(t *testing.T) { for i := 0; i < 1000; i++ { // run a few times to expose any races - ds := datastore.NewMapDatastore() + ds := dss.MutexWrap(datastore.NewMapDatastore()) te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})} smm, err := newFsm(ds, te) @@ -416,7 +417,7 @@ func TestPersist(t *testing.T) { func TestSyncEventHandling(t *testing.T) { ctx := context.Background() - ds := datastore.NewMapDatastore() + ds := dss.MutexWrap(datastore.NewMapDatastore()) te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})} smm, err := newFsm(ds, te) @@ -448,7 +449,7 @@ func TestNotification(t *testing.T) { notifications++ } - ds := datastore.NewMapDatastore() + ds := dss.MutexWrap(datastore.NewMapDatastore()) te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{}), universalCalls: 0} close(te.proceed) @@ -497,7 +498,7 @@ func TestSerialNotification(t *testing.T) { wg.Done() } - ds := datastore.NewMapDatastore() + ds := dss.MutexWrap(datastore.NewMapDatastore()) params := fsm.Parameters{ Environment: te, StateType: statemachine.TestState{}, @@ -522,7 +523,7 @@ func TestSerialNotification(t *testing.T) { } func TestNoChangeHandler(t *testing.T) { - ds := datastore.NewMapDatastore() + ds := dss.MutexWrap(datastore.NewMapDatastore()) te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{}), universalCalls: 0} close(te.proceed) @@ -541,7 +542,7 @@ func TestNoChangeHandler(t *testing.T) { } func TestAllStateEvent(t *testing.T) { - ds := datastore.NewMapDatastore() + ds := dss.MutexWrap(datastore.NewMapDatastore()) te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{}), universalCalls: 0} close(te.proceed) @@ -565,7 +566,7 @@ func TestFinalityStates(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 1*time.Second) defer cancel() - ds := datastore.NewMapDatastore() + ds := dss.MutexWrap(datastore.NewMapDatastore()) te := &testEnvironment{t: t, done: make(chan struct{}), proceed: make(chan struct{})} smm, err := newFsm(ds, te) From fb55e0ce31293a90e3ddffa1b7c3a46d6945d604 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 9 Jun 2020 18:18:01 -0700 Subject: [PATCH 3/3] feat(statemachine): add abiltiy for Init on statehandler --- fsm/fsm.go | 4 +-- fsm/fsm_group.go | 22 ++---------- fsm/fsm_test.go | 8 +++-- group.go | 19 +++++++++- machine_test.go | 90 ++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 117 insertions(+), 26 deletions(-) diff --git a/fsm/fsm.go b/fsm/fsm.go index bdb69be..b721003 100644 --- a/fsm/fsm.go +++ b/fsm/fsm.go @@ -119,9 +119,9 @@ func (d fsmHandler) reachedFinalityState(user interface{}) bool { return final } -// initNotifier will start up a goroutine which processes the notification queue +// Init will start up a goroutine which processes the notification queue // in order -func (d fsmHandler) initNotifier(closing <-chan struct{}) { +func (d fsmHandler) Init(closing <-chan struct{}) { if d.notifier != nil { queue := make([]notification, 0, NotificationQueueSize) toProcess := make(chan notification) diff --git a/fsm/fsm_group.go b/fsm/fsm_group.go index 71e37ac..9eff365 100644 --- a/fsm/fsm_group.go +++ b/fsm/fsm_group.go @@ -2,7 +2,6 @@ package fsm import ( "context" - "sync" "github.com/filecoin-project/go-statemachine" "github.com/ipfs/go-datastore" @@ -12,9 +11,7 @@ import ( // StateGroup manages a group of states with finite state machine logic type stateGroup struct { *statemachine.StateGroup - d fsmHandler - closing chan struct{} - initNotifier sync.Once + d fsmHandler } // New generates a new state group that operates like a finite state machine, @@ -27,18 +24,13 @@ func New(ds datastore.Datastore, parameters Parameters) (Group, error) { return nil, err } d := handler.(fsmHandler) - return &stateGroup{StateGroup: statemachine.New(ds, handler, parameters.StateType), d: d, closing: make(chan struct{})}, nil -} - -func (s *stateGroup) init() { - s.d.initNotifier(s.closing) + return &stateGroup{StateGroup: statemachine.New(ds, handler, parameters.StateType), d: d}, nil } // Send sends the given event name and parameters to the state specified by id // it will error if there are underlying state store errors or if the parameters // do not match what is expected for the event name func (s *stateGroup) Send(id interface{}, name EventName, args ...interface{}) (err error) { - s.initNotifier.Do(s.init) evt, err := s.d.eventProcessor.Generate(context.TODO(), name, nil, args...) if err != nil { return err @@ -55,7 +47,6 @@ func (s *stateGroup) IsTerminated(out StateType) bool { // it will error if there are underlying state store errors or if the parameters // do not match what is expected for the event name func (s *stateGroup) SendSync(ctx context.Context, id interface{}, name EventName, args ...interface{}) (err error) { - s.initNotifier.Do(s.init) returnChannel := make(chan error, 1) evt, err := s.d.eventProcessor.Generate(ctx, name, returnChannel, args...) if err != nil { @@ -74,12 +65,3 @@ func (s *stateGroup) SendSync(ctx context.Context, id interface{}, name EventNam return err } } - -func (s *stateGroup) Begin(id interface{}, userState interface{}) error { - s.initNotifier.Do(s.init) - return s.StateGroup.Begin(id, userState) -} -func (s *stateGroup) Stop(ctx context.Context) error { - close(s.closing) - return s.StateGroup.Stop(ctx) -} diff --git a/fsm/fsm_test.go b/fsm/fsm_test.go index 836e90c..fae0704 100644 --- a/fsm/fsm_test.go +++ b/fsm/fsm_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "testing" "time" @@ -443,10 +444,10 @@ func TestSyncEventHandling(t *testing.T) { } func TestNotification(t *testing.T) { - notifications := 0 + notifications := new(uint64) var notifier fsm.Notifier = func(eventName fsm.EventName, state fsm.StateType) { - notifications++ + atomic.AddUint64(notifications, 1) } ds := dss.MutexWrap(datastore.NewMapDatastore()) @@ -468,7 +469,8 @@ func TestNotification(t *testing.T) { require.NoError(t, err) <-te.done - require.Equal(t, notifications, 2) + total := atomic.LoadUint64(notifications) + require.Equal(t, total, uint64(2)) } func TestSerialNotification(t *testing.T) { diff --git a/group.go b/group.go index 04a5a1c..80a4a0e 100644 --- a/group.go +++ b/group.go @@ -15,12 +15,20 @@ type StateHandler interface { Plan(events []Event, user interface{}) (interface{}, uint64, error) } +type StateHandlerWithInit interface { + StateHandler + Init(<-chan struct{}) +} + // StateGroup manages a group of state machines sharing the same logic type StateGroup struct { sts *statestore.StateStore hnd StateHandler stateType reflect.Type + closing chan struct{} + initNotifier sync.Once + lk sync.Mutex sms map[datastore.Key]*StateMachine } @@ -31,8 +39,15 @@ func New(ds datastore.Datastore, hnd StateHandler, stateType interface{}) *State sts: statestore.New(ds), hnd: hnd, stateType: reflect.TypeOf(stateType), + closing: make(chan struct{}), + sms: map[datastore.Key]*StateMachine{}, + } +} - sms: map[datastore.Key]*StateMachine{}, +func (s *StateGroup) init() { + initter, ok := s.hnd.(StateHandlerWithInit) + if ok { + initter.Init(s.closing) } } @@ -85,6 +100,7 @@ func (s *StateGroup) Send(id interface{}, evt interface{}) (err error) { } func (s *StateGroup) loadOrCreate(name interface{}, userState interface{}) (*StateMachine, error) { + s.initNotifier.Do(s.init) exists, err := s.sts.Has(name) if err != nil { return nil, xerrors.Errorf("failed to check if state for %v exists: %w", name, err) @@ -130,6 +146,7 @@ func (s *StateGroup) Stop(ctx context.Context) error { } } + close(s.closing) return nil } diff --git a/machine_test.go b/machine_test.go index 689154c..ebb8dc1 100644 --- a/machine_test.go +++ b/machine_test.go @@ -352,6 +352,96 @@ func (t *testHandlerNoStateCB) step1(ctx Context, st TestState) error { return nil } +type testHandlerWithGoRoutine struct { + t *testing.T + event chan struct{} + proceed chan struct{} + done chan struct{} + notifDone chan struct{} + count uint64 +} + +func (t *testHandlerWithGoRoutine) Plan(events []Event, state interface{}) (interface{}, uint64, error) { + return t.plan(events, state.(*TestState)) +} + +func (t *testHandlerWithGoRoutine) Init(onClose <-chan struct{}) { + go func() { + for { + select { + case <-t.event: + t.count++ + case <-onClose: + close(t.notifDone) + return + } + } + }() +} + +func (t *testHandlerWithGoRoutine) plan(events []Event, state *TestState) (func(Context, TestState) error, uint64, error) { + for _, event := range events { + e := event.User.(*TestEvent) + switch e.A { + case "restart": + case "start": + state.A = 1 + case "b": + state.A = 2 + state.B = e.Val + } + } + + t.event <- struct{}{} + switch state.A { + case 1: + return t.step0, uint64(len(events)), nil + case 2: + return t.step1, uint64(len(events)), nil + default: + t.t.Fatal(state.A) + } + panic("how?") +} + +func (t *testHandlerWithGoRoutine) step0(ctx Context, st TestState) error { + ctx.Send(&TestEvent{A: "b", Val: 55}) // nolint: errcheck + <-t.proceed + return nil +} + +func (t *testHandlerWithGoRoutine) step1(ctx Context, st TestState) error { + assert.Equal(t.t, uint64(2), st.A) + + close(t.done) + return nil +} + +func TestInit(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + for i := 0; i < 1000; i++ { // run a few times to expose any races + ds := datastore.NewMapDatastore() + + th := &testHandlerWithGoRoutine{t: t, event: make(chan struct{}), notifDone: make(chan struct{}), done: make(chan struct{}), proceed: make(chan struct{})} + close(th.proceed) + smm := New(ds, th, TestState{}) + + if err := smm.Send(uint64(2), &TestEvent{A: "start"}); err != nil { + t.Fatalf("%+v", err) + } + + <-th.done + err := smm.Stop(ctx) + assert.NilError(t, err) + <-th.notifDone + assert.Equal(t, uint64(2), th.count) + } + +} + var _ StateHandler = &testHandler{} var _ StateHandler = &testHandlerPartial{} var _ StateHandler = &testHandlerNoStateCB{} +var _ StateHandler = &testHandlerWithGoRoutine{}