Skip to content

Commit

Permalink
consul/connect: interpolate connect block
Browse files Browse the repository at this point in the history
This PR enables job submitters to use interpolation in the connect
block of jobs making use of consul connect. Before, only the name of
the connect service would be interpolated, and only for a few select
identifiers related to the job itself (#6853). Now, all connect fields
can be interpolated using the full spectrum of runtime parameters.

Note that the service name is interpolated at job-submission time,
and cannot make use of values known only at runtime.

Fixes #7221
  • Loading branch information
shoenig committed Dec 9, 2020
1 parent 399d49b commit ec8b42a
Show file tree
Hide file tree
Showing 5 changed files with 552 additions and 68 deletions.
15 changes: 12 additions & 3 deletions client/allocrunner/groupservice_hook.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package allocrunner

import (
"fmt"
"sync"
"time"

Expand All @@ -19,7 +20,7 @@ type networkStatusGetter interface {
// groupServiceHook manages task group Consul service registration and
// deregistration.
type groupServiceHook struct {
allocID string
alloc *structs.Allocation
group string
restarter agentconsul.WorkloadRestarter
consulClient consul.ConsulServiceAPI
Expand Down Expand Up @@ -60,7 +61,7 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
}

h := &groupServiceHook{
allocID: cfg.alloc.ID,
alloc: cfg.alloc, // YOU ARE HERE, pass in the alloc?
group: cfg.alloc.TaskGroup,
restarter: cfg.restarter,
consulClient: cfg.consul,
Expand All @@ -87,6 +88,8 @@ func (*groupServiceHook) Name() string {
}

func (h *groupServiceHook) Prerun() error {
fmt.Println("gsh.Prerun alloc:", h.alloc.ID)

h.mu.Lock()
defer func() {
// Mark prerun as true to unblock Updates
Expand All @@ -95,16 +98,21 @@ func (h *groupServiceHook) Prerun() error {
}()

if len(h.services) == 0 {
fmt.Println(" zero services")
return nil
}

// let the thing do its thing
h.taskEnvBuilder.UpdateTask(h.alloc, nil) // if only

services := h.getWorkloadServices()
return h.consulClient.RegisterWorkload(services)
}

func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
h.mu.Lock()
defer h.mu.Unlock()
fmt.Println("gsh.Update alloc:", h.alloc.ID)

oldWorkloadServices := h.getWorkloadServices()

Expand All @@ -131,6 +139,7 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
h.services = tg.Services
h.canary = canary
h.delay = shutdown

h.taskEnvBuilder.UpdateTask(req.Alloc, nil)

// Create new task services struct with those new values
Expand Down Expand Up @@ -200,7 +209,7 @@ func (h *groupServiceHook) getWorkloadServices() *agentconsul.WorkloadServices {

// Create task services struct with request's driver metadata
return &agentconsul.WorkloadServices{
AllocID: h.allocID,
AllocID: h.alloc.ID,
Group: h.group,
Restarter: h.restarter,
Services: interpolatedServices,
Expand Down
184 changes: 158 additions & 26 deletions client/taskenv/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ func InterpolateServices(taskEnv *TaskEnv, services []*structs.Service) []*struc
interpolated := make([]*structs.Service, len(services))

for i, origService := range services {
// Create a copy as we need to reinterpolate every time the
// environment changes
// Create a copy as we need to re-interpolate every time the
// environment changes.
service := origService.Copy()

for _, check := range service.Checks {
Expand All @@ -31,42 +31,174 @@ func InterpolateServices(taskEnv *TaskEnv, services []*structs.Service) []*struc
check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus)
check.Method = taskEnv.ReplaceEnv(check.Method)
check.GRPCService = taskEnv.ReplaceEnv(check.GRPCService)
if len(check.Header) > 0 {
header := make(map[string][]string, len(check.Header))
for k, vs := range check.Header {
newVals := make([]string, len(vs))
for i, v := range vs {
newVals[i] = taskEnv.ReplaceEnv(v)
}
header[taskEnv.ReplaceEnv(k)] = newVals
}
check.Header = header
}
check.Header = interpolateMapStringSliceString(taskEnv, check.Header)
}

service.Name = taskEnv.ReplaceEnv(service.Name)
service.PortLabel = taskEnv.ReplaceEnv(service.PortLabel)
service.Tags = taskEnv.ParseAndReplace(service.Tags)
service.CanaryTags = taskEnv.ParseAndReplace(service.CanaryTags)
service.Meta = interpolateMapStringString(taskEnv, service.Meta)
service.CanaryMeta = interpolateMapStringString(taskEnv, service.CanaryMeta)
service.Connect = interpolateConnect(taskEnv, service.Connect)

if len(service.Meta) > 0 {
meta := make(map[string]string, len(service.Meta))
for k, v := range service.Meta {
meta[k] = taskEnv.ReplaceEnv(v)
}
service.Meta = meta
interpolated[i] = service
}

return interpolated
}

func interpolateMapStringSliceString(taskEnv *TaskEnv, orig map[string][]string) map[string][]string {
if len(orig) == 0 {
return nil
}

m := make(map[string][]string, len(orig))
for k, vs := range orig {
m[taskEnv.ReplaceEnv(k)] = taskEnv.ParseAndReplace(vs)
}
return m
}

func interpolateMapStringString(taskEnv *TaskEnv, orig map[string]string) map[string]string {
if len(orig) == 0 {
return nil
}

m := make(map[string]string, len(orig))
for k, v := range orig {
m[taskEnv.ReplaceEnv(k)] = taskEnv.ReplaceEnv(v)
}
return m
}

func interpolateMapStringInterface(taskEnv *TaskEnv, orig map[string]interface{}) map[string]interface{} {
if len(orig) == 0 {
return nil
}

m := make(map[string]interface{}, len(orig))
for k, v := range orig {
m[taskEnv.ReplaceEnv(k)] = v
}
return m
}

func interpolateConnect(taskEnv *TaskEnv, orig *structs.ConsulConnect) *structs.ConsulConnect {
if orig == nil {
return nil
}

// make one copy and interpolate in-place on that
modified := orig.Copy()
interpolateConnectSidecarService(taskEnv, modified.SidecarService)
interpolateConnectSidecarTask(taskEnv, modified.SidecarTask)
if modified.Gateway != nil {
interpolateConnectGatewayProxy(taskEnv, modified.Gateway.Proxy)
interpolateConnectGatewayIngress(taskEnv, modified.Gateway.Ingress)
}
return modified
}

func interpolateConnectGatewayProxy(taskEnv *TaskEnv, proxy *structs.ConsulGatewayProxy) {
if proxy == nil {
return
}

m := make(map[string]*structs.ConsulGatewayBindAddress, len(proxy.EnvoyGatewayBindAddresses))
for k, v := range proxy.EnvoyGatewayBindAddresses {
m[taskEnv.ReplaceEnv(k)] = &structs.ConsulGatewayBindAddress{
Address: taskEnv.ReplaceEnv(v.Address),
Port: v.Port,
}
}

proxy.EnvoyGatewayBindAddresses = m
proxy.Config = interpolateMapStringInterface(taskEnv, proxy.Config)
}

func interpolateConnectGatewayIngress(taskEnv *TaskEnv, ingress *structs.ConsulIngressConfigEntry) {
if ingress == nil {
return
}

for _, listener := range ingress.Listeners {
listener.Protocol = taskEnv.ReplaceEnv(listener.Protocol)
for _, service := range listener.Services {
service.Name = taskEnv.ReplaceEnv(service.Name)
service.Hosts = taskEnv.ParseAndReplace(service.Hosts)
}
}
}

func interpolateConnectSidecarService(taskEnv *TaskEnv, sidecar *structs.ConsulSidecarService) {
if sidecar == nil {
return
}

if len(service.CanaryMeta) > 0 {
canaryMeta := make(map[string]string, len(service.CanaryMeta))
for k, v := range service.CanaryMeta {
canaryMeta[k] = taskEnv.ReplaceEnv(v)
sidecar.Port = taskEnv.ReplaceEnv(sidecar.Port)
sidecar.Tags = taskEnv.ParseAndReplace(sidecar.Tags)
if sidecar.Proxy != nil {
sidecar.Proxy.LocalServiceAddress = taskEnv.ReplaceEnv(sidecar.Proxy.LocalServiceAddress)
if sidecar.Proxy.Expose != nil {
for i := 0; i < len(sidecar.Proxy.Expose.Paths); i++ {
sidecar.Proxy.Expose.Paths[i].Protocol = taskEnv.ReplaceEnv(sidecar.Proxy.Expose.Paths[i].Protocol)
sidecar.Proxy.Expose.Paths[i].ListenerPort = taskEnv.ReplaceEnv(sidecar.Proxy.Expose.Paths[i].ListenerPort)
sidecar.Proxy.Expose.Paths[i].Path = taskEnv.ReplaceEnv(sidecar.Proxy.Expose.Paths[i].Path)
}
service.CanaryMeta = canaryMeta
}
for i := 0; i < len(sidecar.Proxy.Upstreams); i++ {
sidecar.Proxy.Upstreams[i].Datacenter = taskEnv.ReplaceEnv(sidecar.Proxy.Upstreams[i].Datacenter)
sidecar.Proxy.Upstreams[i].DestinationName = taskEnv.ReplaceEnv(sidecar.Proxy.Upstreams[i].DestinationName)
}
sidecar.Proxy.Config = interpolateMapStringInterface(taskEnv, sidecar.Proxy.Config)
}
}

interpolated[i] = service
func interpolateConnectSidecarTask(taskEnv *TaskEnv, task *structs.SidecarTask) {
if task == nil {
return
}

return interpolated
task.Driver = taskEnv.ReplaceEnv(task.Driver)
task.Config = interpolateMapStringInterface(taskEnv, task.Config)
task.Env = interpolateMapStringString(taskEnv, task.Env)
task.KillSignal = taskEnv.ReplaceEnv(task.KillSignal)
task.Meta = interpolateMapStringString(taskEnv, task.Meta)
interpolateTaskResources(taskEnv, task.Resources)
task.User = taskEnv.ReplaceEnv(task.User)
}

func interpolateTaskResources(taskEnv *TaskEnv, resources *structs.Resources) {
if resources == nil {
return
}

for i := 0; i < len(resources.Devices); i++ {
resources.Devices[i].Name = taskEnv.ReplaceEnv(resources.Devices[i].Name)
// do not interpolate constraints & affinities
}

for i := 0; i < len(resources.Networks); i++ {
resources.Networks[i].CIDR = taskEnv.ReplaceEnv(resources.Networks[i].CIDR)
resources.Networks[i].Device = taskEnv.ReplaceEnv(resources.Networks[i].Device)
resources.Networks[i].IP = taskEnv.ReplaceEnv(resources.Networks[i].IP)
resources.Networks[i].Mode = taskEnv.ReplaceEnv(resources.Networks[i].Mode)

if resources.Networks[i].DNS != nil {
resources.Networks[i].DNS.Options = taskEnv.ParseAndReplace(resources.Networks[i].DNS.Options)
resources.Networks[i].DNS.Searches = taskEnv.ParseAndReplace(resources.Networks[i].DNS.Searches)
resources.Networks[i].DNS.Servers = taskEnv.ParseAndReplace(resources.Networks[i].DNS.Servers)
}

for p := 0; p < len(resources.Networks[i].DynamicPorts); p++ {
resources.Networks[i].DynamicPorts[p].HostNetwork = taskEnv.ReplaceEnv(resources.Networks[i].DynamicPorts[p].HostNetwork)
resources.Networks[i].DynamicPorts[p].Label = taskEnv.ReplaceEnv(resources.Networks[i].DynamicPorts[p].Label)
}

for p := 0; p < len(resources.Networks[i].ReservedPorts); p++ {
resources.Networks[i].ReservedPorts[p].HostNetwork = taskEnv.ReplaceEnv(resources.Networks[i].ReservedPorts[p].HostNetwork)
resources.Networks[i].ReservedPorts[p].Label = taskEnv.ReplaceEnv(resources.Networks[i].ReservedPorts[p].Label)
}
}
}
Loading

0 comments on commit ec8b42a

Please sign in to comment.