Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom Connect Sidecar Checks #10524

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions api/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,10 @@ func (cc *ConsulConnect) Canonicalize() {
// ConsulSidecarService represents a Consul Connect SidecarService jobspec
// stanza.
type ConsulSidecarService struct {
Tags []string `hcl:"tags,optional"`
Port string `hcl:"port,optional"`
Proxy *ConsulProxy `hcl:"proxy,block"`
Tags []string `hcl:"tags,optional"`
Port string `hcl:"port,optional"`
Proxy *ConsulProxy `hcl:"proxy,block"`
Checks []ServiceCheck `hcl:"check,block"`
}

func (css *ConsulSidecarService) Canonicalize() {
Expand Down
29 changes: 22 additions & 7 deletions command/agent/consul/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,9 @@ func connectSidecarRegistration(serviceId string, css *structs.ConsulSidecarServ
return nil, err
}

return &api.AgentServiceRegistration{
Tags: helper.CopySliceString(css.Tags),
Port: cMapping.Value,
Address: cMapping.HostIP,
Proxy: proxy,
Checks: api.AgentServiceChecks{
var checks []*api.AgentServiceCheck
if len(css.Checks) == 0 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change, there is no way to actually disable sidecar checks. The job author must specify a check or we insert the default TCP ones. This seems a bit reasonable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also the only option if registering with Consul directly; this approach is just following suite

checks = api.AgentServiceChecks{
{
Name: "Connect Sidecar Listening",
TCP: net.JoinHostPort(cMapping.HostIP, strconv.Itoa(cMapping.Value)),
Expand All @@ -120,7 +117,25 @@ func connectSidecarRegistration(serviceId string, css *structs.ConsulSidecarServ
Name: "Connect Sidecar Aliasing " + serviceId,
AliasService: serviceId,
},
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unclear what the value of this Aliasing service id - do we need to insert it even when checks are overriden?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The alias basically means "if you request health for service X, send the request to me instead" - which is useful for connect, where requests must go through the sidecar proxy. I think yes, this should always be there

},
}
} else {
checks = make([]*api.AgentServiceCheck, len(css.Checks))
for i, c := range css.Checks {
check, err := createCheckReg(serviceId, "", c, cMapping.HostIP, cMapping.Value, "")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I'm doing this correctly, and what host/port should be used. When I tried, I can successfully set a custom TCP check; but cannot seem to get an HTTP to pass. The default envoyproxy seems to reject HTTP connection?!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I sort of imagined being able to create checks that somehow made use of NOMAD_ENVOY_ADMIN_ADDR_<service>, where envoy provides some default paths one might query https://www.envoyproxy.io/docs/envoy/latest/operations/admin to represent the envoy's health.

The only way that will work is if the Consul agent is able to make a connection to that ip:port, which is going to be 127.0.0.1:1900x inside the network namespace - which means Nomad would need to inject a new dynamic port mapping to that envoy admin port.

if err != nil {
return nil, fmt.Errorf("failed to register check %v: %w", c.Name, err)
}

checks[i] = &check.AgentServiceCheck
}
}

return &api.AgentServiceRegistration{
Tags: helper.CopySliceString(css.Tags),
Port: cMapping.Value,
Address: cMapping.HostIP,
Proxy: proxy,
Checks: checks,
}, nil
}

Expand Down
93 changes: 50 additions & 43 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1291,51 +1291,57 @@ func ApiServicesToStructs(in []*api.Service) []*structs.Service {
Meta: helper.CopyMapStringString(s.Meta),
CanaryMeta: helper.CopyMapStringString(s.CanaryMeta),
OnUpdate: s.OnUpdate,
Checks: apiServiceChecksToStructs(s.Checks, s.OnUpdate), // Inherit OnUpdate from service by default
Connect: ApiConsulConnectToStructs(s.Connect),
}

if l := len(s.Checks); l != 0 {
out[i].Checks = make([]*structs.ServiceCheck, l)
for j, check := range s.Checks {
onUpdate := s.OnUpdate // Inherit from service as default
if check.OnUpdate != "" {
onUpdate = check.OnUpdate
}
out[i].Checks[j] = &structs.ServiceCheck{
Name: check.Name,
Type: check.Type,
Command: check.Command,
Args: check.Args,
Path: check.Path,
Protocol: check.Protocol,
PortLabel: check.PortLabel,
Expose: check.Expose,
AddressMode: check.AddressMode,
Interval: check.Interval,
Timeout: check.Timeout,
InitialStatus: check.InitialStatus,
TLSSkipVerify: check.TLSSkipVerify,
Header: check.Header,
Method: check.Method,
Body: check.Body,
GRPCService: check.GRPCService,
GRPCUseTLS: check.GRPCUseTLS,
TaskName: check.TaskName,
OnUpdate: onUpdate,
}
if check.CheckRestart != nil {
out[i].Checks[j].CheckRestart = &structs.CheckRestart{
Limit: check.CheckRestart.Limit,
Grace: *check.CheckRestart.Grace,
IgnoreWarnings: check.CheckRestart.IgnoreWarnings,
}
}
}
}
}

if s.Connect != nil {
out[i].Connect = ApiConsulConnectToStructs(s.Connect)
return out
}

func apiServiceChecksToStructs(in []api.ServiceCheck, defaultOnUpdate string) []*structs.ServiceCheck {
if len(in) == 0 {
return nil
}

out := make([]*structs.ServiceCheck, len(in))
for i, inc := range in {
onUpdate := defaultOnUpdate
if inc.OnUpdate != "" {
onUpdate = inc.OnUpdate
}
check := &structs.ServiceCheck{
Name: inc.Name,
Type: inc.Type,
Command: inc.Command,
Args: inc.Args,
Path: inc.Path,
Protocol: inc.Protocol,
PortLabel: inc.PortLabel,
Expose: inc.Expose,
AddressMode: inc.AddressMode,
Interval: inc.Interval,
Timeout: inc.Timeout,
InitialStatus: inc.InitialStatus,
TLSSkipVerify: inc.TLSSkipVerify,
Header: inc.Header,
Method: inc.Method,
Body: inc.Body,
GRPCService: inc.GRPCService,
GRPCUseTLS: inc.GRPCUseTLS,
TaskName: inc.TaskName,
OnUpdate: onUpdate,
}
if check.CheckRestart != nil {
check.CheckRestart = &structs.CheckRestart{
Limit: inc.CheckRestart.Limit,
Grace: *inc.CheckRestart.Grace,
IgnoreWarnings: inc.CheckRestart.IgnoreWarnings,
}
}

out[i] = check
}

return out
Expand Down Expand Up @@ -1499,9 +1505,10 @@ func apiConnectSidecarServiceToStructs(in *api.ConsulSidecarService) *structs.Co
return nil
}
return &structs.ConsulSidecarService{
Port: in.Port,
Tags: helper.CopySliceString(in.Tags),
Proxy: apiConnectSidecarServiceProxyToStructs(in.Proxy),
Port: in.Port,
Tags: helper.CopySliceString(in.Tags),
Proxy: apiConnectSidecarServiceProxyToStructs(in.Proxy),
Checks: apiServiceChecksToStructs(in.Checks, ""),
}
}

