diff --git a/CHANGELOG.md b/CHANGELOG.md index 68c17be6bc70..2cd9eac035db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,9 @@ ## 0.7.1 (Unreleased) __BACKWARDS INCOMPATIBILITIES:__ + * client: The format of service IDs in Consul has changed. If you rely upon + Nomad's service IDs (*not* service names; those are stable), you will need + to update your code. [GH-3632] * config: Nomad no longer parses Atlas configuration stanzas. Atlas has been deprecated since earlier this year. If you have an Atlas stanza in your config file it will have to be removed. @@ -22,10 +25,13 @@ IMPROVEMENTS: * api: Environment variables are ignored during service name validation [GH-3532] * cli: Allocation create and modify times are displayed in a human readable relative format like `6 h ago` [GH-3449] + * client: Support `address_mode` on checks [GH-3619] * client: Sticky volume migrations are now atomic. [GH-3563] * client: Added metrics to track state transitions of allocations [GH-3061] * client: When `network_interface` is unspecified use interface attached to default route [GH-3546] + * client: Support numeric ports on services and checks when + `address_mode="driver"` [GH-3619] * driver/docker: Detect OOM kill event [GH-3459] * driver/docker: Adds support for adding host device to container via `--device` [GH-2938] @@ -54,9 +60,13 @@ BUG FIXES: explicitly [GH-3520] * cli: Fix passing Consul address via flags [GH-3504] * cli: Fix panic when running `keyring` commands [GH-3509] + * client: Fix advertising services with tags that require URL escaping + [GH-3632] * client: Fix a panic when restoring an allocation with a dead leader task [GH-3502] * client: Fix crash when following logs from a Windows node [GH-3608] + * client: Fix service/check updating when just interpolated variables change + [GH-3619] * client: Fix allocation accounting in GC and trigger GCs on allocation updates [GH-3445] * driver/rkt: Remove pods on shutdown [GH-3562] diff --git a/api/tasks.go b/api/tasks.go index 462d9f549319..7dc2950b187f 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -154,6 +154,7 @@ type ServiceCheck struct { Path string Protocol string PortLabel string `mapstructure:"port"` + AddressMode string `mapstructure:"address_mode"` Interval time.Duration Timeout time.Duration InitialStatus string `mapstructure:"initial_status"` diff --git a/client/driver/driver_test.go b/client/driver/driver_test.go index 9bdb882a02c5..c2d59541c76a 100644 --- a/client/driver/driver_test.go +++ b/client/driver/driver_test.go @@ -209,12 +209,12 @@ func setupTaskEnv(t *testing.T, driver string) (*allocdir.TaskDir, map[string]st "NOMAD_PORT_two": "443", "NOMAD_HOST_PORT_two": "443", "NOMAD_ADDR_admin": "1.2.3.4:8081", - "NOMAD_ADDR_web_main": "192.168.0.100:5000", + "NOMAD_ADDR_web_admin": "192.168.0.100:5000", "NOMAD_ADDR_web_http": "192.168.0.100:2000", - "NOMAD_IP_web_main": "192.168.0.100", + "NOMAD_IP_web_admin": "192.168.0.100", "NOMAD_IP_web_http": "192.168.0.100", "NOMAD_PORT_web_http": "2000", - "NOMAD_PORT_web_main": "5000", + "NOMAD_PORT_web_admin": "5000", "NOMAD_IP_admin": "1.2.3.4", "NOMAD_PORT_admin": "8081", "NOMAD_HOST_PORT_admin": "8081", diff --git a/client/driver/mock_driver.go b/client/driver/mock_driver.go index 7db3072b5fc2..492794854268 100644 --- a/client/driver/mock_driver.go +++ b/client/driver/mock_driver.go @@ -10,6 +10,7 @@ import ( "log" "os" "strconv" + "strings" "time" "github.com/mitchellh/mapstructure" @@ -61,6 +62,17 @@ type MockDriverConfig struct { // SignalErr is the error message that the task returns if signalled SignalErr string `mapstructure:"signal_error"` + + // DriverIP will be returned as the DriverNetwork.IP from Start() + DriverIP string `mapstructure:"driver_ip"` + + // DriverAdvertise will be returned as DriverNetwork.AutoAdvertise from + // Start(). + DriverAdvertise bool `mapstructure:"driver_advertise"` + + // DriverPortMap will parse a label:number pair and return it in + // DriverNetwork.PortMap from Start(). + DriverPortMap string `mapstructure:"driver_port_map"` } // MockDriver is a driver which is used for testing purposes @@ -114,6 +126,23 @@ func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse return nil, structs.NewRecoverableError(errors.New(driverConfig.StartErr), driverConfig.StartErrRecoverable) } + // Create the driver network + net := &cstructs.DriverNetwork{ + IP: driverConfig.DriverIP, + AutoAdvertise: driverConfig.DriverAdvertise, + } + if raw := driverConfig.DriverPortMap; len(raw) > 0 { + parts := strings.Split(raw, ":") + if len(parts) != 2 { + return nil, fmt.Errorf("malformed port map: %q", raw) + } + port, err := strconv.Atoi(parts[1]) + if err != nil { + return nil, fmt.Errorf("malformed port map: %q -- error: %v", raw, err) + } + net.PortMap = map[string]int{parts[0]: port} + } + h := mockDriverHandle{ taskName: task.Name, runFor: driverConfig.RunFor, @@ -133,7 +162,8 @@ func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse } m.logger.Printf("[DEBUG] driver.mock: starting task %q", task.Name) go h.run() - return &StartResponse{Handle: &h}, nil + + return &StartResponse{Handle: &h, Network: net}, nil } // Cleanup deletes all keys except for Config.Options["cleanup_fail_on"] for @@ -265,10 +295,20 @@ func (h *mockDriverHandle) Kill() error { select { case <-h.doneCh: case <-time.After(h.killAfter): - close(h.doneCh) + select { + case <-h.doneCh: + // already closed + default: + close(h.doneCh) + } case <-time.After(h.killTimeout): h.logger.Printf("[DEBUG] driver.mock: terminating task %q", h.taskName) - close(h.doneCh) + select { + case <-h.doneCh: + // already closed + default: + close(h.doneCh) + } } return nil } @@ -286,7 +326,12 @@ func (h *mockDriverHandle) run() { for { select { case <-timer.C: - close(h.doneCh) + select { + case <-h.doneCh: + // already closed + default: + close(h.doneCh) + } case <-h.doneCh: h.logger.Printf("[DEBUG] driver.mock: finished running task %q", h.taskName) h.waitCh <- dstructs.NewWaitResult(h.exitCode, h.exitSignal, h.exitErr) diff --git a/client/task_runner.go b/client/task_runner.go index a6d230a62b81..e01cc34cdd62 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -1631,7 +1631,12 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { // Merge in the task resources updatedTask.Resources = update.TaskResources[updatedTask.Name] - // Update the task's environment for interpolating in services/checks + // Interpolate the old task with the old env before updating the env as + // updating services in Consul need both the old and new interpolations + // to find differences. + oldInterpolatedTask := interpolateServices(r.envBuilder.Build(), r.task) + + // Now it's safe to update the environment r.envBuilder.UpdateTask(update, updatedTask) var mErr multierror.Error @@ -1650,7 +1655,8 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { } // Update services in Consul - if err := r.updateServices(drv, r.handle, r.task, updatedTask); err != nil { + newInterpolatedTask := interpolateServices(r.envBuilder.Build(), updatedTask) + if err := r.updateServices(drv, r.handle, oldInterpolatedTask, newInterpolatedTask); err != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("error updating services and checks in Consul: %v", err)) } } @@ -1667,19 +1673,17 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { return mErr.ErrorOrNil() } -// updateServices and checks with Consul. -func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, old, new *structs.Task) error { +// updateServices and checks with Consul. Tasks must be interpolated! +func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, oldTask, newTask *structs.Task) error { var exec driver.ScriptExecutor if d.Abilities().Exec { // Allow set the script executor if the driver supports it exec = h } - newInterpolatedTask := interpolateServices(r.envBuilder.Build(), new) - oldInterpolatedTask := interpolateServices(r.envBuilder.Build(), old) r.driverNetLock.Lock() net := r.driverNet.Copy() r.driverNetLock.Unlock() - return r.consul.UpdateTask(r.alloc.ID, oldInterpolatedTask, newInterpolatedTask, r, exec, net) + return r.consul.UpdateTask(r.alloc.ID, oldTask, newTask, r, exec, net) } // handleDestroy kills the task handle. In the case that killing fails, diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 5b99a1ca07b2..d91f5a96bbc4 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -73,15 +73,17 @@ func (m *MockTaskStateUpdater) String() string { } type taskRunnerTestCtx struct { - upd *MockTaskStateUpdater - tr *TaskRunner - allocDir *allocdir.AllocDir - vault *vaultclient.MockVaultClient - consul *mockConsulServiceClient + upd *MockTaskStateUpdater + tr *TaskRunner + allocDir *allocdir.AllocDir + vault *vaultclient.MockVaultClient + consul *consul.MockAgent + consulClient *consul.ServiceClient } // Cleanup calls Destroy on the task runner and alloc dir func (ctx *taskRunnerTestCtx) Cleanup() { + ctx.consulClient.Shutdown() ctx.tr.Destroy(structs.NewTaskEvent(structs.TaskKilled)) ctx.allocDir.Destroy() } @@ -117,9 +119,6 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat upd := &MockTaskStateUpdater{} task := alloc.Job.TaskGroups[0].Tasks[0] - // Initialize the port listing. This should be done by the offer process but - // we have a mock so that doesn't happen. - task.Resources.Networks[0].ReservedPorts = []structs.Port{{Label: "", Value: 80}} allocDir := allocdir.NewAllocDir(testLogger(), filepath.Join(conf.AllocDir, alloc.ID)) if err := allocDir.Build(); err != nil { @@ -143,17 +142,20 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat } vclient := vaultclient.NewMockVaultClient() - cclient := newMockConsulServiceClient() - tr := NewTaskRunner(logger, conf, db, upd.Update, taskDir, alloc, task, vclient, cclient) + cclient := consul.NewMockAgent() + serviceClient := consul.NewServiceClient(cclient, true, logger) + go serviceClient.Run() + tr := NewTaskRunner(logger, conf, db, upd.Update, taskDir, alloc, task, vclient, serviceClient) if !restarts { tr.restartTracker = noRestartsTracker() } return &taskRunnerTestCtx{ - upd: upd, - tr: tr, - allocDir: allocDir, - vault: vclient, - consul: cclient, + upd: upd, + tr: tr, + allocDir: allocDir, + vault: vclient, + consul: cclient, + consulClient: serviceClient, } } @@ -339,7 +341,12 @@ func TestTaskRunner_Update(t *testing.T) { t.Parallel() alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] - task.Services[0].Checks[0].Args[0] = "${NOMAD_META_foo}" + task.Services[0].Checks[0] = &structs.ServiceCheck{ + Name: "http-check", + Type: "http", + PortLabel: "http", + Path: "${NOMAD_META_foo}", + } task.Driver = "mock_driver" task.Config = map[string]interface{}{ "run_for": "100s", @@ -350,6 +357,8 @@ func TestTaskRunner_Update(t *testing.T) { go ctx.tr.Run() defer ctx.Cleanup() + testWaitForTaskToStart(t, ctx) + // Update the task definition updateAlloc := ctx.tr.alloc.Copy() @@ -363,10 +372,9 @@ func TestTaskRunner_Update(t *testing.T) { // Update meta to make sure service checks are interpolated correctly // #2180 - newTask.Meta["foo"] = "UPDATE" + newTask.Meta["foo"] = "/UPDATE" // Update the kill timeout - testWaitForTaskToStart(t, ctx) oldHandle := ctx.tr.handle.ID() newTask.KillTimeout = time.Hour ctx.tr.Update(updateAlloc) @@ -380,25 +388,22 @@ func TestTaskRunner_Update(t *testing.T) { return false, fmt.Errorf("Task not copied") } if ctx.tr.restartTracker.policy.Mode != newMode { - return false, fmt.Errorf("restart policy not ctx.updated") + return false, fmt.Errorf("expected restart policy %q but found %q", newMode, ctx.tr.restartTracker.policy.Mode) } if ctx.tr.handle.ID() == oldHandle { return false, fmt.Errorf("handle not ctx.updated") } + // Make sure Consul services were interpolated correctly during // the update #2180 - consul := ctx.tr.consul.(*mockConsulServiceClient) - consul.mu.Lock() - defer consul.mu.Unlock() - if len(consul.ops) < 2 { - return false, fmt.Errorf("expected at least 2 consul ops found: %d", len(consul.ops)) + checks := ctx.consul.CheckRegs() + if n := len(checks); n != 1 { + return false, fmt.Errorf("expected 1 check but found %d", n) } - lastOp := consul.ops[len(consul.ops)-1] - if lastOp.op != "update" { - return false, fmt.Errorf("expected last consul op to be update not %q", lastOp.op) - } - if found := lastOp.task.Services[0].Checks[0].Args[0]; found != "UPDATE" { - return false, fmt.Errorf("expected consul check to be UPDATE but found: %q", found) + for _, check := range checks { + if found := check.HTTP; !strings.HasSuffix(found, "/UPDATE") { + return false, fmt.Errorf("expected consul check path to end with /UPDATE but found: %q", found) + } } return true, nil }, func(err error) { @@ -635,12 +640,16 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) { } ctx := testTaskRunnerFromAlloc(t, true, alloc) + + // Use mockConsulServiceClient + consul := newMockConsulServiceClient() + ctx.tr.consul = consul + ctx.tr.MarkReceived() ctx.tr.Run() defer ctx.Cleanup() // Assert it is properly registered and unregistered - consul := ctx.tr.consul.(*mockConsulServiceClient) if expected := 4; len(consul.ops) != expected { t.Errorf("expected %d consul ops but found: %d", expected, len(consul.ops)) } @@ -1742,6 +1751,8 @@ func TestTaskRunner_ShutdownDelay(t *testing.T) { alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] + task.Services[0].Tags = []string{"tag1"} + task.Services = task.Services[:1] // only need 1 for this test task.Driver = "mock_driver" task.Config = map[string]interface{}{ "run_for": "1000s", @@ -1758,34 +1769,39 @@ func TestTaskRunner_ShutdownDelay(t *testing.T) { // Wait for the task to start testWaitForTaskToStart(t, ctx) + testutil.WaitForResult(func() (bool, error) { + services, _ := ctx.consul.Services() + if n := len(services); n != 1 { + return false, fmt.Errorf("expected 1 service found %d", n) + } + for _, s := range services { + if !reflect.DeepEqual(s.Tags, task.Services[0].Tags) { + return false, fmt.Errorf("expected tags=%q but found %q", + strings.Join(task.Services[0].Tags, ","), strings.Join(s.Tags, ",")) + } + } + return true, nil + }, func(err error) { + services, _ := ctx.consul.Services() + for _, s := range services { + t.Logf("Service: %#v", s) + } + t.Fatalf("err: %v", err) + }) + // Begin the tear down ctx.tr.Destroy(structs.NewTaskEvent(structs.TaskKilled)) destroyed := time.Now() - // Service should get removed quickly; loop until RemoveTask is called - found := false - deadline := destroyed.Add(task.ShutdownDelay) - for time.Now().Before(deadline) { - time.Sleep(5 * time.Millisecond) - - ctx.consul.mu.Lock() - n := len(ctx.consul.ops) - if n < 2 { - ctx.consul.mu.Unlock() - continue - } - - lastOp := ctx.consul.ops[n-1].op - ctx.consul.mu.Unlock() - - if lastOp == "remove" { - found = true - break + testutil.WaitForResult(func() (bool, error) { + services, _ := ctx.consul.Services() + if n := len(services); n == 1 { + return false, fmt.Errorf("expected 0 services found %d", n) } - } - if !found { - t.Errorf("task was not removed from Consul first") - } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) // Wait for actual exit select { @@ -1893,3 +1909,125 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) { } } } + +// TestTaskRunner_DriverNetwork asserts that a driver's network is properly +// used in services and checks. +func TestTaskRunner_DriverNetwork(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "exit_code": 0, + "run_for": "100s", + "driver_ip": "10.1.2.3", + "driver_port_map": "http:80", + } + + // Create services and checks with custom address modes to exercise + // address detection logic + task.Services = []*structs.Service{ + { + Name: "host-service", + PortLabel: "http", + AddressMode: "host", + Checks: []*structs.ServiceCheck{ + { + Name: "driver-check", + Type: "tcp", + PortLabel: "1234", + AddressMode: "driver", + }, + }, + }, + { + Name: "driver-service", + PortLabel: "5678", + AddressMode: "driver", + Checks: []*structs.ServiceCheck{ + { + Name: "host-check", + Type: "tcp", + PortLabel: "http", + }, + { + Name: "driver-label-check", + Type: "tcp", + PortLabel: "http", + AddressMode: "driver", + }, + }, + }, + } + + ctx := testTaskRunnerFromAlloc(t, false, alloc) + ctx.tr.MarkReceived() + go ctx.tr.Run() + defer ctx.Cleanup() + + // Wait for the task to start + testWaitForTaskToStart(t, ctx) + + testutil.WaitForResult(func() (bool, error) { + services, _ := ctx.consul.Services() + if n := len(services); n != 2 { + return false, fmt.Errorf("expected 2 services, but found %d", n) + } + for _, s := range services { + switch s.Service { + case "host-service": + if expected := "192.168.0.100"; s.Address != expected { + return false, fmt.Errorf("expected host-service to have IP=%s but found %s", + expected, s.Address) + } + case "driver-service": + if expected := "10.1.2.3"; s.Address != expected { + return false, fmt.Errorf("expected driver-service to have IP=%s but found %s", + expected, s.Address) + } + if expected := 5678; s.Port != expected { + return false, fmt.Errorf("expected driver-service to have port=%d but found %d", + expected, s.Port) + } + default: + return false, fmt.Errorf("unexpected service: %q", s.Service) + } + + } + + checks := ctx.consul.CheckRegs() + if n := len(checks); n != 3 { + return false, fmt.Errorf("expected 3 checks, but found %d", n) + } + for _, check := range checks { + switch check.Name { + case "driver-check": + if expected := "10.1.2.3:1234"; check.TCP != expected { + return false, fmt.Errorf("expected driver-check to have address %q but found %q", expected, check.TCP) + } + case "driver-label-check": + if expected := "10.1.2.3:80"; check.TCP != expected { + return false, fmt.Errorf("expected driver-label-check to have address %q but found %q", expected, check.TCP) + } + case "host-check": + if expected := "192.168.0.100:"; !strings.HasPrefix(check.TCP, expected) { + return false, fmt.Errorf("expected host-check to have address start with %q but found %q", expected, check.TCP) + } + default: + return false, fmt.Errorf("unexpected check: %q", check.Name) + } + } + + return true, nil + }, func(err error) { + services, _ := ctx.consul.Services() + for _, s := range services { + t.Logf(pretty.Sprint("Serivce: ", s)) + } + for _, c := range ctx.consul.CheckRegs() { + t.Logf(pretty.Sprint("Check: ", c)) + } + t.Fatalf("error: %v", err) + }) +} diff --git a/command/agent/consul/catalog_testing.go b/command/agent/consul/catalog_testing.go index 6b28940f1144..e0a31f770496 100644 --- a/command/agent/consul/catalog_testing.go +++ b/command/agent/consul/catalog_testing.go @@ -80,6 +80,7 @@ func (c *MockAgent) Services() (map[string]*api.AgentService, error) { return r, nil } +// Checks implements the Agent API Checks method. func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) { c.mu.Lock() defer c.mu.Unlock() @@ -98,6 +99,19 @@ func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) { return r, nil } +// CheckRegs returns the raw AgentCheckRegistrations registered with this mock +// agent. +func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration { + c.mu.Lock() + defer c.mu.Unlock() + + regs := make([]*api.AgentCheckRegistration, 0, len(c.checks)) + for _, check := range c.checks { + regs = append(regs, check) + } + return regs +} + func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error { c.mu.Lock() defer c.mu.Unlock() diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 57912b35bf99..93e001acbe7a 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -2,7 +2,10 @@ package consul import ( "context" + "crypto/sha1" + "encoding/base32" "fmt" + "io" "log" "net" "net/url" @@ -21,10 +24,14 @@ import ( ) const ( - // nomadServicePrefix is the first prefix that scopes all Nomad registered - // services + // nomadServicePrefix is the prefix that scopes all Nomad registered + // services (both agent and task entries). nomadServicePrefix = "_nomad" + // nomadTaskPrefix is the prefix that scopes Nomad registered services + // for tasks. + nomadTaskPrefix = nomadServicePrefix + "-task-" + // defaultRetryInterval is how quickly to retry syncing services and // checks to Consul when an error occurs. Will backoff up to a max. defaultRetryInterval = time.Second @@ -288,8 +295,13 @@ func (c *ServiceClient) Run() { if err := c.sync(); err != nil { if failures == 0 { + // Log on the first failure c.logger.Printf("[WARN] consul.sync: failed to update services in Consul: %v", err) + } else if failures%10 == 0 { + // Log every 10th consecutive failure + c.logger.Printf("[ERR] consul.sync: still unable to update services in Consul after %d failures; latest error: %v", failures, err) } + failures++ if !retryTimer.Stop() { // Timer already expired, since the timer may @@ -389,8 +401,14 @@ func (c *ServiceClient) sync() error { // Not managed by Nomad, skip continue } + // Unknown Nomad managed service; kill if err := c.client.ServiceDeregister(id); err != nil { + if isOldNomadService(id) { + // Don't hard-fail on old entries. See #3620 + continue + } + metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) return err } @@ -398,29 +416,16 @@ func (c *ServiceClient) sync() error { metrics.IncrCounter([]string{"client", "consul", "service_deregistrations"}, 1) } - // Track services whose ports have changed as their checks may also - // need updating - portsChanged := make(map[string]struct{}, len(c.services)) - // Add Nomad services missing from Consul for id, locals := range c.services { - if remotes, ok := consulServices[id]; ok { - // Make sure Port and Address are stable since - // PortLabel and AddressMode aren't included in the - // service ID. - if locals.Port == remotes.Port && locals.Address == remotes.Address { - // Already exists in Consul; skip - continue + if _, ok := consulServices[id]; !ok { + if err = c.client.ServiceRegister(locals); err != nil { + metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) + return err } - // Port changed, reregister it and its checks - portsChanged[id] = struct{}{} - } - if err = c.client.ServiceRegister(locals); err != nil { - metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) - return err + sreg++ + metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1) } - sreg++ - metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1) } // Remove Nomad checks in Consul but unknown locally @@ -433,8 +438,14 @@ func (c *ServiceClient) sync() error { // Service not managed by Nomad, skip continue } - // Unknown Nomad managed check; kill + + // Unknown Nomad managed check; remove if err := c.client.CheckDeregister(id); err != nil { + if isOldNomadService(check.ServiceID) { + // Don't hard-fail on old entries. + continue + } + metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) return err } @@ -444,12 +455,11 @@ func (c *ServiceClient) sync() error { // Add Nomad checks missing from Consul for id, check := range c.checks { - if check, ok := consulChecks[id]; ok { - if _, changed := portsChanged[check.ServiceID]; !changed { - // Already in Consul and ports didn't change; skipping - continue - } + if _, ok := consulChecks[id]; ok { + // Already in Consul; skipping + continue } + if err := c.client.CheckRegister(check); err != nil { metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) return err @@ -569,23 +579,16 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st checkIDs: make(map[string]struct{}, len(service.Checks)), } - // Determine the address to advertise + // Service address modes default to auto addrMode := service.AddressMode - if addrMode == structs.AddressModeAuto { - if net.Advertise() { - addrMode = structs.AddressModeDriver - } else { - // No driver network or shouldn't default to driver's network - addrMode = structs.AddressModeHost - } + if addrMode == "" { + addrMode = structs.AddressModeAuto } - ip, port := task.Resources.Networks.Port(service.PortLabel) - if addrMode == structs.AddressModeDriver { - if net == nil { - return nil, fmt.Errorf("service %s cannot use driver's IP because driver didn't set one", service.Name) - } - ip = net.IP - port = net.PortMap[service.PortLabel] + + // Determine the address to advertise based on the mode + ip, port, err := getAddress(addrMode, service.PortLabel, task.Resources.Networks, net) + if err != nil { + return nil, fmt.Errorf("unable to get address for service %q: %v", service.Name, err) } // Build the Consul Service registration request @@ -639,15 +642,33 @@ func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, se ops.scripts = append(ops.scripts, newScriptCheck( allocID, task.Name, checkID, check, exec, c.client, c.logger, c.shutdownCh)) + // Skip getAddress for script checks + checkReg, err := createCheckReg(serviceID, checkID, check, "", 0) + if err != nil { + return nil, fmt.Errorf("failed to add script check %q: %v", check.Name, err) + } + ops.regChecks = append(ops.regChecks, checkReg) + continue } - // Checks should always use the host ip:port + // Default to the service's port but allow check to override portLabel := check.PortLabel if portLabel == "" { // Default to the service's port label portLabel = service.PortLabel } - ip, port := task.Resources.Networks.Port(portLabel) + + // Checks address mode defaults to host for pre-#3380 backward compat + addrMode := check.AddressMode + if addrMode == "" { + addrMode = structs.AddressModeHost + } + + ip, port, err := getAddress(addrMode, portLabel, task.Resources.Networks, net) + if err != nil { + return nil, fmt.Errorf("unable to get address for check %q: %v", check.Name, err) + } + checkReg, err := createCheckReg(serviceID, checkID, check, ip, port) if err != nil { return nil, fmt.Errorf("failed to add check %q: %v", check.Name, err) @@ -709,8 +730,8 @@ func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, restart func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Task, restarter TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { ops := &operations{} - t := new(TaskRegistration) - t.Services = make(map[string]*ServiceRegistration, len(newTask.Services)) + taskReg := new(TaskRegistration) + taskReg.Services = make(map[string]*ServiceRegistration, len(newTask.Services)) existingIDs := make(map[string]*structs.Service, len(existing.Services)) for _, s := range existing.Services { @@ -740,22 +761,17 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta continue } + // Service exists and hasn't changed, don't re-add it later + delete(newIDs, existingID) + // Service still exists so add it to the task's registration sreg := &ServiceRegistration{ serviceID: existingID, checkIDs: make(map[string]struct{}, len(newSvc.Checks)), } - t.Services[existingID] = sreg - - // PortLabel and AddressMode aren't included in the ID, so we - // have to compare manually. - serviceUnchanged := newSvc.PortLabel == existingSvc.PortLabel && newSvc.AddressMode == existingSvc.AddressMode - if serviceUnchanged { - // Service exists and hasn't changed, don't add it later - delete(newIDs, existingID) - } + taskReg.Services[existingID] = sreg - // Check to see what checks were updated + // See if any checks were updated existingChecks := make(map[string]*structs.ServiceCheck, len(existingSvc.Checks)) for _, check := range existingSvc.Checks { existingChecks[makeCheckID(existingID, check)] = check @@ -768,17 +784,16 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta // Check exists, so don't remove it delete(existingChecks, checkID) sreg.checkIDs[checkID] = struct{}{} - } else if serviceUnchanged { - // New check on an unchanged service; add them now - newCheckIDs, err := c.checkRegs(ops, allocID, existingID, newSvc, newTask, exec, net) - if err != nil { - return err - } + } - for _, checkID := range newCheckIDs { - sreg.checkIDs[checkID] = struct{}{} + // New check on an unchanged service; add them now + newCheckIDs, err := c.checkRegs(ops, allocID, existingID, newSvc, newTask, exec, net) + if err != nil { + return err + } - } + for _, checkID := range newCheckIDs { + sreg.checkIDs[checkID] = struct{}{} } @@ -806,11 +821,11 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta return err } - t.Services[sreg.serviceID] = sreg + taskReg.Services[sreg.serviceID] = sreg } // Add the task to the allocation's registration - c.addTaskRegistration(allocID, newTask.Name, t) + c.addTaskRegistration(allocID, newTask.Name, taskReg) c.commit(ops) @@ -988,36 +1003,27 @@ func (c *ServiceClient) removeTaskRegistration(allocID, taskName string) { // // Agent service IDs are of the form: // -// {nomadServicePrefix}-{ROLE}-{Service.Name}-{Service.Tags...} -// Example Server ID: _nomad-server-nomad-serf -// Example Client ID: _nomad-client-nomad-client-http +// {nomadServicePrefix}-{ROLE}-b32(sha1({Service.Name}-{Service.Tags...}) +// Example Server ID: _nomad-server-FBBK265QN4TMT25ND4EP42TJVMYJ3HR4 +// Example Client ID: _nomad-client-GGNJPGL7YN7RGMVXZILMPVRZZVRSZC7L // func makeAgentServiceID(role string, service *structs.Service) string { - parts := make([]string, len(service.Tags)+3) - parts[0] = nomadServicePrefix - parts[1] = role - parts[2] = service.Name - copy(parts[3:], service.Tags) - return strings.Join(parts, "-") + h := sha1.New() + io.WriteString(h, service.Name) + for _, tag := range service.Tags { + io.WriteString(h, tag) + } + b32 := base32.StdEncoding.EncodeToString(h.Sum(nil)) + return fmt.Sprintf("%s-%s-%s", nomadServicePrefix, role, b32) } // makeTaskServiceID creates a unique ID for identifying a task service in -// Consul. -// -// Task service IDs are of the form: -// -// {nomadServicePrefix}-executor-{ALLOC_ID}-{Service.Name}-{Service.Tags...} -// Example Service ID: _nomad-executor-1234-echo-http-tag1-tag2-tag3 +// Consul. All structs.Service fields are included in the ID's hash except +// Checks. This allows updates to merely compare IDs. // +// Example Service ID: _nomad-task-TNM333JKJPM5AK4FAS3VXQLXFDWOF4VH func makeTaskServiceID(allocID, taskName string, service *structs.Service) string { - parts := make([]string, len(service.Tags)+5) - parts[0] = nomadServicePrefix - parts[1] = "executor" - parts[2] = allocID - parts[3] = taskName - parts[4] = service.Name - copy(parts[5:], service.Tags) - return strings.Join(parts, "-") + return nomadTaskPrefix + service.Hash(allocID, taskName) } // makeCheckID creates a unique ID for a check. @@ -1073,9 +1079,68 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host } // isNomadService returns true if the ID matches the pattern of a Nomad managed -// service. Agent services return false as independent client and server agents -// may be running on the same machine. #2827 +// service (new or old formats). Agent services return false as independent +// client and server agents may be running on the same machine. #2827 func isNomadService(id string) bool { + return strings.HasPrefix(id, nomadTaskPrefix) || isOldNomadService(id) +} + +// isOldNomadService returns true if the ID matches an old pattern managed by +// Nomad. +// +// Pre-0.7.1 task service IDs are of the form: +// +// {nomadServicePrefix}-executor-{ALLOC_ID}-{Service.Name}-{Service.Tags...} +// Example Service ID: _nomad-executor-1234-echo-http-tag1-tag2-tag3 +// +func isOldNomadService(id string) bool { const prefix = nomadServicePrefix + "-executor" return strings.HasPrefix(id, prefix) } + +// getAddress returns the ip and port to use for a service or check. An error +// is returned if an ip and port cannot be determined. +func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet *cstructs.DriverNetwork) (string, int, error) { + switch addrMode { + case structs.AddressModeAuto: + if driverNet.Advertise() { + addrMode = structs.AddressModeDriver + } else { + addrMode = structs.AddressModeHost + } + return getAddress(addrMode, portLabel, networks, driverNet) + case structs.AddressModeHost: + // Default path: use host ip:port + ip, port := networks.Port(portLabel) + if ip == "" && port <= 0 { + return "", 0, fmt.Errorf("invalid port %q: port label not found", portLabel) + } + return ip, port, nil + + case structs.AddressModeDriver: + // Require a driver network if driver address mode is used + if driverNet == nil { + return "", 0, fmt.Errorf(`cannot use address_mode="driver": no driver network exists`) + } + + // If the port is a label, use the driver's port (not the host's) + if port, ok := driverNet.PortMap[portLabel]; ok { + return driverNet.IP, port, nil + } + + // If port isn't a label, try to parse it as a literal port number + port, err := strconv.Atoi(portLabel) + if err != nil { + return "", 0, fmt.Errorf("invalid port %q: %v", portLabel, err) + } + if port <= 0 { + return "", 0, fmt.Errorf("invalid port: %q: port 0 is invalid", portLabel) + } + + return driverNet.IP, port, nil + + default: + // Shouldn't happen due to validation, but enforce invariants + return "", 0, fmt.Errorf("invalid address mode %q", addrMode) + } +} diff --git a/command/agent/consul/int_test.go b/command/agent/consul/int_test.go index 9295a696c9ad..ba9975532a03 100644 --- a/command/agent/consul/int_test.go +++ b/command/agent/consul/int_test.go @@ -116,7 +116,12 @@ func TestConsul_Integration(t *testing.T) { { Name: "httpd2", PortLabel: "http", - Tags: []string{"test", "http2"}, + Tags: []string{ + "test", + // Use URL-unfriendly tags to test #3620 + "public-test.ettaviation.com:80/ redirect=302,https://test.ettaviation.com", + "public-test.ettaviation.com:443/", + }, }, } diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index 8bd3e08a5d8c..88acc4ee1523 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -16,6 +16,7 @@ import ( cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" "github.com/kr/pretty" + "github.com/stretchr/testify/assert" ) const ( @@ -219,8 +220,8 @@ func TestConsul_ChangeTags(t *testing.T) { } // TestConsul_ChangePorts asserts that changing the ports on a service updates -// it in Consul. Since ports are not part of the service ID this is a slightly -// different code path than changing tags. +// it in Consul. Pre-0.7.1 ports were not part of the service ID and this was a +// slightly different code path than changing tags. func TestConsul_ChangePorts(t *testing.T) { ctx := setupFake() ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ @@ -348,8 +349,8 @@ func TestConsul_ChangePorts(t *testing.T) { } for k, v := range ctx.FakeConsul.services { - if k != origServiceKey { - t.Errorf("unexpected key change; was: %q -- but found %q", origServiceKey, k) + if k == origServiceKey { + t.Errorf("expected key change; still: %q", k) } if v.Name != ctx.Task.Services[0].Name { t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name) @@ -369,15 +370,15 @@ func TestConsul_ChangePorts(t *testing.T) { for k, v := range ctx.FakeConsul.checks { switch v.Name { case "c1": - if k != origTCPKey { - t.Errorf("unexpected key change for %s from %q to %q", v.Name, origTCPKey, k) + if k == origTCPKey { + t.Errorf("expected key change for %s from %q", v.Name, origTCPKey) } if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected { t.Errorf("expected Port x=%v but found: %v", expected, v.TCP) } case "c2": - if k != origScriptKey { - t.Errorf("unexpected key change for %s from %q to %q", v.Name, origScriptKey, k) + if k == origScriptKey { + t.Errorf("expected key change for %s from %q", v.Name, origScriptKey) } select { case <-ctx.execs: @@ -1382,9 +1383,16 @@ func TestIsNomadService(t *testing.T) { }{ {"_nomad-client-nomad-client-http", false}, {"_nomad-server-nomad-serf", false}, + + // Pre-0.7.1 style IDs still match {"_nomad-executor-abc", true}, {"_nomad-executor", true}, + + // Post-0.7.1 style IDs match + {"_nomad-task-FBBK265QN4TMT25ND4EP42TJVMYJ3HR4", true}, + {"not-nomad", false}, + {"_nomad", false}, } for _, test := range tests { @@ -1440,3 +1448,160 @@ func TestCreateCheckReg(t *testing.T) { t.Fatalf("diff:\n%s\n", strings.Join(diff, "\n")) } } + +// TestGetAddress asserts Nomad uses the correct ip and port for services and +// checks depending on port labels, driver networks, and address mode. +func TestGetAddress(t *testing.T) { + const HostIP = "127.0.0.1" + + cases := []struct { + Name string + + // Parameters + Mode string + PortLabel string + Host map[string]int // will be converted to structs.Networks + Driver *cstructs.DriverNetwork + + // Results + IP string + Port int + ErrContains string + }{ + { + Name: "ExampleService", + Mode: structs.AddressModeAuto, + PortLabel: "db", + Host: map[string]int{"db": 12435}, + Driver: &cstructs.DriverNetwork{ + PortMap: map[string]int{"db": 6379}, + IP: "10.1.2.3", + }, + IP: HostIP, + Port: 12435, + }, + { + Name: "Host", + Mode: structs.AddressModeHost, + PortLabel: "db", + Host: map[string]int{"db": 12345}, + Driver: &cstructs.DriverNetwork{ + PortMap: map[string]int{"db": 6379}, + IP: "10.1.2.3", + }, + IP: HostIP, + Port: 12345, + }, + { + Name: "Driver", + Mode: structs.AddressModeDriver, + PortLabel: "db", + Host: map[string]int{"db": 12345}, + Driver: &cstructs.DriverNetwork{ + PortMap: map[string]int{"db": 6379}, + IP: "10.1.2.3", + }, + IP: "10.1.2.3", + Port: 6379, + }, + { + Name: "AutoDriver", + Mode: structs.AddressModeAuto, + PortLabel: "db", + Host: map[string]int{"db": 12345}, + Driver: &cstructs.DriverNetwork{ + PortMap: map[string]int{"db": 6379}, + IP: "10.1.2.3", + AutoAdvertise: true, + }, + IP: "10.1.2.3", + Port: 6379, + }, + { + Name: "DriverCustomPort", + Mode: structs.AddressModeDriver, + PortLabel: "7890", + Host: map[string]int{"db": 12345}, + Driver: &cstructs.DriverNetwork{ + PortMap: map[string]int{"db": 6379}, + IP: "10.1.2.3", + }, + IP: "10.1.2.3", + Port: 7890, + }, + { + Name: "DriverWithoutNetwork", + Mode: structs.AddressModeDriver, + PortLabel: "db", + Host: map[string]int{"db": 12345}, + Driver: nil, + ErrContains: "no driver network exists", + }, + { + Name: "DriverBadPort", + Mode: structs.AddressModeDriver, + PortLabel: "bad-port-label", + Host: map[string]int{"db": 12345}, + Driver: &cstructs.DriverNetwork{ + PortMap: map[string]int{"db": 6379}, + IP: "10.1.2.3", + }, + ErrContains: "invalid port", + }, + { + Name: "DriverZeroPort", + Mode: structs.AddressModeDriver, + PortLabel: "0", + Driver: &cstructs.DriverNetwork{ + IP: "10.1.2.3", + }, + ErrContains: "invalid port", + }, + { + Name: "HostBadPort", + Mode: structs.AddressModeHost, + PortLabel: "bad-port-label", + ErrContains: "invalid port", + }, + { + Name: "InvalidMode", + Mode: "invalid-mode", + ErrContains: "invalid address mode", + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + // convert host port map into a structs.Networks + networks := []*structs.NetworkResource{ + { + IP: HostIP, + ReservedPorts: make([]structs.Port, len(tc.Host)), + }, + } + + i := 0 + for label, port := range tc.Host { + networks[0].ReservedPorts[i].Label = label + networks[0].ReservedPorts[i].Value = port + i++ + } + + // Run getAddress + ip, port, err := getAddress(tc.Mode, tc.PortLabel, networks, tc.Driver) + + // Assert the results + assert.Equal(t, tc.IP, ip, "IP mismatch") + assert.Equal(t, tc.Port, port, "Port mismatch") + if tc.ErrContains == "" { + assert.Nil(t, err) + } else { + if err == nil { + t.Fatalf("expected error containing %q but err=nil", tc.ErrContains) + } else { + assert.Contains(t, err.Error(), tc.ErrContains) + } + } + }) + } +} diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 9a154679e95c..6b0e3a565482 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -710,6 +710,7 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) { Path: check.Path, Protocol: check.Protocol, PortLabel: check.PortLabel, + AddressMode: check.AddressMode, Interval: check.Interval, Timeout: check.Timeout, InitialStatus: check.InitialStatus, diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 8eb74a9b92bf..019a82ae059a 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -1222,6 +1222,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { Path: "/check", Protocol: "http", PortLabel: "foo", + AddressMode: "driver", Interval: 4 * time.Second, Timeout: 2 * time.Second, InitialStatus: "ok", @@ -1418,6 +1419,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { Path: "/check", Protocol: "http", PortLabel: "foo", + AddressMode: "driver", Interval: 4 * time.Second, Timeout: 2 * time.Second, InitialStatus: "ok", diff --git a/jobspec/parse.go b/jobspec/parse.go index f44cb75406f1..d25f38bd0f8e 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -981,6 +981,7 @@ func parseChecks(service *api.Service, checkObjs *ast.ObjectList) error { "header", "method", "check_restart", + "address_mode", } if err := helper.CheckHCLKeys(co.Val, valid); err != nil { return multierror.Prefix(err, "check ->") diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 5922edeff511..2dfc890d42ff 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -583,6 +583,54 @@ func TestParse(t *testing.T) { }, false, }, + { + "service-check-driver-address.hcl", + &api.Job{ + ID: helper.StringToPtr("address_mode_driver"), + Name: helper.StringToPtr("address_mode_driver"), + Type: helper.StringToPtr("service"), + TaskGroups: []*api.TaskGroup{ + { + Name: helper.StringToPtr("group"), + Tasks: []*api.Task{ + { + Name: "task", + Services: []*api.Service{ + { + Name: "http-service", + PortLabel: "http", + AddressMode: "auto", + Checks: []api.ServiceCheck{ + { + Name: "http-check", + Type: "http", + Path: "/", + PortLabel: "http", + AddressMode: "driver", + }, + }, + }, + { + Name: "random-service", + PortLabel: "9000", + AddressMode: "driver", + Checks: []api.ServiceCheck{ + { + Name: "random-check", + Type: "tcp", + PortLabel: "9001", + AddressMode: "driver", + }, + }, + }, + }, + }, + }, + }, + }, + }, + false, + }, } for _, tc := range cases { diff --git a/jobspec/test-fixtures/service-check-driver-address.hcl b/jobspec/test-fixtures/service-check-driver-address.hcl new file mode 100644 index 000000000000..a9fa88d422c6 --- /dev/null +++ b/jobspec/test-fixtures/service-check-driver-address.hcl @@ -0,0 +1,38 @@ +job "address_mode_driver" { + type = "service" + group "group" { + task "task" { + service { + name = "http-service" + port = "http" + + address_mode = "auto" + + check { + name = "http-check" + type = "http" + path = "/" + port = "http" + + address_mode = "driver" + } + } + + service { + name = "random-service" + port = "9000" + + address_mode = "driver" + + check { + name = "random-check" + type = "tcp" + port = "9001" + + address_mode = "driver" + } + } + } + } +} + diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 556dd0b8d05b..d0c889b09582 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -41,7 +41,7 @@ func Node() *structs.Node { { Device: "eth0", IP: "192.168.0.100", - ReservedPorts: []structs.Port{{Label: "main", Value: 22}}, + ReservedPorts: []structs.Port{{Label: "ssh", Value: 22}}, MBits: 1, }, }, @@ -128,8 +128,11 @@ func Job() *structs.Job { MemoryMB: 256, Networks: []*structs.NetworkResource{ { - MBits: 50, - DynamicPorts: []structs.Port{{Label: "http"}, {Label: "admin"}}, + MBits: 50, + DynamicPorts: []structs.Port{ + {Label: "http"}, + {Label: "admin"}, + }, }, }, }, @@ -273,7 +276,7 @@ func Alloc() *structs.Allocation { { Device: "eth0", IP: "192.168.0.100", - ReservedPorts: []structs.Port{{Label: "main", Value: 5000}}, + ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}}, MBits: 50, DynamicPorts: []structs.Port{{Label: "http"}}, }, @@ -287,7 +290,7 @@ func Alloc() *structs.Allocation { { Device: "eth0", IP: "192.168.0.100", - ReservedPorts: []structs.Port{{Label: "main", Value: 5000}}, + ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}}, MBits: 50, DynamicPorts: []structs.Port{{Label: "http"}}, }, diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index 379f1a23995a..4574bcb64c30 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -3507,6 +3507,12 @@ func TestTaskDiff(t *testing.T) { Type: DiffTypeEdited, Name: "Check", Fields: []*FieldDiff{ + { + Type: DiffTypeNone, + Name: "AddressMode", + Old: "", + New: "", + }, { Type: DiffTypeNone, Name: "Command", diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 1836aae828f2..52c0f1410651 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -6,6 +6,7 @@ import ( "crypto/sha1" "crypto/sha256" "crypto/sha512" + "encoding/base32" "encoding/hex" "errors" "fmt" @@ -44,6 +45,9 @@ var ( // validPolicyName is used to validate a policy name validPolicyName = regexp.MustCompile("^[a-zA-Z0-9-]{1,128}$") + + // b32 is a lowercase base32 encoding for use in URL friendly service hashes + b32 = base32.NewEncoding(strings.ToLower("abcdefghijklmnopqrstuvwxyz234567")) ) type MessageType uint8 @@ -2866,6 +2870,7 @@ type ServiceCheck struct { Path string // path of the health check url for http type check Protocol string // Protocol to use if check is http, defaults to http PortLabel string // The port to use for tcp/http checks + AddressMode string // 'host' to use host ip:port or 'driver' to use driver's Interval time.Duration // Interval of the check Timeout time.Duration // Timeout of the response from the check before consul fails the check InitialStatus string // Initial status of the check @@ -2911,6 +2916,7 @@ func (sc *ServiceCheck) Canonicalize(serviceName string) { // validate a Service's ServiceCheck func (sc *ServiceCheck) validate() error { + // Validate Type switch strings.ToLower(sc.Type) { case ServiceCheckTCP: case ServiceCheckHTTP: @@ -2926,6 +2932,7 @@ func (sc *ServiceCheck) validate() error { return fmt.Errorf(`invalid type (%+q), must be one of "http", "tcp", or "script" type`, sc.Type) } + // Validate interval and timeout if sc.Interval == 0 { return fmt.Errorf("missing required value interval. Interval cannot be less than %v", minCheckInterval) } else if sc.Interval < minCheckInterval { @@ -2938,9 +2945,9 @@ func (sc *ServiceCheck) validate() error { return fmt.Errorf("timeout (%v) is lower than required minimum timeout %v", sc.Timeout, minCheckInterval) } + // Validate InitialStatus switch sc.InitialStatus { case "": - // case api.HealthUnknown: TODO: Add when Consul releases 0.7.1 case api.HealthPassing: case api.HealthWarning: case api.HealthCritical: @@ -2949,6 +2956,16 @@ func (sc *ServiceCheck) validate() error { } + // Validate AddressMode + switch sc.AddressMode { + case "", AddressModeHost, AddressModeDriver: + // Ok + case AddressModeAuto: + return fmt.Errorf("invalid address_mode %q - %s only valid for services", sc.AddressMode, AddressModeAuto) + default: + return fmt.Errorf("invalid address_mode %q", sc.AddressMode) + } + return sc.CheckRestart.Validate() } @@ -3001,6 +3018,11 @@ func (sc *ServiceCheck) Hash(serviceID string) string { io.WriteString(h, strings.Join(headers, "")) } + // Only include AddressMode if set to maintain ID stability with Nomad <0.7.1 + if len(sc.AddressMode) > 0 { + io.WriteString(h, sc.AddressMode) + } + return fmt.Sprintf("%x", h.Sum(nil)) } @@ -3122,15 +3144,24 @@ func (s *Service) ValidateName(name string) error { return nil } -// Hash calculates the hash of the check based on it's content and the service -// which owns it -func (s *Service) Hash() string { +// Hash returns a base32 encoded hash of a Service's contents excluding checks +// as they're hashed independently. +func (s *Service) Hash(allocID, taskName string) string { h := sha1.New() + io.WriteString(h, allocID) + io.WriteString(h, taskName) io.WriteString(h, s.Name) - io.WriteString(h, strings.Join(s.Tags, "")) io.WriteString(h, s.PortLabel) io.WriteString(h, s.AddressMode) - return fmt.Sprintf("%x", h.Sum(nil)) + for _, tag := range s.Tags { + io.WriteString(h, tag) + } + + // Base32 is used for encoding the hash as sha1 hashes can always be + // encoded without padding, only 4 bytes larger than base64, and saves + // 8 bytes vs hex. Since these hashes are used in Consul URLs it's nice + // to have a reasonably compact URL-safe representation. + return b32.EncodeToString(h.Sum(nil)) } const ( @@ -3439,7 +3470,13 @@ func validateServices(t *Task) error { // Ensure that services don't ask for non-existent ports and their names are // unique. - servicePorts := make(map[string][]string) + servicePorts := make(map[string]map[string]struct{}) + addServicePort := func(label, service string) { + if _, ok := servicePorts[label]; !ok { + servicePorts[label] = map[string]struct{}{} + } + servicePorts[label][service] = struct{}{} + } knownServices := make(map[string]struct{}) for i, service := range t.Services { if err := service.Validate(); err != nil { @@ -3455,16 +3492,63 @@ func validateServices(t *Task) error { knownServices[service.Name+service.PortLabel] = struct{}{} if service.PortLabel != "" { - servicePorts[service.PortLabel] = append(servicePorts[service.PortLabel], service.Name) + if service.AddressMode == "driver" { + // Numeric port labels are valid for address_mode=driver + _, err := strconv.Atoi(service.PortLabel) + if err != nil { + // Not a numeric port label, add it to list to check + addServicePort(service.PortLabel, service.Name) + } + } else { + addServicePort(service.PortLabel, service.Name) + } } - // Ensure that check names are unique. + // Ensure that check names are unique and have valid ports knownChecks := make(map[string]struct{}) for _, check := range service.Checks { if _, ok := knownChecks[check.Name]; ok { mErr.Errors = append(mErr.Errors, fmt.Errorf("check %q is duplicate", check.Name)) } knownChecks[check.Name] = struct{}{} + + if !check.RequiresPort() { + // No need to continue validating check if it doesn't need a port + continue + } + + effectivePort := check.PortLabel + if effectivePort == "" { + // Inherits from service + effectivePort = service.PortLabel + } + + if effectivePort == "" { + mErr.Errors = append(mErr.Errors, fmt.Errorf("check %q is missing a port", check.Name)) + continue + } + + isNumeric := false + portNumber, err := strconv.Atoi(effectivePort) + if err == nil { + isNumeric = true + } + + // Numeric ports are fine for address_mode = "driver" + if check.AddressMode == "driver" && isNumeric { + if portNumber <= 0 { + mErr.Errors = append(mErr.Errors, fmt.Errorf("check %q has invalid numeric port %d", check.Name, portNumber)) + } + continue + } + + if isNumeric { + mErr.Errors = append(mErr.Errors, fmt.Errorf(`check %q cannot use a numeric port %d without setting address_mode="driver"`, check.Name, portNumber)) + continue + } + + // PortLabel must exist, report errors by its parent service + addServicePort(effectivePort, service.Name) } } @@ -3483,7 +3567,14 @@ func validateServices(t *Task) error { for servicePort, services := range servicePorts { _, ok := portLabels[servicePort] if !ok { - joined := strings.Join(services, ", ") + names := make([]string, 0, len(services)) + for name := range services { + names = append(names, name) + } + + // Keep order deterministic + sort.Strings(names) + joined := strings.Join(names, ", ") err := fmt.Errorf("port label %q referenced by services %v does not exist", servicePort, joined) mErr.Errors = append(mErr.Errors, err) } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 3840b9cf40ed..07db00439ce6 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -1180,6 +1180,102 @@ func TestTask_Validate_Service_Check(t *testing.T) { } } +// TestTask_Validate_Service_Check_AddressMode asserts that checks do not +// inherit address mode but do inherit ports. +func TestTask_Validate_Service_Check_AddressMode(t *testing.T) { + task := &Task{ + Resources: &Resources{ + Networks: []*NetworkResource{ + { + DynamicPorts: []Port{ + { + Label: "http", + Value: 9999, + }, + }, + }, + }, + }, + Services: []*Service{ + { + Name: "invalid-driver", + PortLabel: "80", + AddressMode: "host", + }, + { + Name: "http-driver", + PortLabel: "80", + AddressMode: "driver", + Checks: []*ServiceCheck{ + { + // Should fail + Name: "invalid-check-1", + Type: "tcp", + Interval: time.Second, + Timeout: time.Second, + }, + { + // Should fail + Name: "invalid-check-2", + Type: "tcp", + PortLabel: "80", + Interval: time.Second, + Timeout: time.Second, + }, + { + // Should fail + Name: "invalid-check-3", + Type: "tcp", + PortLabel: "missing-port-label", + Interval: time.Second, + Timeout: time.Second, + }, + { + // Should pass + Name: "valid-script-check", + Type: "script", + Command: "ok", + Interval: time.Second, + Timeout: time.Second, + }, + { + // Should pass + Name: "valid-host-check", + Type: "tcp", + PortLabel: "http", + Interval: time.Second, + Timeout: time.Second, + }, + { + // Should pass + Name: "valid-driver-check", + Type: "tcp", + AddressMode: "driver", + Interval: time.Second, + Timeout: time.Second, + }, + }, + }, + }, + } + err := validateServices(task) + if err == nil { + t.Fatalf("expected errors but task validated successfully") + } + errs := err.(*multierror.Error).Errors + if expected := 4; len(errs) != expected { + for i, err := range errs { + t.Logf("errs[%d] -> %s", i, err) + } + t.Fatalf("expected %d errors but found %d", expected, len(errs)) + } + + assert.Contains(t, errs[0].Error(), `check "invalid-check-1" cannot use a numeric port`) + assert.Contains(t, errs[1].Error(), `check "invalid-check-2" cannot use a numeric port`) + assert.Contains(t, errs[2].Error(), `port label "80" referenced`) + assert.Contains(t, errs[3].Error(), `port label "missing-port-label" referenced`) +} + func TestTask_Validate_Service_Check_CheckRestart(t *testing.T) { invalidCheckRestart := &CheckRestart{ Limit: -1, diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index cdc415eb12df..02296ff76a89 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -1893,7 +1893,7 @@ func TestServiceSched_JobModify_InPlace(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) // Verify the network did not change - rp := structs.Port{Label: "main", Value: 5000} + rp := structs.Port{Label: "admin", Value: 5000} for _, alloc := range out { for _, resources := range alloc.TaskResources { if resources.Networks[0].ReservedPorts[0] != rp { diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 2b25fbb56c5d..be43026a1a54 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -777,7 +777,7 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) // Verify the network did not change - rp := structs.Port{Label: "main", Value: 5000} + rp := structs.Port{Label: "admin", Value: 5000} for _, alloc := range out { for _, resources := range alloc.TaskResources { if resources.Networks[0].ReservedPorts[0] != rp { diff --git a/website/source/docs/job-specification/service.html.md b/website/source/docs/job-specification/service.html.md index 9f96cac4fb8c..9707d8ec47f9 100644 --- a/website/source/docs/job-specification/service.html.md +++ b/website/source/docs/job-specification/service.html.md @@ -98,20 +98,32 @@ does not automatically enable service discovery. - `port` `(string: )` - Specifies the label of the port on which this service is running. Note this is the _label_ of the port and not the port - number. The port label must match one defined in the [`network`][network] - stanza. + number unless `address_mode = driver`. The port label must match one defined + in the [`network`][network] stanza unless you're also using + `address_mode="driver"`. Numeric ports may be used when in driver addressing + mode. - `tags` `(array: [])` - Specifies the list of tags to associate with this service. If this is not supplied, no tags will be assigned to the service when it is registered. - `address_mode` `(string: "auto")` - Specifies what address (host or - driver-specific) this service should advertise. `host` indicates the host IP - and port. `driver` advertises the IP used in the driver (e.g. Docker's internal - IP) and uses the ports specified in the port map. The default is `auto` which - behaves the same as `host` unless the driver determines its IP should be used. - This setting supported Docker since Nomad 0.6 and rkt since Nomad 0.7. It - will advertise the container IP if a network plugin is used (e.g. weave). + driver-specific) this service should advertise. This setting is supported in + Docker since Nomad 0.6 and rkt since Nomad 0.7. See [below for + examples.](#using-driver-address-mode) Valid options are: + + - `auto` - Allows the driver to determine whether the host or driver address + should be used. Defaults to `host` and only implemented by Docker. If you + use a Docker network plugin such as weave, Docker will automatically use + its address. + + - `driver` - Use the IP specified by the driver, and the port specified in a + port map. A numeric port may be specified since port maps aren't required + by all network plugins. Useful for advertising SDN and overlay network + addresses. Task will fail if driver network cannot be determined. Only + implemented for Docker and rkt. + + - `host` - Use the host IP and port. ### `check` Parameters @@ -120,6 +132,13 @@ the script will run inside the Docker container. If your task is running in a chroot, it will run in the chroot. Please keep this in mind when authoring check scripts. +- `address_mode` `(string: "host")` - Same as `address_mode` on `service`. + Unlike services, checks do not have an `auto` address mode as there's no way + for Nomad to know which is the best address to use for checks. Consul needs + access to the address for any HTTP or TCP checks. Added in Nomad 0.7.1. See + [below for details.](#using-driver-address-mode) Unlike `port`, this setting + is *not* inherited from the `service`. + - `args` `(array: [])` - Specifies additional arguments to the `command`. This only applies to script-based health checks. @@ -157,11 +176,13 @@ scripts. - `port` `(string: )` - Specifies the label of the port on which the check will be performed. Note this is the _label_ of the port and not the port - number. The port label must match one defined in the [`network`][network] - stanza. If a port value was declared on the `service`, this will inherit from - that value if not supplied. If supplied, this value takes precedence over the - `service.port` value. This is useful for services which operate on multiple - ports. Checks will *always use the host IP and ports*. + number unless `address_mode = driver`. The port label must match one defined + in the [`network`][network] stanza. If a port value was declared on the + `service`, this will inherit from that value if not supplied. If supplied, + this value takes precedence over the `service.port` value. This is useful for + services which operate on multiple ports. Checks will use the host IP and + ports by default. In Nomad 0.7.1 or later numeric ports may be used if + `address_mode="driver"` is set on the check. - `protocol` `(string: "http")` - Specifies the protocol for the http-based health checks. Valid options are `http` and `https`. @@ -324,6 +345,123 @@ service { } ``` +### Using Driver Address Mode + +The [Docker](/docs/drivers/docker.html#network_mode) and +[rkt](/docs/drivers/rkt.html#net) drivers support the `driver` setting for the +`address_mode` parameter in both `service` and `check` stanzas. The driver +address mode allows advertising and health checking the IP and port assigned to +a task by the driver. This way if you're using a network plugin like Weave with +Docker, you can advertise the Weave address in Consul instead of the host's +address. + +For example if you were running the example Redis job in an environment with +Weave but Consul was running on the host you could use the following +configuration: + +```hcl +job "example" { + datacenters = ["dc1"] + group "cache" { + + task "redis" { + driver = "docker" + + config { + image = "redis:3.2" + network_mode = "weave" + port_map { + db = 6379 + } + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + mbits = 10 + port "db" {} + } + } + + service { + name = "weave-redis" + port = "db" + check { + name = "host-redis-check" + type = "tcp" + interval = "10s" + timeout = "2s" + } + } + } + } +} +``` + +No explicit `address_mode` required! + +Services default to the `auto` address mode. When a Docker network mode other +than "host" or "bridge" is used, services will automatically advertise the +driver's address (in this case Weave's). The service will advertise the +container's port: 6379. + +However since Consul is often run on the host without access to the Weave +network, `check` stanzas default to `host` address mode. The TCP check will run +against the host's IP and the dynamic host port assigned by Nomad. + +Note that the `check` still inherits the `service` stanza's `db` port label, +but each will resolve the port label according to their address mode. + +If Consul has access to the Weave network the job could be configured like +this: + +```hcl +job "example" { + datacenters = ["dc1"] + group "cache" { + + task "redis" { + driver = "docker" + + config { + image = "redis:3.2" + network_mode = "weave" + # No port map required! + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + mbits = 10 + } + } + + service { + name = "weave-redis" + port = 6379 + address_mode = "driver" + check { + name = "host-redis-check" + type = "tcp" + interval = "10s" + timeout = "2s" + port = 6379 + + address_mode = "driver" + } + } + } + } +} +``` + +In this case Nomad doesn't need to assign Redis any host ports. The `service` +and `check` stanzas can both specify the port number to advertise and check +directly since Nomad isn't managing any port assignments. + + - - - 1 Script checks are not supported for the