Skip to content

Commit

Permalink
add server join info to server and client
Browse files Browse the repository at this point in the history
  • Loading branch information
chelseakomlo committed May 19, 2018
1 parent 504a910 commit 5498a3a
Show file tree
Hide file tree
Showing 10 changed files with 278 additions and 65 deletions.
12 changes: 6 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
// Set the preconfigured list of static servers
c.configLock.RLock()
if len(c.configCopy.Servers) > 0 {
if err := c.setServersImpl(c.configCopy.Servers, true); err != nil {
if _, err := c.setServersImpl(c.configCopy.Servers, true); err != nil {
logger.Printf("[WARN] client: None of the configured servers are valid: %v", err)
}
}
Expand Down Expand Up @@ -620,7 +620,7 @@ func (c *Client) GetServers() []string {

// SetServers sets a new list of nomad servers to connect to. As long as one
// server is resolvable no error is returned.
func (c *Client) SetServers(in []string) error {
func (c *Client) SetServers(in []string) (int, error) {
return c.setServersImpl(in, false)
}

Expand All @@ -630,7 +630,7 @@ func (c *Client) SetServers(in []string) error {
//
// Force should be used when setting the servers from the initial configuration
// since the server may be starting up in parallel and initial pings may fail.
func (c *Client) setServersImpl(in []string, force bool) error {
func (c *Client) setServersImpl(in []string, force bool) (int, error) {
var mu sync.Mutex
var wg sync.WaitGroup
var merr multierror.Error
Expand Down Expand Up @@ -670,13 +670,13 @@ func (c *Client) setServersImpl(in []string, force bool) error {
// Only return errors if no servers are valid
if len(endpoints) == 0 {
if len(merr.Errors) > 0 {
return merr.ErrorOrNil()
return 0, merr.ErrorOrNil()
}
return noServersErr
return 0, noServersErr
}

c.servers.SetServers(endpoints)
return nil
return len(endpoints), nil
}

// restoreState is used to restore our state from the data dir
Expand Down
6 changes: 3 additions & 3 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,13 +975,13 @@ func TestClient_ServerList(t *testing.T) {
if s := client.GetServers(); len(s) != 0 {
t.Fatalf("expected server lit to be empty but found: %+q", s)
}
if err := client.SetServers(nil); err != noServersErr {
if _, err := client.SetServers(nil); err != noServersErr {
t.Fatalf("expected setting an empty list to return a 'no servers' error but received %v", err)
}
if err := client.SetServers([]string{"123.456.13123.123.13:80"}); err == nil {
if _, err := client.SetServers([]string{"123.456.13123.123.13:80"}); err == nil {
t.Fatalf("expected setting a bad server to return an error")
}
if err := client.SetServers([]string{"123.456.13123.123.13:80", "127.0.0.1:1234", "127.0.0.1"}); err == nil {
if _, err := client.SetServers([]string{"123.456.13123.123.13:80", "127.0.0.1:1234", "127.0.0.1"}); err == nil {
t.Fatalf("expected setting at least one good server to succeed but received: %v", err)
}
s := client.GetServers()
Expand Down
2 changes: 1 addition & 1 deletion command/agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (s *HTTPServer) updateServers(resp http.ResponseWriter, req *http.Request)

// Set the servers list into the client
s.agent.logger.Printf("[TRACE] Adding servers %+q to the client's primary server list", servers)
if err := client.SetServers(servers); err != nil {
if _, err := client.SetServers(servers); err != nil {
s.agent.logger.Printf("[ERR] Attempt to add servers %q to client failed: %v", servers, err)
//TODO is this the right error to return?
return nil, CodedError(400, err.Error())
Expand Down
48 changes: 42 additions & 6 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,13 +547,49 @@ func (c *Command) Run(args []string) int {
// Start retry join process
c.retryJoinErrCh = make(chan struct{})

joiner := retryJoiner{
join: c.agent.server.Join,
discover: &discover.Discover{},
errCh: c.retryJoinErrCh,
logger: c.agent.logger,
if config.Server.Enabled && len(config.Server.RetryJoin) != 0 {
joiner := retryJoiner{
discover: &discover.Discover{},
errCh: c.retryJoinErrCh,
logger: c.agent.logger,
serverJoin: c.agent.server.Join,
serverEnabled: true,
}

// This is for backwards compatibility, this should be removed in Nomad
// 0.10 and only ServerJoinInfo should be declared on the server
serverJoinInfo := &ServerJoinInfo{
RetryJoin: config.Server.RetryJoin,
StartJoin: config.Server.StartJoin,
RetryMaxAttempts: config.Server.RetryMaxAttempts,
RetryInterval: config.Server.RetryInterval,
}
go joiner.RetryJoin(serverJoinInfo)
}

if config.Server.Enabled && config.Server.ServerJoinInfo != nil {
joiner := retryJoiner{
discover: &discover.Discover{},
errCh: c.retryJoinErrCh,
logger: c.agent.logger,
serverJoin: c.agent.server.Join,
serverEnabled: true,
}

go joiner.RetryJoin(config.Server.ServerJoinInfo)
}

if config.Client.Enabled && config.Client.ServerJoinInfo != nil {
joiner := retryJoiner{
discover: &discover.Discover{},
errCh: c.retryJoinErrCh,
logger: c.agent.logger,
clientJoin: c.agent.client.SetServers,
clientEnabled: true,
}

go joiner.RetryJoin(config.Client.ServerJoinInfo)
}
go joiner.RetryJoin(config)

// Wait for exit
return c.handleSignals()
Expand Down
14 changes: 14 additions & 0 deletions command/agent/config-test-fixtures/basic.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ advertise {
rpc = "127.0.0.3"
serf = "127.0.0.4"
}

client {
enabled = true
state_dir = "/tmp/client-state"
Expand All @@ -29,6 +30,13 @@ client {
foo = "bar"
baz = "zip"
}
server_join_info {
retry_join = [ "1.1.1.1", "2.2.2.2" ]
start_join = [ "1.1.1.1", "2.2.2.2" ]
retry_max = 3
retry_interval = "15s"
}

options {
foo = "bar"
baz = "zip"
Expand Down Expand Up @@ -86,6 +94,12 @@ server {
redundancy_zone = "foo"
upgrade_version = "0.8.0"
encrypt = "abc"
server_join_info {
retry_join = [ "1.1.1.1", "2.2.2.2" ]
start_join = [ "1.1.1.1", "2.2.2.2" ]
retry_max = 3
retry_interval = "15s"
}
}
acl {
enabled = true
Expand Down
44 changes: 44 additions & 0 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ type ClientConfig struct {
// NoHostUUID disables using the host's UUID and will force generation of a
// random UUID.
NoHostUUID *bool `mapstructure:"no_host_uuid"`

// ServerJoinInfo contains information that is used to attempt to join servers
ServerJoinInfo *ServerJoinInfo `mapstructure:"server_join_info"`
}

// ACLConfig is configuration specific to the ACL system
Expand Down Expand Up @@ -311,19 +314,23 @@ type ServerConfig struct {
// StartJoin is a list of addresses to attempt to join when the
// agent starts. If Serf is unable to communicate with any of these
// addresses, then the agent will error and exit.
// Deprecated in Nomad 0.10
StartJoin []string `mapstructure:"start_join"`

// RetryJoin is a list of addresses to join with retry enabled.
// Deprecated in Nomad 0.10
RetryJoin []string `mapstructure:"retry_join"`

// RetryMaxAttempts specifies the maximum number of times to retry joining a
// host on startup. This is useful for cases where we know the node will be
// online eventually.
// Deprecated in Nomad 0.10
RetryMaxAttempts int `mapstructure:"retry_max"`

// RetryInterval specifies the amount of time to wait in between join
// attempts on agent start. The minimum allowed value is 1 second and
// the default is 30s.
// Deprecated in Nomad 0.10
RetryInterval string `mapstructure:"retry_interval"`
retryInterval time.Duration `mapstructure:"-"`

Expand All @@ -346,6 +353,36 @@ type ServerConfig struct {

// Encryption key to use for the Serf communication
EncryptKey string `mapstructure:"encrypt" json:"-"`

// ServerJoinInfo contains information that is used to attempt to join servers
ServerJoinInfo *ServerJoinInfo `mapstructure:"server_join_info"`
}

// ServerJoinInfo is used in both clients and servers to bootstrap connections to
// servers
type ServerJoinInfo struct {
// StartJoin is a list of addresses to attempt to join when the
// agent starts. If Serf is unable to communicate with any of these
// addresses, then the agent will error and exit.
// Deprecated in Nomad 0.10
StartJoin []string `mapstructure:"start_join"`

// RetryJoin is a list of addresses to join with retry enabled.
// Deprecated in Nomad 0.10
RetryJoin []string `mapstructure:"retry_join"`

// RetryMaxAttempts specifies the maximum number of times to retry joining a
// host on startup. This is useful for cases where we know the node will be
// online eventually.
// Deprecated in Nomad 0.10
RetryMaxAttempts int `mapstructure:"retry_max"`

// RetryInterval specifies the amount of time to wait in between join
// attempts on agent start. The minimum allowed value is 1 second and
// the default is 30s.
// Deprecated in Nomad 0.10
RetryInterval string `mapstructure:"retry_interval"`
retryInterval time.Duration `mapstructure:"-"`
}

// EncryptBytes returns the encryption key configured.
Expand Down Expand Up @@ -1055,6 +1092,9 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.EncryptKey != "" {
result.EncryptKey = b.EncryptKey
}
if b.ServerJoinInfo != nil {
result.ServerJoinInfo = b.ServerJoinInfo
}

// Add the schedulers
result.EnabledSchedulers = append(result.EnabledSchedulers, b.EnabledSchedulers...)
Expand Down Expand Up @@ -1162,6 +1202,10 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
result.ChrootEnv[k] = v
}

if b.ServerJoinInfo != nil {
result.ServerJoinInfo = b.ServerJoinInfo
}

return &result
}

Expand Down
72 changes: 68 additions & 4 deletions command/agent/config_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error {
"gc_parallel_destroys",
"gc_max_allocs",
"no_host_uuid",
"server_join_info",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return err
Expand All @@ -385,6 +386,7 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error {
delete(m, "chroot_env")
delete(m, "reserved")
delete(m, "stats")
delete(m, "server_join_info")

var config ClientConfig
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
Expand Down Expand Up @@ -448,6 +450,13 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error {
}
}

// Parse ServerJoinInfo config
if o := listVal.Filter("server_join_info"); len(o.Items) > 0 {
if err := parseServerJoinInfo(&config.ServerJoinInfo, o); err != nil {
return multierror.Prefix(err, "server_join_info->")
}
}

*result = &config
return nil
}
Expand Down Expand Up @@ -531,16 +540,20 @@ func parseServer(result **ServerConfig, list *ast.ObjectList) error {
"heartbeat_grace",
"min_heartbeat_ttl",
"max_heartbeats_per_second",
"start_join",
"retry_join",
"retry_max",
"retry_interval",
"rejoin_after_leave",
"encrypt",
"authoritative_region",
"non_voting_server",
"redundancy_zone",
"upgrade_version",

"server_join_info",

// For backwards compatibility
"start_join",
"retry_join",
"retry_max",
"retry_interval",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return err
Expand All @@ -551,6 +564,8 @@ func parseServer(result **ServerConfig, list *ast.ObjectList) error {
return err
}

delete(m, "server_join_info")

var config ServerConfig
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
Expand All @@ -570,10 +585,59 @@ func parseServer(result **ServerConfig, list *ast.ObjectList) error {
}
}

// Parse ServerJoinInfo config
if o := listVal.Filter("server_join_info"); len(o.Items) > 0 {
if err := parseServerJoinInfo(&config.ServerJoinInfo, o); err != nil {
return multierror.Prefix(err, "server_join_info->")
}
}

*result = &config
return nil
}

func parseServerJoinInfo(result **ServerJoinInfo, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'server_info_join' block allowed")
}

// Get our object
listVal := list.Items[0].Val

// Check for invalid keys
valid := []string{
"start_join",
"retry_join",
"retry_max",
"retry_interval",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return err
}

var m map[string]interface{}
if err := hcl.DecodeObject(&m, listVal); err != nil {
return err
}

var serverJoinInfo ServerJoinInfo
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &serverJoinInfo,
})
if err != nil {
return err
}
if err := dec.Decode(m); err != nil {
return err
}

*result = &serverJoinInfo
return nil
}

func parseACL(result **ACLConfig, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
Expand Down
Loading

0 comments on commit 5498a3a

Please sign in to comment.