diff --git a/CHANGELOG.md b/CHANGELOG.md index 049a3e48592f..9420f27619ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ IMPROVEMENTS: * deployment: Emit task events explaining unhealthy allocations[GH-3025] * deployment: Better description when a deployment should auto-revert but there is no target [GH-3024] + * discovery: Add HTTP header and method support to checks [GH-3031] * driver/docker: Added DNS options [GH-2992] * driver/rkt: support read-only volume mounts [GH-2883] * jobspec: Add `shutdown_delay` so tasks can delay shutdown after diff --git a/api/tasks.go b/api/tasks.go index f91ca621ff5b..d736f71d5b85 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -94,6 +94,8 @@ type ServiceCheck struct { Timeout time.Duration InitialStatus string `mapstructure:"initial_status"` TLSSkipVerify bool `mapstructure:"tls_skip_verify"` + Header map[string][]string + Method string } // The Service model represents a Consul service definition diff --git a/client/task_runner.go b/client/task_runner.go index 987578e843d5..c953a76c8f8c 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -1450,6 +1450,18 @@ func interpolateServices(taskEnv *env.TaskEnv, task *structs.Task) *structs.Task check.Protocol = taskEnv.ReplaceEnv(check.Protocol) check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel) check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus) + check.Method = taskEnv.ReplaceEnv(check.Method) + if len(check.Header) > 0 { + header := make(map[string][]string, len(check.Header)) + for k, vs := range check.Header { + newVals := make([]string, len(vs)) + for i, v := range vs { + newVals[i] = taskEnv.ReplaceEnv(v) + } + header[taskEnv.ReplaceEnv(k)] = newVals + } + check.Header = header + } } service.Name = taskEnv.ReplaceEnv(service.Name) service.PortLabel = taskEnv.ReplaceEnv(service.PortLabel) diff --git a/client/task_runner_test.go b/client/task_runner_test.go index dffcc423842a..6894115e3376 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "reflect" + "strings" "syscall" "testing" "time" @@ -17,11 +18,13 @@ import ( "github.com/golang/snappy" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/driver/env" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/kr/pretty" ) func testLogger() *log.Logger { @@ -1615,6 +1618,87 @@ func TestTaskRunner_Pre06ScriptCheck(t *testing.T) { t.Run(run("0.5.6", "mock_driver", "tcp", false)) } +func TestTaskRunner_interpolateServices(t *testing.T) { + t.Parallel() + task := &structs.Task{ + Services: []*structs.Service{ + { + Name: "${name}", + PortLabel: "${portlabel}", + Tags: []string{"${tags}"}, + Checks: []*structs.ServiceCheck{ + { + Name: "${checkname}", + Type: "${checktype}", + Command: "${checkcmd}", + Args: []string{"${checkarg}"}, + Path: "${checkstr}", + Protocol: "${checkproto}", + PortLabel: "${checklabel}", + InitialStatus: "${checkstatus}", + Method: "${checkmethod}", + Header: map[string][]string{ + "${checkheaderk}": {"${checkheaderv}"}, + }, + }, + }, + }, + }, + } + + env := &env.TaskEnv{ + EnvMap: map[string]string{ + "name": "name", + "portlabel": "portlabel", + "tags": "tags", + "checkname": "checkname", + "checktype": "checktype", + "checkcmd": "checkcmd", + "checkarg": "checkarg", + "checkstr": "checkstr", + "checkpath": "checkpath", + "checkproto": "checkproto", + "checklabel": "checklabel", + "checkstatus": "checkstatus", + "checkmethod": "checkmethod", + "checkheaderk": "checkheaderk", + "checkheaderv": "checkheaderv", + }, + } + + interpTask := interpolateServices(env, task) + + exp := &structs.Task{ + Services: []*structs.Service{ + { + Name: "name", + PortLabel: "portlabel", + Tags: []string{"tags"}, + Checks: []*structs.ServiceCheck{ + { + Name: "checkname", + Type: "checktype", + Command: "checkcmd", + Args: []string{"checkarg"}, + Path: "checkstr", + Protocol: "checkproto", + PortLabel: "checklabel", + InitialStatus: "checkstatus", + Method: "checkmethod", + Header: map[string][]string{ + "checkheaderk": {"checkheaderv"}, + }, + }, + }, + }, + }, + } + + if diff := pretty.Diff(interpTask, exp); len(diff) > 0 { + t.Fatalf("diff:\n%s\n", strings.Join(diff, "\n")) + } +} + func TestTaskRunner_ShutdownDelay(t *testing.T) { t.Parallel() diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index ef556b31b923..8285785fbde0 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -1002,6 +1002,8 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host } url := base.ResolveReference(relative) chkReg.HTTP = url.String() + chkReg.Method = check.Method + chkReg.Header = check.Header case structs.ServiceCheckTCP: chkReg.TCP = net.JoinHostPort(host, strconv.Itoa(port)) case structs.ServiceCheckScript: diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index 22973e740291..2a83d2989e9b 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -7,6 +7,7 @@ import ( "log" "os" "reflect" + "strings" "sync" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/hashicorp/consul/api" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" + "github.com/kr/pretty" ) const ( @@ -1352,3 +1354,47 @@ func TestIsNomadService(t *testing.T) { }) } } + +// TestCreateCheckReg asserts Nomad ServiceCheck structs are properly converted +// to Consul API AgentCheckRegistrations. +func TestCreateCheckReg(t *testing.T) { + check := &structs.ServiceCheck{ + Name: "name", + Type: "http", + Path: "/path", + PortLabel: "label", + Method: "POST", + Header: map[string][]string{ + "Foo": {"bar"}, + }, + } + + serviceID := "testService" + checkID := check.Hash(serviceID) + host := "localhost" + port := 41111 + + expected := &api.AgentCheckRegistration{ + ID: checkID, + Name: "name", + ServiceID: serviceID, + AgentServiceCheck: api.AgentServiceCheck{ + Timeout: "0s", + Interval: "0s", + HTTP: fmt.Sprintf("http://%s:%d/path", host, port), + Method: "POST", + Header: map[string][]string{ + "Foo": {"bar"}, + }, + }, + } + + actual, err := createCheckReg(serviceID, checkID, check, host, port) + if err != nil { + t.Fatalf("err: %v", err) + } + + if diff := pretty.Diff(actual, expected); len(diff) > 0 { + t.Fatalf("diff:\n%s\n", strings.Join(diff, "\n")) + } +} diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 02c26bbabb87..75a67ae3604b 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -699,6 +699,8 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) { Timeout: check.Timeout, InitialStatus: check.InitialStatus, TLSSkipVerify: check.TLSSkipVerify, + Header: check.Header, + Method: check.Method, } } } diff --git a/helper/funcs.go b/helper/funcs.go index ae26849164b4..0b079605911e 100644 --- a/helper/funcs.go +++ b/helper/funcs.go @@ -212,6 +212,21 @@ func CopyMapStringFloat64(m map[string]float64) map[string]float64 { return c } +// CopyMapStringSliceString copies a map of strings to string slices such as +// http.Header +func CopyMapStringSliceString(m map[string][]string) map[string][]string { + l := len(m) + if l == 0 { + return nil + } + + c := make(map[string][]string, l) + for k, v := range m { + c[k] = CopySliceString(v) + } + return c +} + func CopySliceString(s []string) []string { l := len(s) if l == 0 { diff --git a/helper/funcs_test.go b/helper/funcs_test.go index 507e948ae487..03bca8a1fb23 100644 --- a/helper/funcs_test.go +++ b/helper/funcs_test.go @@ -36,6 +36,24 @@ func TestMapStringStringSliceValueSet(t *testing.T) { } } +func TestCopyMapStringSliceString(t *testing.T) { + m := map[string][]string{ + "x": []string{"a", "b", "c"}, + "y": []string{"1", "2", "3"}, + "z": nil, + } + + c := CopyMapStringSliceString(m) + if !reflect.DeepEqual(c, m) { + t.Fatalf("%#v != %#v", m, c) + } + + c["x"][1] = "---" + if reflect.DeepEqual(c, m) { + t.Fatalf("Shared slices: %#v == %#v", m["x"], c["x"]) + } +} + func TestClearEnvVar(t *testing.T) { type testCase struct { input string diff --git a/jobspec/parse.go b/jobspec/parse.go index 795839309e70..14815ab6d6bc 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -962,6 +962,8 @@ func parseChecks(service *api.Service, checkObjs *ast.ObjectList) error { "args", "initial_status", "tls_skip_verify", + "header", + "method", } if err := checkHCLKeys(co.Val, valid); err != nil { return multierror.Prefix(err, "check ->") @@ -972,6 +974,37 @@ func parseChecks(service *api.Service, checkObjs *ast.ObjectList) error { if err := hcl.DecodeObject(&cm, co.Val); err != nil { return err } + + // HCL allows repeating stanzas so merge 'header' into a single + // map[string][]string. + if headerI, ok := cm["header"]; ok { + headerRaw, ok := headerI.([]map[string]interface{}) + if !ok { + return fmt.Errorf("check -> header -> expected a []map[string][]string but found %T", headerI) + } + m := map[string][]string{} + for _, rawm := range headerRaw { + for k, vI := range rawm { + vs, ok := vI.([]interface{}) + if !ok { + return fmt.Errorf("check -> header -> %q expected a []string but found %T", k, vI) + } + for _, vI := range vs { + v, ok := vI.(string) + if !ok { + return fmt.Errorf("check -> header -> %q expected a string but found %T", k, vI) + } + m[k] = append(m[k], v) + } + } + } + + check.Header = m + + // Remove "header" as it has been parsed + delete(cm, "header") + } + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ DecodeHook: mapstructure.StringToTimeDurationHookFunc(), WeaklyTypedInput: true, diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 80b67d4db90b..249f835985e1 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -450,9 +450,14 @@ func TestParse(t *testing.T) { { Name: "check-name", Type: "http", + Path: "/", Interval: 10 * time.Second, Timeout: 2 * time.Second, InitialStatus: capi.HealthPassing, + Method: "POST", + Header: map[string][]string{ + "Authorization": {"Basic ZWxhc3RpYzpjaGFuZ2VtZQ=="}, + }, }, }, }, @@ -464,6 +469,16 @@ func TestParse(t *testing.T) { }, false, }, + { + "service-check-bad-header.hcl", + nil, + true, + }, + { + "service-check-bad-header-2.hcl", + nil, + true, + }, { // TODO This should be pushed into the API "vault_inheritance.hcl", diff --git a/jobspec/test-fixtures/service-check-bad-header-2.hcl b/jobspec/test-fixtures/service-check-bad-header-2.hcl new file mode 100644 index 000000000000..c566925b3c19 --- /dev/null +++ b/jobspec/test-fixtures/service-check-bad-header-2.hcl @@ -0,0 +1,28 @@ +job "check_bad_header" { + type = "service" + group "group" { + count = 1 + + task "task" { + service { + tags = ["bar"] + port = "http" + + check { + name = "check-name" + type = "http" + path = "/" + method = "POST" + interval = "10s" + timeout = "2s" + initial_status = "passing" + + header { + Authorization = ["ok", 840] + } + } + } + } + } +} + diff --git a/jobspec/test-fixtures/service-check-bad-header.hcl b/jobspec/test-fixtures/service-check-bad-header.hcl new file mode 100644 index 000000000000..6ad3f13ce5a1 --- /dev/null +++ b/jobspec/test-fixtures/service-check-bad-header.hcl @@ -0,0 +1,28 @@ +job "check_bad_header" { + type = "service" + group "group" { + count = 1 + + task "task" { + service { + tags = ["bar"] + port = "http" + + check { + name = "check-name" + type = "http" + path = "/" + method = "POST" + interval = "10s" + timeout = "2s" + initial_status = "passing" + + header { + Authorization = "Should be a []string!" + } + } + } + } + } +} + diff --git a/jobspec/test-fixtures/service-check-initial-status.hcl b/jobspec/test-fixtures/service-check-initial-status.hcl index 3b919aeafdde..e275001becd9 100644 --- a/jobspec/test-fixtures/service-check-initial-status.hcl +++ b/jobspec/test-fixtures/service-check-initial-status.hcl @@ -11,9 +11,15 @@ job "check_initial_status" { check { name = "check-name" type = "http" + path = "/" + method = "POST" interval = "10s" timeout = "2s" initial_status = "passing" + + header { + Authorization = ["Basic ZWxhc3RpYzpjaGFuZ2VtZQ=="] + } } } } diff --git a/nomad/structs/diff.go b/nomad/structs/diff.go index 1d8dc9ea38d5..b8ced9180fd6 100644 --- a/nomad/structs/diff.go +++ b/nomad/structs/diff.go @@ -592,6 +592,32 @@ func serviceCheckDiff(old, new *ServiceCheck, contextual bool) *ObjectDiff { // Diff the primitive fields. diff.Fields = fieldDiffs(oldPrimitiveFlat, newPrimitiveFlat, contextual) + + // Diff Header + if headerDiff := checkHeaderDiff(old.Header, new.Header, contextual); headerDiff != nil { + diff.Objects = append(diff.Objects, headerDiff) + } + + return diff +} + +// checkHeaderDiff returns the diff of two service check header objects. If +// contextual diff is enabled, all fields will be returned, even if no diff +// occurred. +func checkHeaderDiff(old, new map[string][]string, contextual bool) *ObjectDiff { + diff := &ObjectDiff{Type: DiffTypeNone, Name: "Header"} + if reflect.DeepEqual(old, new) { + return nil + } else if len(old) == 0 { + diff.Type = DiffTypeAdded + } else if len(new) == 0 { + diff.Type = DiffTypeDeleted + } else { + diff.Type = DiffTypeEdited + } + oldFlat := flatmap.Flatten(old, nil, false) + newFlat := flatmap.Flatten(new, nil, false) + diff.Fields = fieldDiffs(oldFlat, newFlat, contextual) return diff } @@ -609,17 +635,17 @@ func serviceCheckDiffs(old, new []*ServiceCheck, contextual bool) []*ObjectDiff } var diffs []*ObjectDiff - for name, oldService := range oldMap { + for name, oldCheck := range oldMap { // Diff the same, deleted and edited - if diff := serviceCheckDiff(oldService, newMap[name], contextual); diff != nil { + if diff := serviceCheckDiff(oldCheck, newMap[name], contextual); diff != nil { diffs = append(diffs, diff) } } - for name, newService := range newMap { + for name, newCheck := range newMap { // Diff the added if old, ok := oldMap[name]; !ok { - if diff := serviceCheckDiff(old, newService, contextual); diff != nil { + if diff := serviceCheckDiff(old, newCheck, contextual); diff != nil { diffs = append(diffs, diff) } } diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index 1281eaa6b655..f41692ba3dde 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -3410,6 +3410,9 @@ func TestTaskDiff(t *testing.T) { Interval: 1 * time.Second, Timeout: 1 * time.Second, InitialStatus: "critical", + Header: map[string][]string{ + "Foo": {"bar"}, + }, }, }, }, @@ -3430,6 +3433,11 @@ func TestTaskDiff(t *testing.T) { Interval: 1 * time.Second, Timeout: 1 * time.Second, InitialStatus: "passing", + Method: "POST", + Header: map[string][]string{ + "Foo": {"bar", "baz"}, + "Eggs": {"spam"}, + }, }, }, }, @@ -3484,6 +3492,12 @@ func TestTaskDiff(t *testing.T) { Old: "1000000000", New: "1000000000", }, + { + Type: DiffTypeAdded, + Name: "Method", + Old: "", + New: "POST", + }, { Type: DiffTypeNone, Name: "Name", @@ -3527,6 +3541,32 @@ func TestTaskDiff(t *testing.T) { New: "tcp", }, }, + Objects: []*ObjectDiff{ + { + Type: DiffTypeEdited, + Name: "Header", + Fields: []*FieldDiff{ + { + Type: DiffTypeAdded, + Name: "Eggs[0]", + Old: "", + New: "spam", + }, + { + Type: DiffTypeNone, + Name: "Foo[0]", + Old: "bar", + New: "bar", + }, + { + Type: DiffTypeAdded, + Name: "Foo[1]", + Old: "", + New: "baz", + }, + }, + }, + }, }, }, }, diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index cff045fe320a..1eeb4f74c488 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2669,17 +2669,19 @@ const ( // The ServiceCheck data model represents the consul health check that // Nomad registers for a Task type ServiceCheck struct { - Name string // Name of the check, defaults to id - Type string // Type of the check - tcp, http, docker and script - Command string // Command is the command to run for script checks - Args []string // Args is a list of argumes for script checks - Path string // path of the health check url for http type check - Protocol string // Protocol to use if check is http, defaults to http - PortLabel string // The port to use for tcp/http checks - Interval time.Duration // Interval of the check - Timeout time.Duration // Timeout of the response from the check before consul fails the check - InitialStatus string // Initial status of the check - TLSSkipVerify bool // Skip TLS verification when Protocol=https + Name string // Name of the check, defaults to id + Type string // Type of the check - tcp, http, docker and script + Command string // Command is the command to run for script checks + Args []string // Args is a list of argumes for script checks + Path string // path of the health check url for http type check + Protocol string // Protocol to use if check is http, defaults to http + PortLabel string // The port to use for tcp/http checks + Interval time.Duration // Interval of the check + Timeout time.Duration // Timeout of the response from the check before consul fails the check + InitialStatus string // Initial status of the check + TLSSkipVerify bool // Skip TLS verification when Protocol=https + Method string // HTTP Method to use (GET by default) + Header map[string][]string // HTTP Headers for Consul to set when making HTTP checks } func (sc *ServiceCheck) Copy() *ServiceCheck { @@ -2688,16 +2690,28 @@ func (sc *ServiceCheck) Copy() *ServiceCheck { } nsc := new(ServiceCheck) *nsc = *sc + nsc.Args = helper.CopySliceString(sc.Args) + nsc.Header = helper.CopyMapStringSliceString(sc.Header) return nsc } func (sc *ServiceCheck) Canonicalize(serviceName string) { - // Ensure empty slices are treated as null to avoid scheduling issues when - // using DeepEquals. + // Ensure empty maps/slices are treated as null to avoid scheduling + // issues when using DeepEquals. if len(sc.Args) == 0 { sc.Args = nil } + if len(sc.Header) == 0 { + sc.Header = nil + } else { + for k, v := range sc.Header { + if len(v) == 0 { + sc.Header[k] = nil + } + } + } + if sc.Name == "" { sc.Name = fmt.Sprintf("service: %q check", serviceName) } @@ -2772,10 +2786,23 @@ func (sc *ServiceCheck) Hash(serviceID string) string { io.WriteString(h, sc.PortLabel) io.WriteString(h, sc.Interval.String()) io.WriteString(h, sc.Timeout.String()) + io.WriteString(h, sc.Method) // Only include TLSSkipVerify if set to maintain ID stability with Nomad <0.6 if sc.TLSSkipVerify { io.WriteString(h, "true") } + + // Since map iteration order isn't stable we need to write k/v pairs to + // a slice and sort it before hashing. + if len(sc.Header) > 0 { + headers := make([]string, 0, len(sc.Header)) + for k, v := range sc.Header { + headers = append(headers, k+strings.Join(v, "")) + } + sort.Strings(headers) + io.WriteString(h, strings.Join(headers, "")) + } + return fmt.Sprintf("%x", h.Sum(nil)) } diff --git a/vendor/github.com/hashicorp/consul/api/acl.go b/vendor/github.com/hashicorp/consul/api/acl.go index 15d1f9f5aa9f..6ea0a752e581 100644 --- a/vendor/github.com/hashicorp/consul/api/acl.go +++ b/vendor/github.com/hashicorp/consul/api/acl.go @@ -42,6 +42,24 @@ func (c *Client) ACL() *ACL { return &ACL{c} } +// Bootstrap is used to perform a one-time ACL bootstrap operation on a cluster +// to get the first management token. +func (a *ACL) Bootstrap() (string, *WriteMeta, error) { + r := a.c.newRequest("PUT", "/v1/acl/bootstrap") + rtt, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return "", nil, err + } + defer resp.Body.Close() + + wm := &WriteMeta{RequestTime: rtt} + var out struct{ ID string } + if err := decodeBody(resp, &out); err != nil { + return "", nil, err + } + return out.ID, wm, nil +} + // Create is used to generate a new token with the given parameters func (a *ACL) Create(acl *ACLEntry, q *WriteOptions) (string, *WriteMeta, error) { r := a.c.newRequest("PUT", "/v1/acl/create") diff --git a/vendor/github.com/hashicorp/consul/api/agent.go b/vendor/github.com/hashicorp/consul/api/agent.go index 35c0676b517b..be4e00ff6b3b 100644 --- a/vendor/github.com/hashicorp/consul/api/agent.go +++ b/vendor/github.com/hashicorp/consul/api/agent.go @@ -67,17 +67,19 @@ type AgentCheckRegistration struct { // AgentServiceCheck is used to define a node or service level check type AgentServiceCheck struct { - Script string `json:",omitempty"` - DockerContainerID string `json:",omitempty"` - Shell string `json:",omitempty"` // Only supported for Docker. - Interval string `json:",omitempty"` - Timeout string `json:",omitempty"` - TTL string `json:",omitempty"` - HTTP string `json:",omitempty"` - TCP string `json:",omitempty"` - Status string `json:",omitempty"` - Notes string `json:",omitempty"` - TLSSkipVerify bool `json:",omitempty"` + Script string `json:",omitempty"` + DockerContainerID string `json:",omitempty"` + Shell string `json:",omitempty"` // Only supported for Docker. + Interval string `json:",omitempty"` + Timeout string `json:",omitempty"` + TTL string `json:",omitempty"` + HTTP string `json:",omitempty"` + Header map[string][]string `json:",omitempty"` + Method string `json:",omitempty"` + TCP string `json:",omitempty"` + Status string `json:",omitempty"` + Notes string `json:",omitempty"` + TLSSkipVerify bool `json:",omitempty"` // In Consul 0.7 and later, checks that are associated with a service // may also contain this optional DeregisterCriticalServiceAfter field, @@ -89,6 +91,47 @@ type AgentServiceCheck struct { } type AgentServiceChecks []*AgentServiceCheck +// AgentToken is used when updating ACL tokens for an agent. +type AgentToken struct { + Token string +} + +// Metrics info is used to store different types of metric values from the agent. +type MetricsInfo struct { + Timestamp string + Gauges []GaugeValue + Points []PointValue + Counters []SampledValue + Samples []SampledValue +} + +// GaugeValue stores one value that is updated as time goes on, such as +// the amount of memory allocated. +type GaugeValue struct { + Name string + Value float32 + Labels map[string]string +} + +// PointValue holds a series of points for a metric. +type PointValue struct { + Name string + Points []float32 +} + +// SampledValue stores info about a metric that is incremented over time, +// such as the number of requests to an HTTP endpoint. +type SampledValue struct { + Name string + Count int + Sum float64 + Min float64 + Max float64 + Mean float64 + Stddev float64 + Labels map[string]string +} + // Agent can be used to query the Agent endpoints type Agent struct { c *Client @@ -119,6 +162,23 @@ func (a *Agent) Self() (map[string]map[string]interface{}, error) { return out, nil } +// Metrics is used to query the agent we are speaking to for +// its current internal metric data +func (a *Agent) Metrics() (*MetricsInfo, error) { + r := a.c.newRequest("GET", "/v1/agent/metrics") + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var out *MetricsInfo + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + return out, nil +} + // Reload triggers a configuration reload for the agent we are connected to. func (a *Agent) Reload() error { r := a.c.newRequest("PUT", "/v1/agent/reload") @@ -439,8 +499,9 @@ func (a *Agent) DisableNodeMaintenance() error { // Monitor returns a channel which will receive streaming logs from the agent // Providing a non-nil stopCh can be used to close the connection and stop the -// log stream -func (a *Agent) Monitor(loglevel string, stopCh chan struct{}, q *QueryOptions) (chan string, error) { +// log stream. An empty string will be sent down the given channel when there's +// nothing left to stream, after which the caller should close the stopCh. +func (a *Agent) Monitor(loglevel string, stopCh <-chan struct{}, q *QueryOptions) (chan string, error) { r := a.c.newRequest("GET", "/v1/agent/monitor") r.setQueryOptions(q) if loglevel != "" { @@ -464,10 +525,61 @@ func (a *Agent) Monitor(loglevel string, stopCh chan struct{}, q *QueryOptions) default: } if scanner.Scan() { - logCh <- scanner.Text() + // An empty string signals to the caller that + // the scan is done, so make sure we only emit + // that when the scanner says it's done, not if + // we happen to ingest an empty line. + if text := scanner.Text(); text != "" { + logCh <- text + } else { + logCh <- " " + } + } else { + logCh <- "" } } }() return logCh, nil } + +// UpdateACLToken updates the agent's "acl_token". See updateToken for more +// details. +func (c *Agent) UpdateACLToken(token string, q *WriteOptions) (*WriteMeta, error) { + return c.updateToken("acl_token", token, q) +} + +// UpdateACLAgentToken updates the agent's "acl_agent_token". See updateToken +// for more details. +func (c *Agent) UpdateACLAgentToken(token string, q *WriteOptions) (*WriteMeta, error) { + return c.updateToken("acl_agent_token", token, q) +} + +// UpdateACLAgentMasterToken updates the agent's "acl_agent_master_token". See +// updateToken for more details. +func (c *Agent) UpdateACLAgentMasterToken(token string, q *WriteOptions) (*WriteMeta, error) { + return c.updateToken("acl_agent_master_token", token, q) +} + +// UpdateACLReplicationToken updates the agent's "acl_replication_token". See +// updateToken for more details. +func (c *Agent) UpdateACLReplicationToken(token string, q *WriteOptions) (*WriteMeta, error) { + return c.updateToken("acl_replication_token", token, q) +} + +// updateToken can be used to update an agent's ACL token after the agent has +// started. The tokens are not persisted, so will need to be updated again if +// the agent is restarted. +func (c *Agent) updateToken(target, token string, q *WriteOptions) (*WriteMeta, error) { + r := c.c.newRequest("PUT", fmt.Sprintf("/v1/agent/token/%s", target)) + r.setWriteOptions(q) + r.obj = &AgentToken{Token: token} + rtt, resp, err := requireOK(c.c.doRequest(r)) + if err != nil { + return nil, err + } + resp.Body.Close() + + wm := &WriteMeta{RequestTime: rtt} + return wm, nil +} diff --git a/vendor/github.com/hashicorp/consul/api/api.go b/vendor/github.com/hashicorp/consul/api/api.go index 6f90ed5d60e6..0a62b4f68d7c 100644 --- a/vendor/github.com/hashicorp/consul/api/api.go +++ b/vendor/github.com/hashicorp/consul/api/api.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "io" + "io/ioutil" "log" "net" "net/http" @@ -104,6 +105,26 @@ type QueryOptions struct { // relayed back to the sender through N other random nodes. Must be // a value from 0 to 5 (inclusive). RelayFactor uint8 + + // ctx is an optional context pass through to the underlying HTTP + // request layer. Use Context() and WithContext() to manage this. + ctx context.Context +} + +func (o *QueryOptions) Context() context.Context { + if o != nil && o.ctx != nil { + return o.ctx + } + return context.Background() +} + +func (o *QueryOptions) WithContext(ctx context.Context) *QueryOptions { + o2 := new(QueryOptions) + if o != nil { + *o2 = *o + } + o2.ctx = ctx + return o2 } // WriteOptions are used to parameterize a write @@ -120,6 +141,26 @@ type WriteOptions struct { // relayed back to the sender through N other random nodes. Must be // a value from 0 to 5 (inclusive). RelayFactor uint8 + + // ctx is an optional context pass through to the underlying HTTP + // request layer. Use Context() and WithContext() to manage this. + ctx context.Context +} + +func (o *WriteOptions) Context() context.Context { + if o != nil && o.ctx != nil { + return o.ctx + } + return context.Background() +} + +func (o *WriteOptions) WithContext(ctx context.Context) *WriteOptions { + o2 := new(WriteOptions) + if o != nil { + *o2 = *o + } + o2.ctx = ctx + return o2 } // QueryMeta is used to return meta data about a query @@ -456,6 +497,7 @@ type request struct { body io.Reader header http.Header obj interface{} + ctx context.Context } // setQueryOptions is used to annotate the request with @@ -493,6 +535,7 @@ func (r *request) setQueryOptions(q *QueryOptions) { if q.RelayFactor != 0 { r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor))) } + r.ctx = q.ctx } // durToMsec converts a duration to a millisecond specified string. If the @@ -537,6 +580,7 @@ func (r *request) setWriteOptions(q *WriteOptions) { if q.RelayFactor != 0 { r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor))) } + r.ctx = q.ctx } // toHTTP converts the request to an HTTP request @@ -568,8 +612,11 @@ func (r *request) toHTTP() (*http.Request, error) { if r.config.HttpAuth != nil { req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password) } - - return req, nil + if r.ctx != nil { + return req.WithContext(r.ctx), nil + } else { + return req, nil + } } // newRequest is used to create a new request @@ -648,6 +695,8 @@ func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (* if err := decodeBody(resp, &out); err != nil { return nil, err } + } else if _, err := ioutil.ReadAll(resp.Body); err != nil { + return nil, err } return wm, nil } diff --git a/vendor/github.com/hashicorp/consul/api/operator_autopilot.go b/vendor/github.com/hashicorp/consul/api/operator_autopilot.go index 59471ecf996e..0fa9d1604030 100644 --- a/vendor/github.com/hashicorp/consul/api/operator_autopilot.go +++ b/vendor/github.com/hashicorp/consul/api/operator_autopilot.go @@ -39,6 +39,10 @@ type AutopilotConfiguration struct { // cluster before promoting them to voters. DisableUpgradeMigration bool + // (Enterprise-only) UpgradeVersionTag is the node tag to use for version info when + // performing upgrade migrations. If left blank, the Consul version will be used. + UpgradeVersionTag string + // CreateIndex holds the index corresponding the creation of this configuration. // This is a read-only field. CreateIndex uint64 diff --git a/vendor/github.com/hashicorp/consul/api/session.go b/vendor/github.com/hashicorp/consul/api/session.go index 36e99a389e8d..1613f11a60cc 100644 --- a/vendor/github.com/hashicorp/consul/api/session.go +++ b/vendor/github.com/hashicorp/consul/api/session.go @@ -145,7 +145,9 @@ func (s *Session) Renew(id string, q *WriteOptions) (*SessionEntry, *WriteMeta, // RenewPeriodic is used to periodically invoke Session.Renew on a // session until a doneCh is closed. This is meant to be used in a long running // goroutine to ensure a session stays valid. -func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, doneCh chan struct{}) error { +func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, doneCh <-chan struct{}) error { + ctx := q.Context() + ttl, err := time.ParseDuration(initialTTL) if err != nil { return err @@ -179,6 +181,11 @@ func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, d // Attempt a session destroy s.Destroy(id, q) return nil + + case <-ctx.Done(): + // Bail immediately since attempting the destroy would + // use the canceled context in q, which would just bail. + return ctx.Err() } } } diff --git a/vendor/github.com/hashicorp/consul/lib/cluster.go b/vendor/github.com/hashicorp/consul/lib/cluster.go index a95232c5737b..d65938e27324 100644 --- a/vendor/github.com/hashicorp/consul/lib/cluster.go +++ b/vendor/github.com/hashicorp/consul/lib/cluster.go @@ -31,7 +31,7 @@ func DurationMinusBufferDomain(intv time.Duration, buffer time.Duration, jitter return min, max } -// Returns a random stagger interval between 0 and the duration +// RandomStagger returns an interval between 0 and the duration func RandomStagger(intv time.Duration) time.Duration { if intv == 0 { return 0 diff --git a/vendor/github.com/hashicorp/consul/testutil/README.md b/vendor/github.com/hashicorp/consul/testutil/README.md index da5d682ca338..bd84f822d3df 100644 --- a/vendor/github.com/hashicorp/consul/testutil/README.md +++ b/vendor/github.com/hashicorp/consul/testutil/README.md @@ -58,6 +58,9 @@ func TestFoo_bar(t *testing.T) { // Create a service srv1.AddService(t, "redis", structs.HealthPassing, []string{"master"}) + // Create a service that will be accessed in target source code + srv1.AddAccessibleService("redis", structs.HealthPassing, "127.0.0.1", 6379, []string{"master"}) + // Create a service check srv1.AddCheck(t, "service:redis", "redis", structs.HealthPassing) diff --git a/vendor/github.com/hashicorp/consul/testutil/io.go b/vendor/github.com/hashicorp/consul/testutil/io.go new file mode 100644 index 000000000000..7d0ca6effcc5 --- /dev/null +++ b/vendor/github.com/hashicorp/consul/testutil/io.go @@ -0,0 +1,61 @@ +package testutil + +import ( + "fmt" + "io/ioutil" + "os" + "strings" + "testing" +) + +// tmpdir is the base directory for all temporary directories +// and files created with TempDir and TempFile. This could be +// achieved by setting a system environment variable but then +// the test execution would depend on whether or not the +// environment variable is set. +// +// On macOS the temp base directory is quite long and that +// triggers a problem with some tests that bind to UNIX sockets +// where the filename seems to be too long. Using a shorter name +// fixes this and makes the paths more readable. +// +// It also provides a single base directory for cleanup. +var tmpdir = "/tmp/consul-test" + +func init() { + if err := os.MkdirAll(tmpdir, 0755); err != nil { + fmt.Printf("Cannot create %s. Reverting to /tmp\n", tmpdir) + tmpdir = "/tmp" + } +} + +// TempDir creates a temporary directory within tmpdir +// with the name 'testname-name'. If the directory cannot +// be created t.Fatal is called. +func TempDir(t *testing.T, name string) string { + if t != nil && t.Name() != "" { + name = t.Name() + "-" + name + } + name = strings.Replace(name, "/", "_", -1) + d, err := ioutil.TempDir(tmpdir, name) + if err != nil { + t.Fatalf("err: %s", err) + } + return d +} + +// TempFile creates a temporary file within tmpdir +// with the name 'testname-name'. If the file cannot +// be created t.Fatal is called. If a temporary directory +// has been created before consider storing the file +// inside this directory to avoid double cleanup. +func TempFile(t *testing.T, name string) *os.File { + if t != nil && t.Name() != "" { + name = t.Name() + "-" + name + } + f, err := ioutil.TempFile(tmpdir, name) + if err != nil { + t.Fatalf("err: %s", err) + } + return f +} diff --git a/vendor/github.com/hashicorp/consul/testutil/retry/retry.go b/vendor/github.com/hashicorp/consul/testutil/retry/retry.go new file mode 100644 index 000000000000..cfbdde3c9db3 --- /dev/null +++ b/vendor/github.com/hashicorp/consul/testutil/retry/retry.go @@ -0,0 +1,197 @@ +// Package retry provides support for repeating operations in tests. +// +// A sample retry operation looks like this: +// +// func TestX(t *testing.T) { +// retry.Run(t, func(r *retry.R) { +// if err := foo(); err != nil { +// r.Fatal("f: ", err) +// } +// }) +// } +// +package retry + +import ( + "bytes" + "fmt" + "runtime" + "strings" + "sync" + "time" +) + +// Failer is an interface compatible with testing.T. +type Failer interface { + // Log is called for the final test output + Log(args ...interface{}) + + // FailNow is called when the retrying is abandoned. + FailNow() +} + +// R provides context for the retryer. +type R struct { + fail bool + output []string +} + +func (r *R) FailNow() { + r.fail = true + runtime.Goexit() +} + +func (r *R) Fatal(args ...interface{}) { + r.log(fmt.Sprint(args...)) + r.FailNow() +} + +func (r *R) Fatalf(format string, args ...interface{}) { + r.log(fmt.Sprintf(format, args...)) + r.FailNow() +} + +func (r *R) Error(args ...interface{}) { + r.log(fmt.Sprint(args...)) + r.fail = true +} + +func (r *R) Check(err error) { + if err != nil { + r.log(err.Error()) + r.FailNow() + } +} + +func (r *R) log(s string) { + r.output = append(r.output, decorate(s)) +} + +func decorate(s string) string { + _, file, line, ok := runtime.Caller(3) + if ok { + n := strings.LastIndex(file, "/") + if n >= 0 { + file = file[n+1:] + } + } else { + file = "???" + line = 1 + } + return fmt.Sprintf("%s:%d: %s", file, line, s) +} + +func Run(t Failer, f func(r *R)) { + run(TwoSeconds(), t, f) +} + +func RunWith(r Retryer, t Failer, f func(r *R)) { + run(r, t, f) +} + +func dedup(a []string) string { + if len(a) == 0 { + return "" + } + m := map[string]int{} + for _, s := range a { + m[s] = m[s] + 1 + } + var b bytes.Buffer + for _, s := range a { + if _, ok := m[s]; ok { + b.WriteString(s) + b.WriteRune('\n') + delete(m, s) + } + } + return string(b.Bytes()) +} + +func run(r Retryer, t Failer, f func(r *R)) { + rr := &R{} + fail := func() { + out := dedup(rr.output) + if out != "" { + t.Log(out) + } + t.FailNow() + } + for r.NextOr(fail) { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + f(rr) + }() + wg.Wait() + if rr.fail { + rr.fail = false + continue + } + break + } +} + +// TwoSeconds repeats an operation for two seconds and waits 25ms in between. +func TwoSeconds() *Timer { + return &Timer{Timeout: 2 * time.Second, Wait: 25 * time.Millisecond} +} + +// ThreeTimes repeats an operation three times and waits 25ms in between. +func ThreeTimes() *Counter { + return &Counter{Count: 3, Wait: 25 * time.Millisecond} +} + +// Retryer provides an interface for repeating operations +// until they succeed or an exit condition is met. +type Retryer interface { + // NextOr returns true if the operation should be repeated. + // Otherwise, it calls fail and returns false. + NextOr(fail func()) bool +} + +// Counter repeats an operation a given number of +// times and waits between subsequent operations. +type Counter struct { + Count int + Wait time.Duration + + count int +} + +func (r *Counter) NextOr(fail func()) bool { + if r.count == r.Count { + fail() + return false + } + if r.count > 0 { + time.Sleep(r.Wait) + } + r.count++ + return true +} + +// Timer repeats an operation for a given amount +// of time and waits between subsequent operations. +type Timer struct { + Timeout time.Duration + Wait time.Duration + + // stop is the timeout deadline. + // Set on the first invocation of Next(). + stop time.Time +} + +func (r *Timer) NextOr(fail func()) bool { + if r.stop.IsZero() { + r.stop = time.Now().Add(r.Timeout) + return true + } + if time.Now().After(r.stop) { + fail() + return false + } + time.Sleep(r.Wait) + return true +} diff --git a/vendor/github.com/hashicorp/consul/testutil/server.go b/vendor/github.com/hashicorp/consul/testutil/server.go index 47c12c709423..969d06a58481 100644 --- a/vendor/github.com/hashicorp/consul/testutil/server.go +++ b/vendor/github.com/hashicorp/consul/testutil/server.go @@ -21,9 +21,13 @@ import ( "net/http" "os" "os/exec" + "path/filepath" "strconv" "strings" + "testing" + "time" + "github.com/hashicorp/consul/testutil/retry" "github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/go-uuid" "github.com/pkg/errors" @@ -56,32 +60,36 @@ type TestAddressConfig struct { // TestServerConfig is the main server configuration struct. type TestServerConfig struct { - NodeName string `json:"node_name"` - NodeID string `json:"node_id"` - NodeMeta map[string]string `json:"node_meta,omitempty"` - Performance *TestPerformanceConfig `json:"performance,omitempty"` - Bootstrap bool `json:"bootstrap,omitempty"` - Server bool `json:"server,omitempty"` - DataDir string `json:"data_dir,omitempty"` - Datacenter string `json:"datacenter,omitempty"` - DisableCheckpoint bool `json:"disable_update_check"` - LogLevel string `json:"log_level,omitempty"` - Bind string `json:"bind_addr,omitempty"` - Addresses *TestAddressConfig `json:"addresses,omitempty"` - Ports *TestPortConfig `json:"ports,omitempty"` - RaftProtocol int `json:"raft_protocol,omitempty"` - ACLMasterToken string `json:"acl_master_token,omitempty"` - ACLDatacenter string `json:"acl_datacenter,omitempty"` - ACLDefaultPolicy string `json:"acl_default_policy,omitempty"` - ACLEnforceVersion8 bool `json:"acl_enforce_version_8"` - Encrypt string `json:"encrypt,omitempty"` - CAFile string `json:"ca_file,omitempty"` - CertFile string `json:"cert_file,omitempty"` - KeyFile string `json:"key_file,omitempty"` - VerifyIncoming bool `json:"verify_incoming,omitempty"` - VerifyOutgoing bool `json:"verify_outgoing,omitempty"` - Stdout, Stderr io.Writer `json:"-"` - Args []string `json:"-"` + NodeName string `json:"node_name"` + NodeID string `json:"node_id"` + NodeMeta map[string]string `json:"node_meta,omitempty"` + Performance *TestPerformanceConfig `json:"performance,omitempty"` + Bootstrap bool `json:"bootstrap,omitempty"` + Server bool `json:"server,omitempty"` + DataDir string `json:"data_dir,omitempty"` + Datacenter string `json:"datacenter,omitempty"` + DisableCheckpoint bool `json:"disable_update_check"` + LogLevel string `json:"log_level,omitempty"` + Bind string `json:"bind_addr,omitempty"` + Addresses *TestAddressConfig `json:"addresses,omitempty"` + Ports *TestPortConfig `json:"ports,omitempty"` + RaftProtocol int `json:"raft_protocol,omitempty"` + ACLMasterToken string `json:"acl_master_token,omitempty"` + ACLDatacenter string `json:"acl_datacenter,omitempty"` + ACLDefaultPolicy string `json:"acl_default_policy,omitempty"` + ACLEnforceVersion8 bool `json:"acl_enforce_version_8"` + Encrypt string `json:"encrypt,omitempty"` + CAFile string `json:"ca_file,omitempty"` + CertFile string `json:"cert_file,omitempty"` + KeyFile string `json:"key_file,omitempty"` + VerifyIncoming bool `json:"verify_incoming,omitempty"` + VerifyIncomingRPC bool `json:"verify_incoming_rpc,omitempty"` + VerifyIncomingHTTPS bool `json:"verify_incoming_https,omitempty"` + VerifyOutgoing bool `json:"verify_outgoing,omitempty"` + EnableScriptChecks bool `json:"enable_script_checks,omitempty"` + ReadyTimeout time.Duration `json:"-"` + Stdout, Stderr io.Writer `json:"-"` + Args []string `json:"-"` } // ServerConfigCallback is a function interface which can be @@ -117,6 +125,7 @@ func defaultServerConfig() *TestServerConfig { Server: randomPort(), RPC: randomPort(), }, + ReadyTimeout: 10 * time.Second, } } @@ -162,68 +171,75 @@ type TestServer struct { LANAddr string WANAddr string - HttpClient *http.Client + HTTPClient *http.Client + + tmpdir string } // NewTestServer is an easy helper method to create a new Consul // test server with the most basic configuration. func NewTestServer() (*TestServer, error) { - return NewTestServerConfig(nil) + return NewTestServerConfigT(nil, nil) +} + +func NewTestServerConfig(cb ServerConfigCallback) (*TestServer, error) { + return NewTestServerConfigT(nil, cb) } // NewTestServerConfig creates a new TestServer, and makes a call to an optional // callback function to modify the configuration. If there is an error // configuring or starting the server, the server will NOT be running when the // function returns (thus you do not need to stop it). -func NewTestServerConfig(cb ServerConfigCallback) (*TestServer, error) { - if path, err := exec.LookPath("consul"); err != nil || path == "" { +func NewTestServerConfigT(t *testing.T, cb ServerConfigCallback) (*TestServer, error) { + var server *TestServer + retry.Run(t, func(r *retry.R) { + var err error + server, err = newTestServerConfigT(t, cb) + if err != nil { + r.Fatalf("failed starting test server: %v", err) + } + }) + return server, nil +} + +// newTestServerConfigT is the internal helper for NewTestServerConfigT. +func newTestServerConfigT(t *testing.T, cb ServerConfigCallback) (*TestServer, error) { + path, err := exec.LookPath("consul") + if err != nil || path == "" { return nil, fmt.Errorf("consul not found on $PATH - download and install " + "consul or skip this test") } - dataDir, err := ioutil.TempDir("", "consul") - if err != nil { - return nil, errors.Wrap(err, "failed creating tempdir") - } - - configFile, err := ioutil.TempFile(dataDir, "config") - if err != nil { - defer os.RemoveAll(dataDir) - return nil, errors.Wrap(err, "failed creating temp config") - } - - consulConfig := defaultServerConfig() - consulConfig.DataDir = dataDir - + tmpdir := TempDir(t, "consul") + cfg := defaultServerConfig() + cfg.DataDir = filepath.Join(tmpdir, "data") if cb != nil { - cb(consulConfig) + cb(cfg) } - configContent, err := json.Marshal(consulConfig) + b, err := json.Marshal(cfg) if err != nil { return nil, errors.Wrap(err, "failed marshaling json") } - if _, err := configFile.Write(configContent); err != nil { - defer configFile.Close() - defer os.RemoveAll(dataDir) + configFile := filepath.Join(tmpdir, "config.json") + if err := ioutil.WriteFile(configFile, b, 0644); err != nil { + defer os.RemoveAll(tmpdir) return nil, errors.Wrap(err, "failed writing config content") } - configFile.Close() stdout := io.Writer(os.Stdout) - if consulConfig.Stdout != nil { - stdout = consulConfig.Stdout + if cfg.Stdout != nil { + stdout = cfg.Stdout } - stderr := io.Writer(os.Stderr) - if consulConfig.Stderr != nil { - stderr = consulConfig.Stderr + if cfg.Stderr != nil { + stderr = cfg.Stderr } // Start the server - args := []string{"agent", "-config-file", configFile.Name()} - args = append(args, consulConfig.Args...) + args := []string{"agent", "-config-file", configFile} + args = append(args, cfg.Args...) cmd := exec.Command("consul", args...) cmd.Stdout = stdout cmd.Stderr = stderr @@ -231,86 +247,89 @@ func NewTestServerConfig(cb ServerConfigCallback) (*TestServer, error) { return nil, errors.Wrap(err, "failed starting command") } - var httpAddr string - var client *http.Client - if strings.HasPrefix(consulConfig.Addresses.HTTP, "unix://") { - httpAddr = consulConfig.Addresses.HTTP - trans := cleanhttp.DefaultTransport() - trans.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) { - return net.Dial("unix", httpAddr[7:]) - } - client = &http.Client{ - Transport: trans, + httpAddr := fmt.Sprintf("127.0.0.1:%d", cfg.Ports.HTTP) + client := cleanhttp.DefaultClient() + if strings.HasPrefix(cfg.Addresses.HTTP, "unix://") { + httpAddr = cfg.Addresses.HTTP + tr := cleanhttp.DefaultTransport() + tr.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", httpAddr[len("unix://"):]) } - } else { - httpAddr = fmt.Sprintf("127.0.0.1:%d", consulConfig.Ports.HTTP) - client = cleanhttp.DefaultClient() + client = &http.Client{Transport: tr} } server := &TestServer{ - Config: consulConfig, + Config: cfg, cmd: cmd, HTTPAddr: httpAddr, - HTTPSAddr: fmt.Sprintf("127.0.0.1:%d", consulConfig.Ports.HTTPS), - LANAddr: fmt.Sprintf("127.0.0.1:%d", consulConfig.Ports.SerfLan), - WANAddr: fmt.Sprintf("127.0.0.1:%d", consulConfig.Ports.SerfWan), + HTTPSAddr: fmt.Sprintf("127.0.0.1:%d", cfg.Ports.HTTPS), + LANAddr: fmt.Sprintf("127.0.0.1:%d", cfg.Ports.SerfLan), + WANAddr: fmt.Sprintf("127.0.0.1:%d", cfg.Ports.SerfWan), + + HTTPClient: client, - HttpClient: client, + tmpdir: tmpdir, } // Wait for the server to be ready - var startErr error - if consulConfig.Bootstrap { - startErr = server.waitForLeader() + if cfg.Bootstrap { + err = server.waitForLeader() } else { - startErr = server.waitForAPI() + err = server.waitForAPI() } - if startErr != nil { + if err != nil { defer server.Stop() - return nil, errors.Wrap(startErr, "failed waiting for server to start") + return nil, errors.Wrap(err, "failed waiting for server to start") } - return server, nil } // Stop stops the test Consul server, and removes the Consul data // directory once we are done. func (s *TestServer) Stop() error { - defer os.RemoveAll(s.Config.DataDir) + defer os.RemoveAll(s.tmpdir) - if s.cmd != nil { - if s.cmd.Process != nil { - if err := s.cmd.Process.Kill(); err != nil { - return errors.Wrap(err, "failed to kill consul server") - } - } + // There was no process + if s.cmd == nil { + return nil + } - // wait for the process to exit to be sure that the data dir can be - // deleted on all platforms. - return s.cmd.Wait() + if s.cmd.Process != nil { + if err := s.cmd.Process.Signal(os.Interrupt); err != nil { + return errors.Wrap(err, "failed to kill consul server") + } } - // There was no process - return nil + // wait for the process to exit to be sure that the data dir can be + // deleted on all platforms. + return s.cmd.Wait() } +type failer struct { + failed bool +} + +func (f *failer) Log(args ...interface{}) { fmt.Println(args) } +func (f *failer) FailNow() { f.failed = true } + // waitForAPI waits for only the agent HTTP endpoint to start // responding. This is an indication that the agent has started, // but will likely return before a leader is elected. func (s *TestServer) waitForAPI() error { - if err := WaitForResult(func() (bool, error) { - resp, err := s.HttpClient.Get(s.url("/v1/agent/self")) + f := &failer{} + retry.Run(f, func(r *retry.R) { + resp, err := s.HTTPClient.Get(s.url("/v1/agent/self")) if err != nil { - return false, errors.Wrap(err, "failed http get") + r.Fatal(err) } defer resp.Body.Close() if err := s.requireOK(resp); err != nil { - return false, errors.Wrap(err, "failed OK response") + r.Fatal("failed OK respose", err) } - return true, nil - }); err != nil { - return errors.Wrap(err, "failed waiting for API") + }) + if f.failed { + return errors.New("failed waiting for API") } return nil } @@ -320,50 +339,55 @@ func (s *TestServer) waitForAPI() error { // 1 or more to be observed to confirm leader election is done. // It then waits to ensure the anti-entropy sync has completed. func (s *TestServer) waitForLeader() error { + f := &failer{} + timer := &retry.Timer{ + Timeout: s.Config.ReadyTimeout, + Wait: 250 * time.Millisecond, + } var index int64 - if err := WaitForResult(func() (bool, error) { + retry.RunWith(timer, f, func(r *retry.R) { // Query the API and check the status code. - url := s.url(fmt.Sprintf("/v1/catalog/nodes?index=%d&wait=2s", index)) - resp, err := s.HttpClient.Get(url) + url := s.url(fmt.Sprintf("/v1/catalog/nodes?index=%d", index)) + resp, err := s.HTTPClient.Get(url) if err != nil { - return false, errors.Wrap(err, "failed http get") + r.Fatal("failed http get", err) } defer resp.Body.Close() if err := s.requireOK(resp); err != nil { - return false, errors.Wrap(err, "failed OK response") + r.Fatal("failed OK response", err) } // Ensure we have a leader and a node registration. if leader := resp.Header.Get("X-Consul-KnownLeader"); leader != "true" { - return false, fmt.Errorf("Consul leader status: %#v", leader) + r.Fatalf("Consul leader status: %#v", leader) } index, err = strconv.ParseInt(resp.Header.Get("X-Consul-Index"), 10, 64) if err != nil { - return false, errors.Wrap(err, "bad consul index") + r.Fatal("bad consul index", err) } if index == 0 { - return false, fmt.Errorf("consul index is 0") + r.Fatal("consul index is 0") } // Watch for the anti-entropy sync to finish. - var parsed []map[string]interface{} + var v []map[string]interface{} dec := json.NewDecoder(resp.Body) - if err := dec.Decode(&parsed); err != nil { - return false, err + if err := dec.Decode(&v); err != nil { + r.Fatal(err) } - if len(parsed) < 1 { - return false, fmt.Errorf("No nodes") + if len(v) < 1 { + r.Fatal("No nodes") } - taggedAddresses, ok := parsed[0]["TaggedAddresses"].(map[string]interface{}) + taggedAddresses, ok := v[0]["TaggedAddresses"].(map[string]interface{}) if !ok { - return false, fmt.Errorf("Missing tagged addresses") + r.Fatal("Missing tagged addresses") } if _, ok := taggedAddresses["lan"]; !ok { - return false, fmt.Errorf("No lan tagged addresses") + r.Fatal("No lan tagged addresses") } - return true, nil - }); err != nil { - return errors.Wrap(err, "failed waiting for leader") + }) + if f.failed { + return errors.New("failed waiting for leader") } return nil } diff --git a/vendor/github.com/hashicorp/consul/testutil/server_methods.go b/vendor/github.com/hashicorp/consul/testutil/server_methods.go index 3c74e140cb43..8f4b067ad617 100644 --- a/vendor/github.com/hashicorp/consul/testutil/server_methods.go +++ b/vendor/github.com/hashicorp/consul/testutil/server_methods.go @@ -11,10 +11,18 @@ import ( "net/http" "testing" - "github.com/hashicorp/consul/consul/structs" "github.com/pkg/errors" ) +// copied from testutil to break circular dependency +const ( + HealthAny = "any" + HealthPassing = "passing" + HealthWarning = "warning" + HealthCritical = "critical" + HealthMaint = "maintenance" +) + // JoinLAN is used to join local datacenters together. func (s *TestServer) JoinLAN(t *testing.T, addr string) { resp := s.get(t, "/v1/agent/join/"+addr) @@ -101,9 +109,20 @@ func (s *TestServer) ListKV(t *testing.T, prefix string) []string { // automatically adds a health check with the given status, which // can be one of "passing", "warning", or "critical". func (s *TestServer) AddService(t *testing.T, name, status string, tags []string) { + s.AddAddressableService(t, name, status, "", 0, tags) // set empty address and 0 as port for non-accessible service +} + +// AddAddressableService adds a new service to the Consul instance by +// passing "address" and "port". It is helpful when you need to prepare a fakeService +// that maybe accessed with in target source code. +// It also automatically adds a health check with the given status, which +// can be one of "passing", "warning", or "critical", just like `AddService` does. +func (s *TestServer) AddAddressableService(t *testing.T, name, status, address string, port int, tags []string) { svc := &TestService{ - Name: name, - Tags: tags, + Name: name, + Tags: tags, + Address: address, + Port: port, } payload, err := s.encodePayload(svc) if err != nil { @@ -124,11 +143,11 @@ func (s *TestServer) AddService(t *testing.T, name, status string, tags []string s.put(t, "/v1/agent/check/register", payload) switch status { - case structs.HealthPassing: + case HealthPassing: s.put(t, "/v1/agent/check/pass/"+chkName, nil) - case structs.HealthWarning: + case HealthWarning: s.put(t, "/v1/agent/check/warn/"+chkName, nil) - case structs.HealthCritical: + case HealthCritical: s.put(t, "/v1/agent/check/fail/"+chkName, nil) default: t.Fatalf("Unrecognized status: %s", status) @@ -155,11 +174,11 @@ func (s *TestServer) AddCheck(t *testing.T, name, serviceID, status string) { s.put(t, "/v1/agent/check/register", payload) switch status { - case structs.HealthPassing: + case HealthPassing: s.put(t, "/v1/agent/check/pass/"+name, nil) - case structs.HealthWarning: + case HealthWarning: s.put(t, "/v1/agent/check/warn/"+name, nil) - case structs.HealthCritical: + case HealthCritical: s.put(t, "/v1/agent/check/fail/"+name, nil) default: t.Fatalf("Unrecognized status: %s", status) @@ -172,7 +191,7 @@ func (s *TestServer) put(t *testing.T, path string, body io.Reader) *http.Respon if err != nil { t.Fatalf("failed to create PUT request: %s", err) } - resp, err := s.HttpClient.Do(req) + resp, err := s.HTTPClient.Do(req) if err != nil { t.Fatalf("failed to make PUT request: %s", err) } @@ -185,7 +204,7 @@ func (s *TestServer) put(t *testing.T, path string, body io.Reader) *http.Respon // get performs a new HTTP GET request. func (s *TestServer) get(t *testing.T, path string) *http.Response { - resp, err := s.HttpClient.Get(s.url(path)) + resp, err := s.HTTPClient.Get(s.url(path)) if err != nil { t.Fatalf("failed to create GET request: %s", err) } diff --git a/vendor/github.com/hashicorp/consul/testutil/server_wrapper.go b/vendor/github.com/hashicorp/consul/testutil/server_wrapper.go index 0402f17a226d..17615da8d183 100644 --- a/vendor/github.com/hashicorp/consul/testutil/server_wrapper.go +++ b/vendor/github.com/hashicorp/consul/testutil/server_wrapper.go @@ -1,8 +1,6 @@ package testutil -import ( - "testing" -) +import "testing" type WrappedServer struct { s *TestServer @@ -19,78 +17,49 @@ type WrappedServer struct { // This is useful when you are calling multiple functions and save the wrapped // value as another variable to reduce the inclusion of "t". func (s *TestServer) Wrap(t *testing.T) *WrappedServer { - return &WrappedServer{ - s: s, - t: t, - } + return &WrappedServer{s, t} } -// See Also -// -// TestServer.JoinLAN() func (w *WrappedServer) JoinLAN(addr string) { w.s.JoinLAN(w.t, addr) } -// See Also -// -// TestServer.JoinWAN() func (w *WrappedServer) JoinWAN(addr string) { w.s.JoinWAN(w.t, addr) } -// See Also -// -// TestServer.SetKV() func (w *WrappedServer) SetKV(key string, val []byte) { w.s.SetKV(w.t, key, val) } -// See Also -// -// TestServer.SetKVString() func (w *WrappedServer) SetKVString(key string, val string) { w.s.SetKVString(w.t, key, val) } -// See Also -// -// TestServer.GetKV() func (w *WrappedServer) GetKV(key string) []byte { return w.s.GetKV(w.t, key) } -// See Also -// -// TestServer.GetKVString() func (w *WrappedServer) GetKVString(key string) string { return w.s.GetKVString(w.t, key) } -// See Also -// -// TestServer.PopulateKV() func (w *WrappedServer) PopulateKV(data map[string][]byte) { w.s.PopulateKV(w.t, data) } -// See Also -// -// TestServer.ListKV() func (w *WrappedServer) ListKV(prefix string) []string { return w.s.ListKV(w.t, prefix) } -// See Also -// -// TestServer.AddService() func (w *WrappedServer) AddService(name, status string, tags []string) { w.s.AddService(w.t, name, status, tags) } -// See Also -// -// TestServer.AddCheck() +func (w *WrappedServer) AddAddressableService(name, status, address string, port int, tags []string) { + w.s.AddAddressableService(w.t, name, status, address, port, tags) +} + func (w *WrappedServer) AddCheck(name, serviceID, status string) { w.s.AddCheck(w.t, name, serviceID, status) } diff --git a/vendor/github.com/hashicorp/consul/testutil/wait.go b/vendor/github.com/hashicorp/consul/testutil/wait.go deleted file mode 100644 index 9854f15cd16c..000000000000 --- a/vendor/github.com/hashicorp/consul/testutil/wait.go +++ /dev/null @@ -1,66 +0,0 @@ -package testutil - -import ( - "fmt" - "testing" - "time" - - "github.com/hashicorp/consul/consul/structs" - "github.com/pkg/errors" -) - -type testFn func() (bool, error) - -const ( - baseWait = 1 * time.Millisecond - maxWait = 100 * time.Millisecond -) - -func WaitForResult(try testFn) error { - var err error - wait := baseWait - for retries := 100; retries > 0; retries-- { - var success bool - success, err = try() - if success { - time.Sleep(25 * time.Millisecond) - return nil - } - - time.Sleep(wait) - wait *= 2 - if wait > maxWait { - wait = maxWait - } - } - if err != nil { - return errors.Wrap(err, "timed out with error") - } else { - return fmt.Errorf("timed out") - } -} - -type rpcFn func(string, interface{}, interface{}) error - -func WaitForLeader(t *testing.T, rpc rpcFn, dc string) structs.IndexedNodes { - var out structs.IndexedNodes - if err := WaitForResult(func() (bool, error) { - // Ensure we have a leader and a node registration. - args := &structs.DCSpecificRequest{ - Datacenter: dc, - } - if err := rpc("Catalog.ListNodes", args, &out); err != nil { - return false, fmt.Errorf("Catalog.ListNodes failed: %v", err) - } - if !out.QueryMeta.KnownLeader { - return false, fmt.Errorf("No leader") - } - if out.Index == 0 { - return false, fmt.Errorf("Consul index is 0") - } - return true, nil - }); err != nil { - t.Fatalf("failed to find leader: %v", err) - } - return out -} diff --git a/vendor/vendor.json b/vendor/vendor.json index 9cafd3045c10..0e2186a0cc70 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -653,14 +653,14 @@ { "checksumSHA1": "jfELEMRhiTcppZmRH+ZwtkVS5Uw=", "path": "github.com/hashicorp/consul/acl", - "revision": "e9ca44d0a1757ac9aecc6785904a701936c10e4a", - "revisionTime": "2017-04-17T18:01:43Z" + "revision": "75ca2cace08e38de8af1731ee8614d0533d5a4d4", + "revisionTime": "2017-08-10T00:46:41Z" }, { - "checksumSHA1": "RmhTKLvlDtxNPKZFnPYnfG/HzrI=", + "checksumSHA1": "AgdPCR9/+M0kkVpaeZ0PnATxfSc=", "path": "github.com/hashicorp/consul/api", - "revision": "eea8f4ce75e8e6ff97c9913d89f687e8f8489ce6", - "revisionTime": "2017-05-30T15:52:51Z" + "revision": "75ca2cace08e38de8af1731ee8614d0533d5a4d4", + "revisionTime": "2017-08-10T00:46:41Z" }, { "checksumSHA1": "Z1N3jX/5B7GbLNfNp5GTxrsJItc=", @@ -669,22 +669,28 @@ "revisionTime": "2017-04-17T18:01:43Z" }, { - "checksumSHA1": "XTA8JEhsuJGTUTchjM++oEG7B14=", + "checksumSHA1": "X3kV+a3rz+kw6SJupDVjHmUEUCQ=", "path": "github.com/hashicorp/consul/lib", - "revision": "e9ca44d0a1757ac9aecc6785904a701936c10e4a", - "revisionTime": "2017-04-17T18:01:43Z" + "revision": "75ca2cace08e38de8af1731ee8614d0533d5a4d4", + "revisionTime": "2017-08-10T00:46:41Z" }, { - "checksumSHA1": "yG7c7ZInBE36kw8IPvYb0smctRw=", + "checksumSHA1": "++0PVBxbpylmllyCxSa7cdc6dDc=", "path": "github.com/hashicorp/consul/testutil", - "revision": "e9ca44d0a1757ac9aecc6785904a701936c10e4a", - "revisionTime": "2017-04-17T18:01:43Z" + "revision": "75ca2cace08e38de8af1731ee8614d0533d5a4d4", + "revisionTime": "2017-08-10T00:46:41Z" + }, + { + "checksumSHA1": "J8TTDc84MvAyXE/FrfgS+xc/b6s=", + "path": "github.com/hashicorp/consul/testutil/retry", + "revision": "75ca2cace08e38de8af1731ee8614d0533d5a4d4", + "revisionTime": "2017-08-10T00:46:41Z" }, { "checksumSHA1": "bYK/7DsyTM3YDjvc0RRUH4I+jic=", "path": "github.com/hashicorp/consul/types", - "revision": "e9ca44d0a1757ac9aecc6785904a701936c10e4a", - "revisionTime": "2017-04-17T18:01:43Z" + "revision": "75ca2cace08e38de8af1731ee8614d0533d5a4d4", + "revisionTime": "2017-08-10T00:46:41Z" }, { "path": "github.com/hashicorp/errwrap", diff --git a/website/source/api/json-jobs.html.md b/website/source/api/json-jobs.html.md index b730170d69fd..3409baf5bb7b 100644 --- a/website/source/api/json-jobs.html.md +++ b/website/source/api/json-jobs.html.md @@ -58,6 +58,8 @@ Below is the JSON representation of the job outputed by `$ nomad init`: "Type": "tcp", "Command": "", "Args": null, + "Header": {}, + "Method": "", "Path": "", "Protocol": "", "PortLabel": "", @@ -344,12 +346,18 @@ The `Task` object supports the following keys: - `Name`: The name of the health check. + - `Header`: Headers for HTTP checks. Should be an object where the + values are an array of values. Headers will be written once for each + value. + - `Interval`: This indicates the frequency of the health checks that Consul will perform. - `Timeout`: This indicates how long Consul will wait for a health check query to succeed. + - `Method`: The HTTP method to use for HTTP checks. Defaults to GET. + - `Path`: The path of the HTTP endpoint which Consul will query to query the health of a service if the type of the check is `http`. Nomad will add the IP of the service and the port, users are only required diff --git a/website/source/docs/job-specification/service.html.md b/website/source/docs/job-specification/service.html.md index 5a75c80b7ead..cef747b0dd65 100644 --- a/website/source/docs/job-specification/service.html.md +++ b/website/source/docs/job-specification/service.html.md @@ -130,6 +130,9 @@ scripts. that Consul will perform. This is specified using a label suffix like "30s" or "1h". This must be greater than or equal to "1s" +- `method` `(string: "GET")` - Specifies the HTTP method to use for HTTP + checks. + - `name` `(string: "service: check")` - Specifies the name of the health check. @@ -159,6 +162,27 @@ scripts. - `tls_skip_verify` `(bool: false)` - Skip verifying TLS certificates for HTTPS checks. Requires Consul >= 0.7.2. +#### `header` Stanza + +HTTP checks may include a `header` stanza to set HTTP headers. The `header` +stanza parameters have lists of strings as values. Multiple values will cause +the header to be set multiple times, once for each value. + +```hcl +service { + check { + type = "http" + port = "lb" + path = "/_healthz" + interval = "5s" + timeout = "2s" + header { + Authorization = ["Basic ZWxhc3RpYzpjaGFuZ2VtZQ=="] + } + } +} +``` + ## `service` Examples @@ -232,8 +256,8 @@ argument provided as a value to the `args` array. This example shows a service with an HTTP health check. This will query the service on the IP and port registered with Nomad at `/_healthz` every 5 seconds, -giving the service a maximum of 2 seconds to return a response. Any non-2xx code -is considered a failure. +giving the service a maximum of 2 seconds to return a response, and include an +Authorization header. Any non-2xx code is considered a failure. ```hcl service { @@ -243,6 +267,9 @@ service { path = "/_healthz" interval = "5s" timeout = "2s" + header { + Authorization = ["Basic ZWxhc3RpYzpjaGFuZ2VtZQ=="] + } } } ``` @@ -269,6 +296,7 @@ service { path = "/_healthz" interval = "5s" timeout = "2s" + method = "POST" } check {