Skip to content

Commit

Permalink
address pr feedback
Browse files Browse the repository at this point in the history
use pointers for config sizes, remove unused ttl, simplify closed conn logic
  • Loading branch information
drewbailey committed Oct 14, 2020
1 parent f293a14 commit 4b64007
Show file tree
Hide file tree
Showing 13 changed files with 113 additions and 109 deletions.
8 changes: 4 additions & 4 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,11 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
if agentConfig.Server.EnableEventBroker != nil {
conf.EnableEventBroker = *agentConfig.Server.EnableEventBroker
}
if agentConfig.Server.EventBufferSize > 0 {
conf.EventBufferSize = int64(agentConfig.Server.EventBufferSize)
if agentConfig.Server.EventBufferSize != nil {
conf.EventBufferSize = int64(*agentConfig.Server.EventBufferSize)
}
if agentConfig.Server.DurableEventCount > 0 {
conf.DurableEventCount = int64(agentConfig.Server.DurableEventCount)
if agentConfig.Server.DurableEventCount != nil {
conf.DurableEventCount = int64(*agentConfig.Server.DurableEventCount)
}
if agentConfig.Autopilot != nil {
if agentConfig.Autopilot.CleanupDeadServers != nil {
Expand Down
12 changes: 6 additions & 6 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,11 +491,11 @@ type ServerConfig struct {
// EventBufferSize configure the amount of events to be held in memory.
// If EnableEventBroker is set to true, the minimum allowable value
// for the EventBufferSize is 1.
EventBufferSize int `hcl:"event_buffer_size"`
EventBufferSize *int `hcl:"event_buffer_size"`

// DurableEventCount specifies the amount of events to persist during snapshot generation.
// A count of 0 signals that no events should be persisted.
DurableEventCount int `hcl:"durable_event_count"`
DurableEventCount *int `hcl:"durable_event_count"`

// ExtraKeysHCL is used by hcl to surface unexpected keys
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
Expand Down Expand Up @@ -897,8 +897,8 @@ func DefaultConfig() *Config {
Server: &ServerConfig{
Enabled: false,
EnableEventBroker: helper.BoolToPtr(true),
EventBufferSize: 100,
DurableEventCount: 100,
EventBufferSize: helper.IntToPtr(100),
DurableEventCount: helper.IntToPtr(100),
StartJoin: []string{},
ServerJoin: &ServerJoin{
RetryJoin: []string{},
Expand Down Expand Up @@ -1427,11 +1427,11 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
result.EnableEventBroker = b.EnableEventBroker
}

if b.EventBufferSize != 0 {
if b.EventBufferSize != nil {
result.EventBufferSize = b.EventBufferSize
}

if b.DurableEventCount != 0 {
if b.DurableEventCount != nil {
result.DurableEventCount = b.DurableEventCount
}

Expand Down
4 changes: 2 additions & 2 deletions command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ var basicConfig = &Config{
UpgradeVersion: "0.8.0",
EncryptKey: "abc",
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: 200,
DurableEventCount: 0,
EventBufferSize: helper.IntToPtr(200),
DurableEventCount: helper.IntToPtr(0),
ServerJoin: &ServerJoin{
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: time.Duration(15) * time.Second,
Expand Down
22 changes: 20 additions & 2 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ func TestConfig_Merge(t *testing.T) {
RedundancyZone: "foo",
UpgradeVersion: "foo",
EnableEventBroker: helper.BoolToPtr(false),
DurableEventCount: 0,
EventBufferSize: helper.IntToPtr(0),
DurableEventCount: helper.IntToPtr(0),
},
ACL: &ACLConfig{
Enabled: true,
Expand Down Expand Up @@ -1174,37 +1175,54 @@ func TestTelemetry_Parse(t *testing.T) {
func TestEventBroker_Parse(t *testing.T) {

require := require.New(t)

{
a := &ServerConfig{
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(0),
DurableEventCount: helper.IntToPtr(0),
}
b := DefaultConfig().Server
b.EnableEventBroker = nil
b.EventBufferSize = nil
b.DurableEventCount = nil

result := a.Merge(b)
require.Equal(false, *result.EnableEventBroker)
require.Equal(0, *result.EventBufferSize)
require.Equal(0, *result.DurableEventCount)
}

{
a := &ServerConfig{
EnableEventBroker: helper.BoolToPtr(true),
EventBufferSize: helper.IntToPtr(5000),
DurableEventCount: helper.IntToPtr(200),
}
b := DefaultConfig().Server
b.EnableEventBroker = nil
b.EventBufferSize = nil
b.DurableEventCount = nil

result := a.Merge(b)
require.Equal(true, *result.EnableEventBroker)
require.Equal(5000, *result.EventBufferSize)
require.Equal(200, *result.DurableEventCount)
}

{
a := &ServerConfig{
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(0),
DurableEventCount: helper.IntToPtr(0),
}
b := DefaultConfig().Server
b.EnableEventBroker = helper.BoolToPtr(true)
b.EventBufferSize = helper.IntToPtr(20000)
b.DurableEventCount = helper.IntToPtr(1000)

result := a.Merge(b)
require.Equal(true, *result.EnableEventBroker)
require.Equal(20000, *result.EventBufferSize)
require.Equal(1000, *result.DurableEventCount)
}
}
2 changes: 2 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,8 @@ func DefaultConfig() *Config {
SentinelGCInterval: 30 * time.Second,
LicenseConfig: &LicenseConfig{},
EnableEventBroker: true,
EventBufferSize: 100,
DurableEventCount: 100,
AutopilotConfig: &structs.AutopilotConfig{
CleanupDeadServers: true,
LastContactThreshold: 200 * time.Millisecond,
Expand Down
28 changes: 4 additions & 24 deletions nomad/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"time"

"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand Down Expand Up @@ -49,16 +49,8 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
return
}

// authToken is passed to the subscribe request so the event stream
// can handle closing a subscription if the authToken expires.
// If ACLs are disabled, a random token is generated and it will
// never be closed due to expiry.
authToken := args.AuthToken
if authToken == "" {
authToken = uuid.Generate()
}
subReq := &stream.SubscribeRequest{
Token: authToken,
Token: args.AuthToken,
Topics: args.Topics,
Index: uint64(args.Index),
Namespace: args.Namespace,
Expand Down Expand Up @@ -96,20 +88,8 @@ func (e *Event) stream(conn io.ReadWriteCloser) {

// goroutine to detect remote side closing
go func() {
for {
if _, err := conn.Read(nil); err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
// One end of the pipe was explicitly closed, exit cleanly
cancel()
return
}
select {
case errCh <- err:
case <-ctx.Done():
return
}
}
}
io.Copy(ioutil.Discard, conn)
cancel()
}()

go func() {
Expand Down
14 changes: 7 additions & 7 deletions nomad/state/state_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ type Changes struct {
// all write transactions. When the transaction is committed the changes are
// sent to the EventBroker which will create and emit change events.
type changeTrackerDB struct {
db *memdb.MemDB
memdb *memdb.MemDB
durableCount int64
publisher *stream.EventBroker
processChanges func(ReadTxn, Changes) (*structs.Events, error)
}

func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventBroker, changesFn changeProcessor, durableCount int64) *changeTrackerDB {
return &changeTrackerDB{
db: db,
memdb: db,
publisher: publisher,
processChanges: changesFn,
durableCount: durableCount,
Expand All @@ -54,7 +54,7 @@ func noOpProcessChanges(ReadTxn, Changes) (*structs.Events, error) { return nil,
// TODO: this could return a regular memdb.Txn if all the state functions accepted
// the ReadTxn interface
func (c *changeTrackerDB) ReadTxn() *txn {
return &txn{Txn: c.db.Txn(false)}
return &txn{Txn: c.memdb.Txn(false)}
}

// WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store.
Expand All @@ -69,7 +69,7 @@ func (c *changeTrackerDB) ReadTxn() *txn {
// data directly into the DB. These cases may use WriteTxnRestore.
func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
t := &txn{
Txn: c.db.Txn(true),
Txn: c.memdb.Txn(true),
Index: idx,
publish: c.publish,
}
Expand All @@ -82,7 +82,7 @@ func (c *changeTrackerDB) WriteTxnMsgT(msgType structs.MessageType, idx uint64)

t := &txn{
msgType: msgType,
Txn: c.db.Txn(true),
Txn: c.memdb.Txn(true),
Index: idx,
publish: c.publish,
persistChanges: persistChanges,
Expand All @@ -92,7 +92,7 @@ func (c *changeTrackerDB) WriteTxnMsgT(msgType structs.MessageType, idx uint64)
}

func (c *changeTrackerDB) publish(changes Changes) (*structs.Events, error) {
readOnlyTx := c.db.Txn(false)
readOnlyTx := c.memdb.Txn(false)
defer readOnlyTx.Abort()

events, err := c.processChanges(readOnlyTx, changes)
Expand All @@ -114,7 +114,7 @@ func (c *changeTrackerDB) publish(changes Changes) (*structs.Events, error) {
// written across many indexes.
func (c *changeTrackerDB) WriteTxnRestore() *txn {
return &txn{
Txn: c.db.Txn(true),
Txn: c.memdb.Txn(true),
Index: 0,
}
}
Expand Down
5 changes: 2 additions & 3 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
if config.EnablePublisher {
// Create new event publisher using provided config
broker := stream.NewEventBroker(ctx, stream.EventBrokerCfg{
EventBufferTTL: 1 * time.Hour,
EventBufferSize: config.EventBufferSize,
Logger: config.Logger,
OnEvict: s.eventBrokerEvict,
Expand Down Expand Up @@ -138,7 +137,7 @@ func (s *StateStore) eventBrokerEvict(events *structs.Events) {
}

func (s *StateStore) deleteEvent(events *structs.Events) error {
txn := s.db.db.Txn(true)
txn := s.db.memdb.Txn(true)
defer txn.Abort()

if err := txn.Delete("events", events); err != nil {
Expand All @@ -158,7 +157,7 @@ func (s *StateStore) Config() *StateStoreConfig {
// we use MemDB, we just need to snapshot the state of the underlying
// database.
func (s *StateStore) Snapshot() (*StateSnapshot, error) {
memDBSnap := s.db.db.Snapshot()
memDBSnap := s.db.memdb.Snapshot()

store := StateStore{
logger: s.logger,
Expand Down
35 changes: 7 additions & 28 deletions nomad/stream/event_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,17 @@ type eventBuffer struct {
head atomic.Value
tail atomic.Value

maxSize int64
maxItemTTL time.Duration
onEvict EvictCallbackFn
maxSize int64
onEvict EvictCallbackFn
}

// newEventBuffer creates an eventBuffer ready for use.
func newEventBuffer(size int64, maxItemTTL time.Duration, onEvict EvictCallbackFn) *eventBuffer {
func newEventBuffer(size int64, onEvict EvictCallbackFn) *eventBuffer {
zero := int64(0)
b := &eventBuffer{
maxSize: size,
size: &zero,
maxItemTTL: maxItemTTL,
onEvict: onEvict,
maxSize: size,
size: &zero,
onEvict: onEvict,
}

item := newBufferItem(&structs.Events{Index: 0, Events: nil})
Expand Down Expand Up @@ -104,7 +102,7 @@ func (b *eventBuffer) appendItem(item *bufferItem) {
}

func newSentinelItem() *bufferItem {
return newBufferItem(&structs.Events{Index: 0, Events: nil})
return newBufferItem(&structs.Events{})
}

// advanceHead drops the current Head buffer item and notifies readers
Expand Down Expand Up @@ -191,25 +189,6 @@ func (b *eventBuffer) Len() int {
return int(atomic.LoadInt64(b.size))
}

// prune advances the head of the buffer until the head buffer item TTL
// is no longer expired. It should be externally synchronized as it mutates
// the buffer of items.
func (b *eventBuffer) prune() {
now := time.Now()
for {
head := b.Head()
if b.Len() == 0 {
return
}

if now.Sub(head.createdAt) > b.maxItemTTL {
b.advanceHead()
} else {
return
}
}
}

// bufferItem represents a set of events published by a single raft operation.
// The first item returned by a newly constructed buffer will have nil Events.
// It is a sentinel value which is used to wait on the next events via Next.
Expand Down
Loading

0 comments on commit 4b64007

Please sign in to comment.