Skip to content

Commit

Permalink
event durability count and cfg
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbailey committed Oct 5, 2020
1 parent 51fab59 commit 0ed92a6
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 28 deletions.
9 changes: 9 additions & 0 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,10 @@ type ServerConfig struct {
// will generate events for its event stream.
EnableEventPublisher bool `hcl:"enable_event_publisher"`

// DurableEventCount specifies the amount of events to persist during snapshot generation.
// A count of 0 signals that no events should be persisted.
DurableEventCount int `hcl:"durable_event_count"`

// ExtraKeysHCL is used by hcl to surface unexpected keys
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
Expand Down Expand Up @@ -888,6 +892,7 @@ func DefaultConfig() *Config {
Server: &ServerConfig{
Enabled: false,
EnableEventPublisher: true,
DurableEventCount: 100,
StartJoin: []string{},
ServerJoin: &ServerJoin{
RetryJoin: []string{},
Expand Down Expand Up @@ -1416,6 +1421,10 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
result.EnableEventPublisher = true
}

if b.DurableEventCount != 0 {
result.DurableEventCount = b.DurableEventCount
}

if b.DefaultSchedulerConfig != nil {
c := *b.DefaultSchedulerConfig
result.DefaultSchedulerConfig = &c
Expand Down
1 change: 1 addition & 0 deletions command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ var basicConfig = &Config{
UpgradeVersion: "0.8.0",
EncryptKey: "abc",
EnableEventPublisher: true,
DurableEventCount: 100,
ServerJoin: &ServerJoin{
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: time.Duration(15) * time.Second,
Expand Down
1 change: 1 addition & 0 deletions command/agent/testdata/basic.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ server {
encrypt = "abc"
raft_multiplier = 4
enable_event_publisher = true
durable_event_count = 100

server_join {
retry_join = ["1.1.1.1", "2.2.2.2"]
Expand Down
1 change: 1 addition & 0 deletions command/agent/testdata/basic.json
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@
"deployment_gc_threshold": "12h",
"enabled": true,
"enable_event_publisher": true,
"durable_event_count": 100,
"enabled_schedulers": [
"test"
],
Expand Down
27 changes: 15 additions & 12 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,21 @@ type FSMConfig struct {
Region string

EnableEventPublisher bool

// Durable count specifies the amount of events generated by the state store
// to save to disk during snapshot generation. The most recent events
// limited to count will be saved.
DurableEventCount int
}

// NewFSMPath is used to construct a new FSM with a blank state
func NewFSM(config *FSMConfig) (*nomadFSM, error) {
// Create a state store
sconfig := &state.StateStoreConfig{
Logger: config.Logger,
Region: config.Region,
EnablePublisher: config.EnableEventPublisher,
// TODO(drew) plumb cfg
EnableDurability: true,
Logger: config.Logger,
Region: config.Region,
EnablePublisher: config.EnableEventPublisher,
DurableEventCount: config.DurableEventCount,
}
state, err := state.NewStateStore(sconfig)
if err != nil {
Expand Down Expand Up @@ -1269,11 +1273,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {

// Create a new state store
config := &state.StateStoreConfig{
Logger: n.config.Logger,
Region: n.config.Region,
EnablePublisher: n.config.EnableEventPublisher,
// TODO(drew) plumb cfg
EnableDurability: true,
Logger: n.config.Logger,
Region: n.config.Region,
EnablePublisher: n.config.EnableEventPublisher,
DurableEventCount: n.config.DurableEventCount,
}
newState, err := state.NewStateStore(config)
if err != nil {
Expand Down Expand Up @@ -2363,10 +2366,10 @@ func (s *nomadSnapshot) persistCSIVolumes(sink raft.SnapshotSink,

func (s *nomadSnapshot) persistEvents(sink raft.SnapshotSink, encoder *codec.Encoder) error {
var durableCount int
if s.snap.Config() != nil && !s.snap.Config().EnableDurability {
if s.snap.Config() != nil && s.snap.Config().DurableEventCount == 0 {
return nil
} else {
durableCount = s.snap.Config().DurableCount
durableCount = s.snap.Config().DurableEventCount
}

events, err := s.snap.LatestEventsReverse(nil)
Expand Down
7 changes: 3 additions & 4 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3206,9 +3206,8 @@ func TestFSM_SnapshotRestore_Events_WithDurability(t *testing.T) {
fsm := testFSM(t)
state := fsm.State()
cfg := state.Config()
cfg.EnableDurability = true
// DurableCount = 4 each mock events wrapper contains 2 events
cfg.DurableCount = 4
// DurableEventCount = 4 each mock events wrapper contains 2 events
cfg.DurableEventCount = 4

e1 := mock.Events(1000)
e2 := mock.Events(1001)
Expand Down Expand Up @@ -3251,7 +3250,7 @@ func TestFSM_SnapshotRestore_Events_NoDurability(t *testing.T) {
fsm := testFSM(t)
state := fsm.State()
cfg := state.Config()
cfg.EnableDurability = false
cfg.DurableEventCount = 0

e1 := mock.Events(1000)
e2 := mock.Events(1001)
Expand Down
6 changes: 2 additions & 4 deletions nomad/state/state_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ type changeTrackerDB struct {

// ChangeConfig
type ChangeConfig struct {
DurableEvents bool
DurableCount int
DurableEventCount int
}

func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventPublisher, changesFn changeProcessor, cfg *ChangeConfig) *changeTrackerDB {
Expand All @@ -55,8 +54,7 @@ func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventPublisher, chang
db: db,
publisher: publisher,
processChanges: changesFn,
durableEvents: cfg.DurableEvents,
durableCount: cfg.DurableCount,
durableCount: cfg.DurableEventCount,
}
}

Expand Down
9 changes: 5 additions & 4 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ type StateStoreConfig struct {
// Region is the region of the server embedding the state store.
Region string

// EnablePublisher is used to enable or disable the event publisher
EnablePublisher bool

EnableDurability bool
DurableCount int
// DurableEventCount is the amount of events to persist during the snapshot
// process.
DurableEventCount int
}

// The StateStore is responsible for maintaining all the Nomad
Expand Down Expand Up @@ -94,8 +96,7 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) {

if config.EnablePublisher {
cfg := &ChangeConfig{
DurableEvents: config.EnableDurability,
DurableCount: 1000,
DurableEventCount: 1000,
}
publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{
EventBufferTTL: 1 * time.Hour,
Expand Down
8 changes: 4 additions & 4 deletions nomad/state/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ func TestStateStore(t testing.T) *StateStore {

func TestStateStorePublisher(t testing.T) *StateStoreConfig {
return &StateStoreConfig{
Logger: testlog.HCLogger(t),
Region: "global",
EnablePublisher: true,
EnableDurability: true,
Logger: testlog.HCLogger(t),
Region: "global",
EnablePublisher: true,
DurableEventCount: 100,
}
}
func TestStateStoreCfg(t testing.T, cfg *StateStoreConfig) *StateStore {
Expand Down

0 comments on commit 0ed92a6

Please sign in to comment.