Skip to content

Commit

Permalink
Events/cfg enable publisher (#8916)
Browse files Browse the repository at this point in the history
* only enable publisher based on config

* add default prune tick

* back out state abandon changes on fsm close
  • Loading branch information
drewbailey committed Oct 7, 2020
1 parent 4ebe287 commit e61c522
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 6 deletions.
1 change: 0 additions & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) {

// Close is used to cleanup resources associated with the FSM
func (n *nomadFSM) Close() error {
n.state.Abandon()
return nil
}

Expand Down
17 changes: 12 additions & 5 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type StateStoreConfig struct {

// Region is the region of the server embedding the state store.
Region string

EnablePublisher bool
}

// The StateStore is responsible for maintaining all the Nomad
Expand Down Expand Up @@ -86,11 +88,16 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
abandonCh: make(chan struct{}),
stopEventPublisher: cancel,
}
publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{
EventBufferTTL: 1 * time.Hour,
EventBufferSize: 250,
})
s.db = NewChangeTrackerDB(db, publisher, processDBChanges)

if config.EnablePublisher {
publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{
EventBufferTTL: 1 * time.Hour,
EventBufferSize: 250,
})
s.db = NewChangeTrackerDB(db, publisher, processDBChanges)
} else {
s.db = NewChangeTrackerDB(db, &noOpPublisher{}, processDBChanges)
}

// Initialize the state store with required enterprise objects
if err := s.enterpriseInit(); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions nomad/stream/event_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@ func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublish
if cfg.EventBufferTTL == 0 {
cfg.EventBufferTTL = 1 * time.Hour
}

buffer := newEventBuffer(cfg.EventBufferSize, cfg.EventBufferTTL)
e := &EventPublisher{
eventBuf: buffer,
publishCh: make(chan changeEvents),
subscriptions: &subscriptions{
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
},
pruneTick: 5 * time.Second,
}

go e.handleUpdates(ctx)
Expand Down

0 comments on commit e61c522

Please sign in to comment.