diff --git a/command/agent/agent.go b/command/agent/agent.go index 918a424f5dac..e7e8e2783b13 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -243,11 +243,11 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) { if agentConfig.Server.EnableEventBroker != nil { conf.EnableEventBroker = *agentConfig.Server.EnableEventBroker } - if agentConfig.Server.EventBufferSize > 0 { - conf.EventBufferSize = int64(agentConfig.Server.EventBufferSize) + if agentConfig.Server.EventBufferSize != nil { + conf.EventBufferSize = int64(*agentConfig.Server.EventBufferSize) } - if agentConfig.Server.DurableEventCount > 0 { - conf.DurableEventCount = int64(agentConfig.Server.DurableEventCount) + if agentConfig.Server.DurableEventCount != nil { + conf.DurableEventCount = int64(*agentConfig.Server.DurableEventCount) } if agentConfig.Autopilot != nil { if agentConfig.Autopilot.CleanupDeadServers != nil { diff --git a/command/agent/config.go b/command/agent/config.go index da00ef799766..5a3df36e8853 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -491,11 +491,11 @@ type ServerConfig struct { // EventBufferSize configure the amount of events to be held in memory. // If EnableEventBroker is set to true, the minimum allowable value // for the EventBufferSize is 1. - EventBufferSize int `hcl:"event_buffer_size"` + EventBufferSize *int `hcl:"event_buffer_size"` // DurableEventCount specifies the amount of events to persist during snapshot generation. // A count of 0 signals that no events should be persisted. - DurableEventCount int `hcl:"durable_event_count"` + DurableEventCount *int `hcl:"durable_event_count"` // ExtraKeysHCL is used by hcl to surface unexpected keys ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"` @@ -897,8 +897,8 @@ func DefaultConfig() *Config { Server: &ServerConfig{ Enabled: false, EnableEventBroker: helper.BoolToPtr(true), - EventBufferSize: 100, - DurableEventCount: 100, + EventBufferSize: helper.IntToPtr(100), + DurableEventCount: helper.IntToPtr(100), StartJoin: []string{}, ServerJoin: &ServerJoin{ RetryJoin: []string{}, @@ -1427,11 +1427,11 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig { result.EnableEventBroker = b.EnableEventBroker } - if b.EventBufferSize != 0 { + if b.EventBufferSize != nil { result.EventBufferSize = b.EventBufferSize } - if b.DurableEventCount != 0 { + if b.DurableEventCount != nil { result.DurableEventCount = b.DurableEventCount } diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index 942d00cba4ae..acaded234ed7 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -123,8 +123,8 @@ var basicConfig = &Config{ UpgradeVersion: "0.8.0", EncryptKey: "abc", EnableEventBroker: helper.BoolToPtr(false), - EventBufferSize: 200, - DurableEventCount: 0, + EventBufferSize: helper.IntToPtr(200), + DurableEventCount: helper.IntToPtr(0), ServerJoin: &ServerJoin{ RetryJoin: []string{"1.1.1.1", "2.2.2.2"}, RetryInterval: time.Duration(15) * time.Second, diff --git a/command/agent/config_test.go b/command/agent/config_test.go index b833e5dab3f3..359fde76683c 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -141,7 +141,8 @@ func TestConfig_Merge(t *testing.T) { RedundancyZone: "foo", UpgradeVersion: "foo", EnableEventBroker: helper.BoolToPtr(false), - DurableEventCount: 0, + EventBufferSize: helper.IntToPtr(0), + DurableEventCount: helper.IntToPtr(0), }, ACL: &ACLConfig{ Enabled: true, @@ -1174,37 +1175,54 @@ 
func TestTelemetry_Parse(t *testing.T) { func TestEventBroker_Parse(t *testing.T) { require := require.New(t) - { a := &ServerConfig{ EnableEventBroker: helper.BoolToPtr(false), + EventBufferSize: helper.IntToPtr(0), + DurableEventCount: helper.IntToPtr(0), } b := DefaultConfig().Server b.EnableEventBroker = nil + b.EventBufferSize = nil + b.DurableEventCount = nil result := a.Merge(b) require.Equal(false, *result.EnableEventBroker) + require.Equal(0, *result.EventBufferSize) + require.Equal(0, *result.DurableEventCount) } { a := &ServerConfig{ EnableEventBroker: helper.BoolToPtr(true), + EventBufferSize: helper.IntToPtr(5000), + DurableEventCount: helper.IntToPtr(200), } b := DefaultConfig().Server b.EnableEventBroker = nil + b.EventBufferSize = nil + b.DurableEventCount = nil result := a.Merge(b) require.Equal(true, *result.EnableEventBroker) + require.Equal(5000, *result.EventBufferSize) + require.Equal(200, *result.DurableEventCount) } { a := &ServerConfig{ EnableEventBroker: helper.BoolToPtr(false), + EventBufferSize: helper.IntToPtr(0), + DurableEventCount: helper.IntToPtr(0), } b := DefaultConfig().Server b.EnableEventBroker = helper.BoolToPtr(true) + b.EventBufferSize = helper.IntToPtr(20000) + b.DurableEventCount = helper.IntToPtr(1000) result := a.Merge(b) require.Equal(true, *result.EnableEventBroker) + require.Equal(20000, *result.EventBufferSize) + require.Equal(1000, *result.DurableEventCount) } } diff --git a/nomad/config.go b/nomad/config.go index f30c3d09714e..ff5292717d67 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -433,6 +433,8 @@ func DefaultConfig() *Config { SentinelGCInterval: 30 * time.Second, LicenseConfig: &LicenseConfig{}, EnableEventBroker: true, + EventBufferSize: 100, + DurableEventCount: 100, AutopilotConfig: &structs.AutopilotConfig{ CleanupDeadServers: true, LastContactThreshold: 200 * time.Millisecond, diff --git a/nomad/event_endpoint.go b/nomad/event_endpoint.go index c7b7406a79cf..3b980981496a 100644 --- a/nomad/event_endpoint.go +++ b/nomad/event_endpoint.go @@ -4,12 +4,12 @@ import ( "context" "fmt" "io" + "io/ioutil" "time" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" - "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" ) @@ -49,16 +49,8 @@ func (e *Event) stream(conn io.ReadWriteCloser) { return } - // authToken is passed to the subscribe request so the event stream - // can handle closing a subscription if the authToken expires. - // If ACLs are disabled, a random token is generated and it will - // never be closed due to expiry. 
- authToken := args.AuthToken - if authToken == "" { - authToken = uuid.Generate() - } subReq := &stream.SubscribeRequest{ - Token: authToken, + Token: args.AuthToken, Topics: args.Topics, Index: uint64(args.Index), Namespace: args.Namespace, @@ -96,20 +88,8 @@ func (e *Event) stream(conn io.ReadWriteCloser) { // goroutine to detect remote side closing go func() { - for { - if _, err := conn.Read(nil); err != nil { - if err == io.EOF || err == io.ErrClosedPipe { - // One end of the pipe was explicitly closed, exit cleanly - cancel() - return - } - select { - case errCh <- err: - case <-ctx.Done(): - return - } - } - } + io.Copy(ioutil.Discard, conn) + cancel() }() go func() { diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go index 1a3fa5c8fe70..c7a2d9634e34 100644 --- a/nomad/state/state_changes.go +++ b/nomad/state/state_changes.go @@ -29,7 +29,7 @@ type Changes struct { // all write transactions. When the transaction is committed the changes are // sent to the EventBroker which will create and emit change events. type changeTrackerDB struct { - db *memdb.MemDB + memdb *memdb.MemDB durableCount int64 publisher *stream.EventBroker processChanges func(ReadTxn, Changes) (*structs.Events, error) @@ -37,7 +37,7 @@ type changeTrackerDB struct { func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventBroker, changesFn changeProcessor, durableCount int64) *changeTrackerDB { return &changeTrackerDB{ - db: db, + memdb: db, publisher: publisher, processChanges: changesFn, durableCount: durableCount, @@ -54,7 +54,7 @@ func noOpProcessChanges(ReadTxn, Changes) (*structs.Events, error) { return nil, // TODO: this could return a regular memdb.Txn if all the state functions accepted // the ReadTxn interface func (c *changeTrackerDB) ReadTxn() *txn { - return &txn{Txn: c.db.Txn(false)} + return &txn{Txn: c.memdb.Txn(false)} } // WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store. @@ -69,7 +69,7 @@ func (c *changeTrackerDB) ReadTxn() *txn { // data directly into the DB. These cases may use WriteTxnRestore. func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { t := &txn{ - Txn: c.db.Txn(true), + Txn: c.memdb.Txn(true), Index: idx, publish: c.publish, } @@ -82,7 +82,7 @@ func (c *changeTrackerDB) WriteTxnMsgT(msgType structs.MessageType, idx uint64) t := &txn{ msgType: msgType, - Txn: c.db.Txn(true), + Txn: c.memdb.Txn(true), Index: idx, publish: c.publish, persistChanges: persistChanges, @@ -92,7 +92,7 @@ func (c *changeTrackerDB) WriteTxnMsgT(msgType structs.MessageType, idx uint64) } func (c *changeTrackerDB) publish(changes Changes) (*structs.Events, error) { - readOnlyTx := c.db.Txn(false) + readOnlyTx := c.memdb.Txn(false) defer readOnlyTx.Abort() events, err := c.processChanges(readOnlyTx, changes) @@ -114,7 +114,7 @@ func (c *changeTrackerDB) publish(changes Changes) (*structs.Events, error) { // written across many indexes. 
func (c *changeTrackerDB) WriteTxnRestore() *txn { return &txn{ - Txn: c.db.Txn(true), + Txn: c.memdb.Txn(true), Index: 0, } } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 9dce76e6b2ea..57f554c1106d 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -100,7 +100,6 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { if config.EnablePublisher { // Create new event publisher using provided config broker := stream.NewEventBroker(ctx, stream.EventBrokerCfg{ - EventBufferTTL: 1 * time.Hour, EventBufferSize: config.EventBufferSize, Logger: config.Logger, OnEvict: s.eventBrokerEvict, @@ -138,7 +137,7 @@ func (s *StateStore) eventBrokerEvict(events *structs.Events) { } func (s *StateStore) deleteEvent(events *structs.Events) error { - txn := s.db.db.Txn(true) + txn := s.db.memdb.Txn(true) defer txn.Abort() if err := txn.Delete("events", events); err != nil { @@ -158,7 +157,7 @@ func (s *StateStore) Config() *StateStoreConfig { // we use MemDB, we just need to snapshot the state of the underlying // database. func (s *StateStore) Snapshot() (*StateSnapshot, error) { - memDBSnap := s.db.db.Snapshot() + memDBSnap := s.db.memdb.Snapshot() store := StateStore{ logger: s.logger, diff --git a/nomad/stream/event_buffer.go b/nomad/stream/event_buffer.go index 3b3dbd860a17..419b27f98371 100644 --- a/nomad/stream/event_buffer.go +++ b/nomad/stream/event_buffer.go @@ -50,19 +50,17 @@ type eventBuffer struct { head atomic.Value tail atomic.Value - maxSize int64 - maxItemTTL time.Duration - onEvict EvictCallbackFn + maxSize int64 + onEvict EvictCallbackFn } // newEventBuffer creates an eventBuffer ready for use. -func newEventBuffer(size int64, maxItemTTL time.Duration, onEvict EvictCallbackFn) *eventBuffer { +func newEventBuffer(size int64, onEvict EvictCallbackFn) *eventBuffer { zero := int64(0) b := &eventBuffer{ - maxSize: size, - size: &zero, - maxItemTTL: maxItemTTL, - onEvict: onEvict, + maxSize: size, + size: &zero, + onEvict: onEvict, } item := newBufferItem(&structs.Events{Index: 0, Events: nil}) @@ -104,7 +102,7 @@ func (b *eventBuffer) appendItem(item *bufferItem) { } func newSentinelItem() *bufferItem { - return newBufferItem(&structs.Events{Index: 0, Events: nil}) + return newBufferItem(&structs.Events{}) } // advanceHead drops the current Head buffer item and notifies readers @@ -191,25 +189,6 @@ func (b *eventBuffer) Len() int { return int(atomic.LoadInt64(b.size)) } -// prune advances the head of the buffer until the head buffer item TTL -// is no longer expired. It should be externally synchronized as it mutates -// the buffer of items. -func (b *eventBuffer) prune() { - now := time.Now() - for { - head := b.Head() - if b.Len() == 0 { - return - } - - if now.Sub(head.createdAt) > b.maxItemTTL { - b.advanceHead() - } else { - return - } - } -} - // bufferItem represents a set of events published by a single raft operation. // The first item returned by a newly constructed buffer will have nil Events. // It is a sentinel value which is used to wait on the next events via Next. 
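With prune() and the per-item TTL removed, eviction in eventBuffer is purely size-driven: items are dropped (and OnEvict fired for them) only when the buffer would exceed maxSize, and an emptied buffer falls back to a fresh sentinel item. Below is a minimal illustrative sketch, not part of this patch, assuming it lives inside the stream package (newEventBuffer and Append are unexported) and that Append takes *structs.Events as the tests below do:

package stream

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

// exampleSizeOnlyEviction is an illustrative sketch, not part of this patch.
func exampleSizeOnlyEviction() {
	// OnEvict now fires only when an item is dropped to keep the buffer at
	// maxSize; there is no longer a time-based pruning path.
	b := newEventBuffer(2, func(evicted *structs.Events) {
		fmt.Printf("evicted events at index %d\n", evicted.Index)
	})

	// Appending four batches to a two-item buffer advances the head and
	// invokes the callback for each dropped batch.
	for i := uint64(1); i <= 4; i++ {
		b.Append(&structs.Events{Index: i, Events: []structs.Event{{Topic: "Test"}}})
	}
}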
diff --git a/nomad/stream/event_buffer_test.go b/nomad/stream/event_buffer_test.go index 84f0b8524733..45501d12372c 100644 --- a/nomad/stream/event_buffer_test.go +++ b/nomad/stream/event_buffer_test.go @@ -17,7 +17,7 @@ func TestEventBufferFuzz(t *testing.T) { nReaders := 1000 nMessages := 1000 - b := newEventBuffer(1000, DefaultTTL, nil) + b := newEventBuffer(1000, nil) // Start a write goroutine that will publish 10000 messages with sequential // indexes and some jitter in timing (to allow clients to "catch up" and block @@ -85,7 +85,7 @@ func TestEventBufferFuzz(t *testing.T) { } func TestEventBuffer_Slow_Reader(t *testing.T) { - b := newEventBuffer(10, DefaultTTL, nil) + b := newEventBuffer(10, nil) for i := 0; i < 10; i++ { e := structs.Event{ @@ -114,7 +114,7 @@ func TestEventBuffer_Slow_Reader(t *testing.T) { } func TestEventBuffer_Size(t *testing.T) { - b := newEventBuffer(100, DefaultTTL, nil) + b := newEventBuffer(100, nil) for i := 0; i < 10; i++ { e := structs.Event{ @@ -126,11 +126,11 @@ func TestEventBuffer_Size(t *testing.T) { require.Equal(t, 10, b.Len()) } -// TestEventBuffer_Prune_AllOld tests the behavior when all items -// are past their TTL, the event buffer should prune down to the last message -// and hold onto the last item. -func TestEventBuffer_Prune_AllOld(t *testing.T) { - b := newEventBuffer(100, 1*time.Second, nil) +// TestEventBuffer_Emptying_Buffer tests the behavior when all items +// are removed, the event buffer should advance its head down to the last message +// and insert a placeholder sentinel value. +func TestEventBuffer_Emptying_Buffer(t *testing.T) { + b := newEventBuffer(10, nil) for i := 0; i < 10; i++ { e := structs.Event{ @@ -141,11 +141,11 @@ func TestEventBuffer_Prune_AllOld(t *testing.T) { require.Equal(t, 10, int(b.Len())) - time.Sleep(1 * time.Second) - - // prune old messages, which will bring the event buffer down + // empty the buffer, which will bring the event buffer down // to a single sentinel value - b.prune() + for i := 0; i < 11; i++ { + b.advanceHead() + } // head and tail are now a sentinel value head := b.Head() @@ -203,7 +203,7 @@ func TestEventBuffer_StartAt_CurrentIdx_Past_Start(t *testing.T) { } // buffer starts at index 11 goes to 100 - b := newEventBuffer(100, 1*time.Hour, nil) + b := newEventBuffer(100, nil) for i := 11; i <= 100; i++ { e := structs.Event{ @@ -226,7 +226,7 @@ func TestEventBuffer_OnEvict(t *testing.T) { testOnEvict := func(events *structs.Events) { close(called) } - b := newEventBuffer(2, DefaultTTL, testOnEvict) + b := newEventBuffer(2, testOnEvict) // start at 1 since new event buffer is built with a starting sentinel value for i := 1; i < 4; i++ { diff --git a/nomad/stream/event_publisher.go b/nomad/stream/event_publisher.go index 1de7adb8f7f5..bcf8a8fc52b0 100644 --- a/nomad/stream/event_publisher.go +++ b/nomad/stream/event_publisher.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/armon/go-metrics" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/go-hclog" @@ -17,7 +18,6 @@ const ( type EventBrokerCfg struct { EventBufferSize int64 - EventBufferTTL time.Duration Logger hclog.Logger OnEvict EvictCallbackFn } @@ -29,14 +29,14 @@ type EventBroker struct { // eventBuf stores a configurable amount of events in memory eventBuf *eventBuffer - logger hclog.Logger - subscriptions *subscriptions // publishCh is used to send messages from an active txn to a goroutine which // publishes events, so that publishing can happen asynchronously from // the Commit call in the FSM hot path. 
publishCh chan *structs.Events + + logger hclog.Logger } // NewEventBroker returns an EventBroker for publishing change events. @@ -44,10 +44,6 @@ type EventBroker struct { // Cancelling the context will shutdown the goroutine to free resources, and stop // all publishing. func NewEventBroker(ctx context.Context, cfg EventBrokerCfg) *EventBroker { - if cfg.EventBufferTTL == 0 { - cfg.EventBufferTTL = 1 * time.Hour - } - if cfg.Logger == nil { cfg.Logger = hclog.NewNullLogger() } @@ -57,7 +53,7 @@ func NewEventBroker(ctx context.Context, cfg EventBrokerCfg) *EventBroker { cfg.EventBufferSize = 100 } - buffer := newEventBuffer(cfg.EventBufferSize, cfg.EventBufferTTL, cfg.OnEvict) + buffer := newEventBuffer(cfg.EventBufferSize, cfg.OnEvict) e := &EventBroker{ logger: cfg.Logger.Named("event_broker"), eventBuf: buffer, @@ -112,6 +108,7 @@ func (e *EventBroker) Subscribe(req *SubscribeRequest) (*Subscription, error) { if offset > 0 && req.StartExactlyAtIndex { return nil, fmt.Errorf("requested index not in buffer") } else if offset > 0 { + metrics.SetGauge([]string{"nomad", "event_broker", "subscription", "request_offset"}, float32(offset)) e.logger.Debug("requested index no longer in buffer", "requsted", int(req.Index), "closest", int(head.Events.Index)) } @@ -203,6 +200,15 @@ func (s *subscriptions) unsubscribeFn(req *SubscribeRequest) func() { if !ok { return } + + sub := subsByToken[req] + if sub == nil { + return + } + + // close the subscription + sub.forceClose() + delete(subsByToken, req) if len(subsByToken) == 0 { delete(s.byToken, req.Token) diff --git a/nomad/stream/event_publisher_test.go b/nomad/stream/event_publisher_test.go index 856a26e3e08e..0587a81f520c 100644 --- a/nomad/stream/event_publisher_test.go +++ b/nomad/stream/event_publisher_test.go @@ -2,6 +2,7 @@ package stream import ( "context" + "sync/atomic" "testing" "time" @@ -13,13 +14,13 @@ import ( func TestEventBroker_PublishChangesAndSubscribe(t *testing.T) { subscription := &SubscribeRequest{ Topics: map[structs.Topic][]string{ - "Test": []string{"sub-key"}, + "Test": {"sub-key"}, }, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventBroker(ctx, EventBrokerCfg{EventBufferSize: 100, EventBufferTTL: DefaultTTL}) + publisher := NewEventBroker(ctx, EventBrokerCfg{EventBufferSize: 100}) sub, err := publisher.Subscribe(subscription) require.NoError(t, err) eventCh := consumeSubscription(ctx, sub) @@ -82,6 +83,32 @@ func TestEventBroker_ShutdownClosesSubscriptions(t *testing.T) { require.Equal(t, err, ErrSubscriptionClosed) } +// TestEventBroker_EmptyReqToken_DistinctSubscriptions tests subscription +// handling behavior when ACLs are disabled (request Token is empty). +// Subscriptions are mapped by their request token. When that token is empty, +// the subscriptions should still be handled independently of each other when +// unsubscribing. 
+func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + publisher := NewEventBroker(ctx, EventBrokerCfg{}) + + // first subscription, empty token + sub1, err := publisher.Subscribe(&SubscribeRequest{}) + require.NoError(t, err) + defer sub1.Unsubscribe() + + // second subscription, empty token + sub2, err := publisher.Subscribe(&SubscribeRequest{}) + require.NoError(t, err) + require.NotNil(t, sub2) + + sub1.Unsubscribe() + + require.Equal(t, subscriptionStateOpen, atomic.LoadUint32(&sub2.state)) +} + func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult { eventCh := make(chan subNextResult, 1) go func() { diff --git a/nomad/stream/ndjson.go b/nomad/stream/ndjson.go index b3ccfbc634b4..7e7ad0928104 100644 --- a/nomad/stream/ndjson.go +++ b/nomad/stream/ndjson.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "sync" "time" "github.com/hashicorp/nomad/nomad/structs" @@ -19,9 +18,6 @@ var ( // JsonStream is used to send new line delimited JSON and heartbeats // to a destination (out channel) type JsonStream struct { - // mu locks during send and protects running - mu sync.Mutex - // ctx is a passed in context used to notify the json stream // when it should terminate ctx context.Context @@ -71,21 +67,18 @@ func (n *JsonStream) heartbeat() { // Send encodes an object into Newline delimited json. An error is returned // if json encoding fails or if the stream is no longer running. func (n *JsonStream) Send(v interface{}) error { - n.mu.Lock() - defer n.mu.Unlock() - if n.ctx.Err() != nil { return n.ctx.Err() } buf, err := json.Marshal(v) if err != nil { - return fmt.Errorf("marshaling json for stream: %w", err) + return fmt.Errorf("error marshaling json for stream: %w", err) } select { case <-n.ctx.Done(): - return fmt.Errorf("stream is no longer running") + return fmt.Errorf("error stream is no longer running: %w", n.ctx.Err()) case n.outCh <- &structs.EventJson{Data: buf}: }
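The agent-config side of this change hinges on *int being able to distinguish "not set" from "explicitly 0": with plain ints, an operator writing durable_event_count = 0 to disable persistence looked identical to omitting the field and was overwritten by the default of 100 during Merge. A rough self-contained sketch of that distinction, using a hypothetical resolveCount helper rather than anything in the patch:

package main

import "fmt"

// resolveCount is a hypothetical stand-in for the Merge/convertServerConfig
// logic above: nil means the HCL field was omitted and the default applies,
// while a pointer to 0 is an explicit operator choice that must be kept.
func resolveCount(configured *int, def int) int {
	if configured == nil {
		return def
	}
	return *configured
}

func main() {
	zero := 0
	fmt.Println(resolveCount(nil, 100))   // 100: field omitted, default applies
	fmt.Println(resolveCount(&zero, 100)) // 0: event persistence explicitly disabled
}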