Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor consul service monitor #564

Merged
merged 4 commits into from
Nov 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ type Consul struct {
CheckTLSSkipVerify bool
CheckDeregisterCriticalServiceAfter string
ChecksRequired string
ServiceMonitors int
}

type Tracing struct {
Expand Down
1 change: 1 addition & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var defaultConfig = &Config{
ServiceAddr: ":9998",
ServiceName: "fabio",
ServiceStatus: []string{"passing"},
ServiceMonitors: 1,
CheckInterval: time.Second,
CheckTimeout: 3 * time.Second,
CheckScheme: "http",
Expand Down
5 changes: 5 additions & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
f.BoolVar(&cfg.Registry.Consul.CheckTLSSkipVerify, "registry.consul.register.checkTLSSkipVerify", defaultConfig.Registry.Consul.CheckTLSSkipVerify, "service check TLS verification")
f.StringVar(&cfg.Registry.Consul.CheckDeregisterCriticalServiceAfter, "registry.consul.register.checkDeregisterCriticalServiceAfter", defaultConfig.Registry.Consul.CheckDeregisterCriticalServiceAfter, "critical service deregistration timeout")
f.StringVar(&cfg.Registry.Consul.ChecksRequired, "registry.consul.checksRequired", defaultConfig.Registry.Consul.ChecksRequired, "number of checks which must pass: one or all")
f.IntVar(&cfg.Registry.Consul.ServiceMonitors, "registry.consul.serviceMonitors", defaultConfig.Registry.Consul.ServiceMonitors, "concurrency for route updates")
f.IntVar(&cfg.Runtime.GOGC, "runtime.gogc", defaultConfig.Runtime.GOGC, "sets runtime.GOGC")
f.IntVar(&cfg.Runtime.GOMAXPROCS, "runtime.gomaxprocs", defaultConfig.Runtime.GOMAXPROCS, "sets runtime.GOMAXPROCS")
f.StringVar(&cfg.UI.Access, "ui.access", defaultConfig.UI.Access, "access mode, one of [ro, rw]")
Expand Down Expand Up @@ -244,6 +245,10 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
cfg.Registry.Consul.CheckScheme = "https"
}

if cfg.Registry.Consul.ServiceMonitors <= 0 {
cfg.Registry.Consul.ServiceMonitors = 1
}

if gzipContentTypesValue != "" {
cfg.Proxy.GZIPContentTypes, err = regexp.Compile(gzipContentTypesValue)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions config/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,13 @@ func TestLoad(t *testing.T) {
return cfg
},
},
{
args: []string{"-registry.consul.serviceMonitors", "5"},
cfg: func(cfg *Config) *Config {
cfg.Registry.Consul.ServiceMonitors = 5
return cfg
},
},
{
args: []string{"-log.access.format", "foobar"},
cfg: func(cfg *Config) *Config {
Expand Down
12 changes: 12 additions & 0 deletions docs/content/ref/registry.consul.serviceMonitors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
title: "registry.consul.serviceMonitors"
---

`registry.consul.serviceMonitors` configures the concurrency for
route updates. Fabio will make up to the configured number of
concurrent calls to Consul to fetch status data for route
updates.

The default is

registry.consul.serviceMonitors = 1
10 changes: 10 additions & 0 deletions fabio.properties
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,16 @@
# registry.consul.checksRequired = one


# registry.consul.serviceMonitors configures the concurrency for
# route updates. Fabio will make up to the configured number of
# concurrent calls to Consul to fetch status data for route
# updates.
#
# The default is
#
# registry.consul.serviceMonitors = 1


# glob.matching.disabled disables glob matching on route lookups
# If glob matching is enabled there is a performance decrease
# for every route lookup. At a large number of services (> 500) this
Expand Down
3 changes: 2 additions & 1 deletion registry/consul/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ func (b *be) WatchServices() chan string {
log.Printf("[INFO] consul: Using dynamic routes")
log.Printf("[INFO] consul: Using tag prefix %q", b.cfg.TagPrefix)

m := NewServiceMonitor(b.c, b.cfg, b.dc)
svc := make(chan string)
go watchServices(b.c, b.cfg, svc)
go m.Watch(svc)
return svc
}

Expand Down
49 changes: 0 additions & 49 deletions registry/consul/parse.go

This file was deleted.

140 changes: 140 additions & 0 deletions registry/consul/routecmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package consul

import (
"fmt"
"log"
"net"
"os"
"runtime"
"strconv"
"strings"

"github.com/hashicorp/consul/api"
)

// routecmd builds a route command.
type routecmd struct {
// svc is the consul service instance.
svc *api.CatalogService

// prefix is the prefix of urlprefix tags. e.g. 'urlprefix-'.
prefix string

env map[string]string
}

func (r routecmd) build() []string {
var svctags, routetags []string
for _, t := range r.svc.ServiceTags {
if strings.HasPrefix(t, r.prefix) {
routetags = append(routetags, t)
} else {
svctags = append(svctags, t)
}
}

// generate route commands
var config []string
for _, tag := range routetags {
if route, opts, ok := parseURLPrefixTag(tag, r.prefix, r.env); ok {
name, addr, port := r.svc.ServiceName, r.svc.ServiceAddress, r.svc.ServicePort

// use consul node address if service address is not set
if addr == "" {
addr = r.svc.Address
}

// add .local suffix on OSX for simple host names w/o domain
if runtime.GOOS == "darwin" && !strings.Contains(addr, ".") && !strings.HasSuffix(addr, ".local") {
addr += ".local"
}

addr = net.JoinHostPort(addr, strconv.Itoa(port))
//tags := strings.Join(r.tags, ",")
dst := "http://" + addr + "/"

var weight string
var ropts []string
for _, o := range strings.Fields(opts) {
switch {
case o == "proto=tcp":
dst = "tcp://" + addr

case o == "proto=https":
dst = "https://" + addr

case strings.HasPrefix(o, "weight="):
weight = o[len("weight="):]

case strings.HasPrefix(o, "redirect="):
redir := strings.Split(o[len("redirect="):], ",")
if len(redir) == 2 {
dst = redir[1]
ropts = append(ropts, fmt.Sprintf("redirect=%s", redir[0]))
} else {
log.Printf("[ERROR] Invalid syntax for redirect: %s. should be redirect=<code>,<url>", o)
continue
}
default:
ropts = append(ropts, o)
}
}

cfg := "route add " + name + " " + route + " " + dst
if weight != "" {
cfg += " weight " + weight
}
if len(svctags) > 0 {
cfg += " tags " + strconv.Quote(strings.Join(svctags, ","))
}
if len(ropts) > 0 {
cfg += " opts " + strconv.Quote(strings.Join(ropts, " "))
}

config = append(config, cfg)
}
}
return config
}

// parseURLPrefixTag expects an input in the form of 'tag-host/path[ opts]'
// and returns the lower cased host and the unaltered path if the
// prefix matches the tag.
func parseURLPrefixTag(s, prefix string, env map[string]string) (route, opts string, ok bool) {
// expand $x or ${x} to env[x] or ""
expand := func(s string) string {
return os.Expand(s, func(x string) string {
if env == nil {
return ""
}
return env[x]
})
}

s = strings.TrimSpace(s)
if !strings.HasPrefix(s, prefix) {
return "", "", false
}
s = strings.TrimSpace(s[len(prefix):])

p := strings.SplitN(s, " ", 2)
if len(p) == 2 {
opts = p[1]
}
s = p[0]

// prefix is ":port"
if strings.HasPrefix(s, ":") {
return s, opts, true
}

// prefix is "host/path"
p = strings.SplitN(s, "/", 2)
if len(p) == 1 {
log.Printf("[WARN] consul: Invalid %s tag %q - You need to have a trailing slash!", prefix, s)
return "", "", false
}
host, path := p[0], p[1]

return strings.ToLower(expand(host)) + "/" + expand(path), opts, true
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,58 @@
package consul

import "testing"
import (
"reflect"
"testing"

"github.com/hashicorp/consul/api"
)

func TestRouteCmd(t *testing.T) {
cases := []struct {
name string
r routecmd
cfg []string
}{
{
name: "http",
r: routecmd{
prefix: "p-",
svc: &api.CatalogService{
ServiceName: "svc-1",
ServiceAddress: "1.1.1.1",
ServicePort: 2222,
ServiceTags: []string{`p-foo/bar`},
},
},
cfg: []string{
`route add svc-1 foo/bar http://1.1.1.1:2222/`,
},
},
{
name: "tcp",
r: routecmd{
prefix: "p-",
svc: &api.CatalogService{
ServiceName: "svc-1",
ServiceAddress: "1.1.1.1",
ServicePort: 2222,
ServiceTags: []string{`p-:1234 proto=tcp`},
},
},
cfg: []string{
`route add svc-1 :1234 tcp://1.1.1.1:2222`,
},
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
if got, want := c.r.build(), c.cfg; !reflect.DeepEqual(got, want) {
t.Fatalf("\ngot %#v\nwant %#v", got, want)
}
})
}
}

func TestParseTag(t *testing.T) {
prefix := "p-"
Expand Down
Loading