diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 6b49fcb533f9..8e0c2c33cab6 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -31,7 +31,7 @@ func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) { conf.AllocDir = os.TempDir() upd := &MockAllocStateUpdater{} alloc := mock.Alloc() - consulClient, _ := NewConsulService(logger, "127.0.0.1:8500", "", "", false, false) + consulClient, _ := NewConsulService(logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}) if !restarts { alloc.Job.Type = structs.JobTypeBatch *alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0} @@ -142,7 +142,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { } // Create a new alloc runner - consulClient, err := NewConsulService(ar.logger, "127.0.0.1:8500", "", "", false, false) + consulClient, err := NewConsulService(ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}) ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, consulClient) err = ar2.RestoreState() diff --git a/client/client.go b/client/client.go index c41b1b0ed29d..6f88f7144702 100644 --- a/client/client.go +++ b/client/client.go @@ -157,7 +157,7 @@ func (c *Client) setupConsulService() error { auth := c.config.Read("consul.auth") enableSSL := c.config.ReadBoolDefault("consul.ssl", false) verifySSL := c.config.ReadBoolDefault("consul.verifyssl", true) - if consulService, err = NewConsulService(c.logger, addr, token, auth, enableSSL, verifySSL); err != nil { + if consulService, err = NewConsulService(c.logger, addr, token, auth, enableSSL, verifySSL, c.config.Node); err != nil { return err } c.consulService = consulService diff --git a/client/consul.go b/client/consul.go index 2719187299af..3440064c7b79 100644 --- a/client/consul.go +++ b/client/consul.go @@ -72,6 +72,7 @@ type ConsulService struct { client consulApi logger *log.Logger shutdownCh chan struct{} + node *structs.Node trackedTasks map[string]*trackedTask serviceStates map[string]string @@ -80,7 +81,7 @@ type ConsulService struct { // A factory method to create new consul service func NewConsulService(logger *log.Logger, consulAddr string, token string, - auth string, enableSSL bool, verifySSL bool) (*ConsulService, error) { + auth string, enableSSL bool, verifySSL bool, node *structs.Node) (*ConsulService, error) { var err error var c *consul.Client cfg := consul.DefaultConfig() @@ -122,6 +123,7 @@ func NewConsulService(logger *log.Logger, consulAddr string, token string, consulService := ConsulService{ client: &consulApiClient{client: c}, logger: logger, + node: node, trackedTasks: make(map[string]*trackedTask), serviceStates: make(map[string]string), shutdownCh: make(chan struct{}), @@ -161,7 +163,7 @@ func (c *ConsulService) Deregister(task *structs.Task, allocID string) error { } c.logger.Printf("[INFO] consul: deregistering service %v with consul", service.Name) if err := c.deregisterService(service.Id); err != nil { - c.logger.Printf("[DEBUG] consul: error in deregistering service %v from consul", service.Name) + c.printLogMessage("[DEBUG] consul: error in deregistering service %v from consul", service.Name) mErr.Errors = append(mErr.Errors, err) } } @@ -193,8 +195,14 @@ func (c *ConsulService) SyncWithConsul() { // services which are no longer present in tasks func (c *ConsulService) performSync() { // Get the list of the services and that Consul knows about - consulServices, _ := c.client.Services() - consulChecks, _ := c.client.Checks() + consulServices, err := c.client.Services() + if err != nil { + return + } + consulChecks, err := c.client.Checks() + if err != nil { + return + } delete(consulServices, "consul") knownChecks := make(map[string]struct{}) @@ -207,14 +215,14 @@ func (c *ConsulService) performSync() { // Add new services which Consul agent isn't aware of knownServices[service.Id] = struct{}{} if _, ok := consulServices[service.Id]; !ok { - c.logger.Printf("[INFO] consul: registering service %s with consul.", service.Name) + c.printLogMessage("[INFO] consul: registering service %s with consul.", service.Name) c.registerService(service, trackedTask.task, trackedTask.allocID) continue } // If a service has changed, re-register it with Consul agent if service.Hash() != c.serviceStates[service.Id] { - c.logger.Printf("[INFO] consul: reregistering service %s with consul.", service.Name) + c.printLogMessage("[INFO] consul: reregistering service %s with consul.", service.Name) c.registerService(service, trackedTask.task, trackedTask.allocID) continue } @@ -242,7 +250,7 @@ func (c *ConsulService) performSync() { for _, consulService := range consulServices { if _, ok := knownServices[consulService.ID]; !ok { delete(c.serviceStates, consulService.ID) - c.logger.Printf("[INFO] consul: deregistering service %v with consul", consulService.Service) + c.printLogMessage("[INFO] consul: deregistering service %v with consul", consulService.Service) c.deregisterService(consulService.ID) } } @@ -273,13 +281,13 @@ func (c *ConsulService) registerService(service *structs.Service, task *structs. } if err := c.client.ServiceRegister(asr); err != nil { - c.logger.Printf("[DEBUG] consul: error while registering service %v with consul: %v", service.Name, err) + c.printLogMessage("[DEBUG] consul: error while registering service %v with consul: %v", service.Name, err) mErr.Errors = append(mErr.Errors, err) } for _, check := range service.Checks { cr := c.makeCheck(service, check, host, port) if err := c.registerCheck(cr); err != nil { - c.logger.Printf("[DEBUG] consul: error while registerting check %v with consul: %v", check.Name, err) + c.printLogMessage("[DEBUG] consul: error while registerting check %v with consul: %v", check.Name, err) mErr.Errors = append(mErr.Errors, err) } @@ -289,13 +297,13 @@ func (c *ConsulService) registerService(service *structs.Service, task *structs. // registerCheck registers a check with Consul func (c *ConsulService) registerCheck(check *consul.AgentCheckRegistration) error { - c.logger.Printf("[INFO] consul: registering Check with ID: %v for service: %v", check.ID, check.ServiceID) + c.printLogMessage("[INFO] consul: registering check with ID: %v for service: %v", check.ID, check.ServiceID) return c.client.CheckRegister(check) } // deregisterCheck de-registers a check with a specific ID from Consul func (c *ConsulService) deregisterCheck(checkID string) error { - c.logger.Printf("[INFO] consul: removing check with ID: %v", checkID) + c.printLogMessage("[INFO] consul: removing check with ID: %v", checkID) return c.client.CheckDeregister(checkID) } @@ -336,3 +344,9 @@ func (c *ConsulService) makeCheck(service *structs.Service, check *structs.Servi } return cr } + +func (c *ConsulService) printLogMessage(message string, v ...interface{}) { + if _, ok := c.node.Attributes["consul.version"]; ok { + c.logger.Printf(message, v) + } +} diff --git a/client/consul_test.go b/client/consul_test.go index f756c078c379..9cb38ede702c 100644 --- a/client/consul_test.go +++ b/client/consul_test.go @@ -46,7 +46,7 @@ func (a *mockConsulApiClient) Checks() (map[string]*consul.AgentCheck, error) { func newConsulService() *ConsulService { logger := log.New(os.Stdout, "logger: ", log.Lshortfile) - c, _ := NewConsulService(logger, "", "", "", false, false) + c, _ := NewConsulService(logger, "", "", "", false, false, &structs.Node{}) c.client = &mockConsulApiClient{} return c } diff --git a/client/fingerprint/consul.go b/client/fingerprint/consul.go index 8f826f912e07..1603fb2cafa8 100644 --- a/client/fingerprint/consul.go +++ b/client/fingerprint/consul.go @@ -12,16 +12,21 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + consulAvailable = "available" + consulUnavailable = "unavailable" +) + // ConsulFingerprint is used to fingerprint the architecture type ConsulFingerprint struct { - logger *log.Logger - client *consul.Client + logger *log.Logger + client *consul.Client + lastState string } // NewConsulFingerprint is used to create an OS fingerprint func NewConsulFingerprint(logger *log.Logger) Fingerprint { - f := &ConsulFingerprint{logger: logger} - return f + return &ConsulFingerprint{logger: logger, lastState: consulUnavailable} } func (f *ConsulFingerprint) Fingerprint(config *client.Config, node *structs.Node) (bool, error) { @@ -55,6 +60,13 @@ func (f *ConsulFingerprint) Fingerprint(config *client.Config, node *structs.Nod if err != nil { // Clear any attributes set by a previous fingerprint. f.clearConsulAttributes(node) + + // Print a message indicating that the Consul Agent is not available + // anymore + if f.lastState == consulAvailable { + f.logger.Printf("[INFO] fingerprint.consul: consul agent is unavailable") + } + f.lastState = consulUnavailable return false, nil } @@ -68,6 +80,12 @@ func (f *ConsulFingerprint) Fingerprint(config *client.Config, node *structs.Nod node.Attributes["consul.datacenter"], node.Attributes["consul.name"]) + // If the Consul Agent was previously unavailable print a message to + // indicate the Agent is available now + if f.lastState == consulUnavailable { + f.logger.Printf("[INFO] fingerprint.consul: consul agent is available") + } + f.lastState = consulAvailable return true, nil } diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 4557b6b55303..96da69d4e554 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -32,7 +32,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { upd := &MockTaskStateUpdater{} alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] - consulClient, _ := NewConsulService(logger, "127.0.0.1:8500", "", "", false, false) + consulClient, _ := NewConsulService(logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}) // Initialize the port listing. This should be done by the offer process but // we have a mock so that doesn't happen. task.Resources.Networks[0].ReservedPorts = []structs.Port{{"", 80}} @@ -164,7 +164,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { } // Create a new task runner - consulClient, _ := NewConsulService(tr.logger, "127.0.0.1:8500", "", "", false, false) + consulClient, _ := NewConsulService(tr.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}) tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update, tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker, consulClient)