Expand Down
60 changes: 34 additions & 26 deletions jobspec/parse_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,11 @@ func parseService(o *ast.ObjectItem) (*api.Service, error) {
}

if co := listVal.Filter("check"); len(co.Items) > 0 {
if err := parseChecks(&service, co); err != nil {
checks, err := parseChecks(co)
if err != nil {
return nil, multierror.Prefix(err, fmt.Sprintf("'%s',", service.Name))
}
service.Checks = checks
}

// Filter check_restart
Expand Down Expand Up @@ -550,6 +552,7 @@ func parseSidecarService(o *ast.ObjectItem) (*api.ConsulSidecarService, error) {
"port",
"proxy",
"tags",
"check",
}

if err := checkHCLKeys(o.Val, valid); err != nil {
Expand All @@ -562,6 +565,7 @@ func parseSidecarService(o *ast.ObjectItem) (*api.ConsulSidecarService, error) {
return nil, err
}

delete(m, "check")
delete(m, "proxy")

dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
Expand All @@ -576,27 +580,31 @@ func parseSidecarService(o *ast.ObjectItem) (*api.ConsulSidecarService, error) {
return nil, fmt.Errorf("sidecar_service: %v", err)
}

var proxyList *ast.ObjectList
var listVal *ast.ObjectList
if ot, ok := o.Val.(*ast.ObjectType); ok {
proxyList = ot.List
listVal = ot.List
} else {
return nil, fmt.Errorf("sidecar_service: should be an object")
}

// Parse the proxy
po := proxyList.Filter("proxy")
if len(po.Items) == 0 {
return &sidecar, nil
}
if len(po.Items) > 1 {
if po := listVal.Filter("proxy"); len(po.Items) > 1 {
return nil, fmt.Errorf("only one 'proxy' block allowed per task")
} else if len(po.Items) == 1 {
r, err := parseProxy(po.Items[0])
if err != nil {
return nil, fmt.Errorf("proxy, %v", err)
}
sidecar.Proxy = r
}

r, err := parseProxy(po.Items[0])
if err != nil {
return nil, fmt.Errorf("proxy, %v", err)
if co := listVal.Filter("check"); len(co.Items) > 0 {
checks, err := parseChecks(co)
if err != nil {
return nil, multierror.Prefix(err, "'service_sidecar',")
}
sidecar.Checks = checks
}
sidecar.Proxy = r

return &sidecar, nil
}
Expand Down Expand Up @@ -833,8 +841,8 @@ func parseUpstream(uo *ast.ObjectItem) (*api.ConsulUpstream, error) {
return &upstream, nil
}

func parseChecks(service *api.Service, checkObjs *ast.ObjectList) error {
service.Checks = make([]api.ServiceCheck, len(checkObjs.Items))
func parseChecks(checkObjs *ast.ObjectList) ([]api.ServiceCheck, error) {
checks := make([]api.ServiceCheck, len(checkObjs.Items))
for idx, co := range checkObjs.Items {
// Check for invalid keys
valid := []string{
Expand All @@ -861,33 +869,33 @@ func parseChecks(service *api.Service, checkObjs *ast.ObjectList) error {
"failures_before_critical",
}
if err := checkHCLKeys(co.Val, valid); err != nil {
return multierror.Prefix(err, "check ->")
return nil, multierror.Prefix(err, "check ->")
}

var check api.ServiceCheck
var cm map[string]interface{}
if err := hcl.DecodeObject(&cm, co.Val); err != nil {
return err
return nil, err
}

// HCL allows repeating stanzas so merge 'header' into a single
// map[string][]string.
if headerI, ok := cm["header"]; ok {
headerRaw, ok := headerI.([]map[string]interface{})
if !ok {
return fmt.Errorf("check -> header -> expected a []map[string][]string but found %T", headerI)
return nil, fmt.Errorf("check -> header -> expected a []map[string][]string but found %T", headerI)
}
m := map[string][]string{}
for _, rawm := range headerRaw {
for k, vI := range rawm {
vs, ok := vI.([]interface{})
if !ok {
return fmt.Errorf("check -> header -> %q expected a []string but found %T", k, vI)
return nil, fmt.Errorf("check -> header -> %q expected a []string but found %T", k, vI)
}
for _, vI := range vs {
v, ok := vI.(string)
if !ok {
return fmt.Errorf("check -> header -> %q expected a string but found %T", k, vI)
return nil, fmt.Errorf("check -> header -> %q expected a string but found %T", k, vI)
}
m[k] = append(m[k], v)
}
Expand All @@ -908,35 +916,35 @@ func parseChecks(service *api.Service, checkObjs *ast.ObjectList) error {
Result: &check,
})
if err != nil {
return err
return nil, err
}
if err := dec.Decode(cm); err != nil {
return err
return nil, err
}

// Filter check_restart
var checkRestartList *ast.ObjectList
if ot, ok := co.Val.(*ast.ObjectType); ok {
checkRestartList = ot.List
} else {
return fmt.Errorf("check_restart '%s': should be an object", check.Name)
return nil, fmt.Errorf("check_restart '%s': should be an object", check.Name)
}

if cro := checkRestartList.Filter("check_restart"); len(cro.Items) > 0 {
if len(cro.Items) > 1 {
return fmt.Errorf("check_restart '%s': cannot have more than 1 check_restart", check.Name)
return nil, fmt.Errorf("check_restart '%s': cannot have more than 1 check_restart", check.Name)
}
cr, err := parseCheckRestart(cro.Items[0])
if err != nil {
return multierror.Prefix(err, fmt.Sprintf("check: '%s',", check.Name))
return nil, multierror.Prefix(err, fmt.Sprintf("check: '%s',", check.Name))
}
check.CheckRestart = cr
}

service.Checks[idx] = check
checks[idx] = check
}

return nil
return checks, nil
}

func parseCheckRestart(cro *ast.ObjectItem) (*api.CheckRestart, error) {
Expand Down
Loading