diff --git a/config/config.go b/config/config.go index 098985aa5..3dcc30067 100644 --- a/config/config.go +++ b/config/config.go @@ -146,6 +146,7 @@ type Consul struct { CheckTLSSkipVerify bool CheckDeregisterCriticalServiceAfter string ChecksRequired string + ServiceMonitors int } type Tracing struct { diff --git a/config/default.go b/config/default.go index dd1e9df6c..183f7e9d0 100644 --- a/config/default.go +++ b/config/default.go @@ -57,6 +57,7 @@ var defaultConfig = &Config{ ServiceAddr: ":9998", ServiceName: "fabio", ServiceStatus: []string{"passing"}, + ServiceMonitors: 1, CheckInterval: time.Second, CheckTimeout: 3 * time.Second, CheckScheme: "http", diff --git a/config/load.go b/config/load.go index ec92d81cc..c5a022c4b 100644 --- a/config/load.go +++ b/config/load.go @@ -179,6 +179,7 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c f.BoolVar(&cfg.Registry.Consul.CheckTLSSkipVerify, "registry.consul.register.checkTLSSkipVerify", defaultConfig.Registry.Consul.CheckTLSSkipVerify, "service check TLS verification") f.StringVar(&cfg.Registry.Consul.CheckDeregisterCriticalServiceAfter, "registry.consul.register.checkDeregisterCriticalServiceAfter", defaultConfig.Registry.Consul.CheckDeregisterCriticalServiceAfter, "critical service deregistration timeout") f.StringVar(&cfg.Registry.Consul.ChecksRequired, "registry.consul.checksRequired", defaultConfig.Registry.Consul.ChecksRequired, "number of checks which must pass: one or all") + f.IntVar(&cfg.Registry.Consul.ServiceMonitors, "registry.consul.serviceMonitors", defaultConfig.Registry.Consul.ServiceMonitors, "concurrency for route updates") f.IntVar(&cfg.Runtime.GOGC, "runtime.gogc", defaultConfig.Runtime.GOGC, "sets runtime.GOGC") f.IntVar(&cfg.Runtime.GOMAXPROCS, "runtime.gomaxprocs", defaultConfig.Runtime.GOMAXPROCS, "sets runtime.GOMAXPROCS") f.StringVar(&cfg.UI.Access, "ui.access", defaultConfig.UI.Access, "access mode, one of [ro, rw]") @@ -244,6 +245,10 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c cfg.Registry.Consul.CheckScheme = "https" } + if cfg.Registry.Consul.ServiceMonitors <= 0 { + cfg.Registry.Consul.ServiceMonitors = 1 + } + if gzipContentTypesValue != "" { cfg.Proxy.GZIPContentTypes, err = regexp.Compile(gzipContentTypesValue) if err != nil { diff --git a/config/load_test.go b/config/load_test.go index 29fb9682a..c8f938359 100644 --- a/config/load_test.go +++ b/config/load_test.go @@ -618,6 +618,13 @@ func TestLoad(t *testing.T) { return cfg }, }, + { + args: []string{"-registry.consul.serviceMonitors", "5"}, + cfg: func(cfg *Config) *Config { + cfg.Registry.Consul.ServiceMonitors = 5 + return cfg + }, + }, { args: []string{"-log.access.format", "foobar"}, cfg: func(cfg *Config) *Config { diff --git a/docs/content/ref/registry.consul.serviceMonitors.md b/docs/content/ref/registry.consul.serviceMonitors.md new file mode 100644 index 000000000..fcc0f5454 --- /dev/null +++ b/docs/content/ref/registry.consul.serviceMonitors.md @@ -0,0 +1,12 @@ +--- +title: "registry.consul.serviceMonitors" +--- + +`registry.consul.serviceMonitors` configures the concurrency for +route updates. Fabio will make up to the configured number of +concurrent calls to Consul to fetch status data for route +updates. + +The default is + + registry.consul.serviceMonitors = 1 diff --git a/fabio.properties b/fabio.properties index f57ef95a7..c86c14c47 100644 --- a/fabio.properties +++ b/fabio.properties @@ -750,6 +750,16 @@ # registry.consul.checksRequired = one +# registry.consul.serviceMonitors configures the concurrency for +# route updates. Fabio will make up to the configured number of +# concurrent calls to Consul to fetch status data for route +# updates. +# +# The default is +# +# registry.consul.serviceMonitors = 1 + + # glob.matching.disabled disables glob matching on route lookups # If glob matching is enabled there is a performance decrease # for every route lookup. At a large number of services (> 500) this diff --git a/registry/consul/backend.go b/registry/consul/backend.go index 0d40448ca..629bd925d 100644 --- a/registry/consul/backend.go +++ b/registry/consul/backend.go @@ -125,8 +125,9 @@ func (b *be) WatchServices() chan string { log.Printf("[INFO] consul: Using dynamic routes") log.Printf("[INFO] consul: Using tag prefix %q", b.cfg.TagPrefix) + m := NewServiceMonitor(b.c, b.cfg, b.dc) svc := make(chan string) - go watchServices(b.c, b.cfg, svc) + go m.Watch(svc) return svc } diff --git a/registry/consul/parse.go b/registry/consul/parse.go deleted file mode 100644 index e34caa755..000000000 --- a/registry/consul/parse.go +++ /dev/null @@ -1,49 +0,0 @@ -package consul - -import ( - "log" - "os" - "strings" -) - -// parseURLPrefixTag expects an input in the form of 'tag-host/path[ opts]' -// and returns the lower cased host and the unaltered path if the -// prefix matches the tag. -func parseURLPrefixTag(s, prefix string, env map[string]string) (route, opts string, ok bool) { - // expand $x or ${x} to env[x] or "" - expand := func(s string) string { - return os.Expand(s, func(x string) string { - if env == nil { - return "" - } - return env[x] - }) - } - - s = strings.TrimSpace(s) - if !strings.HasPrefix(s, prefix) { - return "", "", false - } - s = strings.TrimSpace(s[len(prefix):]) - - p := strings.SplitN(s, " ", 2) - if len(p) == 2 { - opts = p[1] - } - s = p[0] - - // prefix is ":port" - if strings.HasPrefix(s, ":") { - return s, opts, true - } - - // prefix is "host/path" - p = strings.SplitN(s, "/", 2) - if len(p) == 1 { - log.Printf("[WARN] consul: Invalid %s tag %q - You need to have a trailing slash!", prefix, s) - return "", "", false - } - host, path := p[0], p[1] - - return strings.ToLower(expand(host)) + "/" + expand(path), opts, true -} diff --git a/registry/consul/routecmd.go b/registry/consul/routecmd.go new file mode 100644 index 000000000..5281753bf --- /dev/null +++ b/registry/consul/routecmd.go @@ -0,0 +1,140 @@ +package consul + +import ( + "fmt" + "log" + "net" + "os" + "runtime" + "strconv" + "strings" + + "github.com/hashicorp/consul/api" +) + +// routecmd builds a route command. +type routecmd struct { + // svc is the consul service instance. + svc *api.CatalogService + + // prefix is the prefix of urlprefix tags. e.g. 'urlprefix-'. + prefix string + + env map[string]string +} + +func (r routecmd) build() []string { + var svctags, routetags []string + for _, t := range r.svc.ServiceTags { + if strings.HasPrefix(t, r.prefix) { + routetags = append(routetags, t) + } else { + svctags = append(svctags, t) + } + } + + // generate route commands + var config []string + for _, tag := range routetags { + if route, opts, ok := parseURLPrefixTag(tag, r.prefix, r.env); ok { + name, addr, port := r.svc.ServiceName, r.svc.ServiceAddress, r.svc.ServicePort + + // use consul node address if service address is not set + if addr == "" { + addr = r.svc.Address + } + + // add .local suffix on OSX for simple host names w/o domain + if runtime.GOOS == "darwin" && !strings.Contains(addr, ".") && !strings.HasSuffix(addr, ".local") { + addr += ".local" + } + + addr = net.JoinHostPort(addr, strconv.Itoa(port)) + //tags := strings.Join(r.tags, ",") + dst := "http://" + addr + "/" + + var weight string + var ropts []string + for _, o := range strings.Fields(opts) { + switch { + case o == "proto=tcp": + dst = "tcp://" + addr + + case o == "proto=https": + dst = "https://" + addr + + case strings.HasPrefix(o, "weight="): + weight = o[len("weight="):] + + case strings.HasPrefix(o, "redirect="): + redir := strings.Split(o[len("redirect="):], ",") + if len(redir) == 2 { + dst = redir[1] + ropts = append(ropts, fmt.Sprintf("redirect=%s", redir[0])) + } else { + log.Printf("[ERROR] Invalid syntax for redirect: %s. should be redirect=,", o) + continue + } + default: + ropts = append(ropts, o) + } + } + + cfg := "route add " + name + " " + route + " " + dst + if weight != "" { + cfg += " weight " + weight + } + if len(svctags) > 0 { + cfg += " tags " + strconv.Quote(strings.Join(svctags, ",")) + } + if len(ropts) > 0 { + cfg += " opts " + strconv.Quote(strings.Join(ropts, " ")) + } + + config = append(config, cfg) + } + } + return config +} + +// parseURLPrefixTag expects an input in the form of 'tag-host/path[ opts]' +// and returns the lower cased host and the unaltered path if the +// prefix matches the tag. +func parseURLPrefixTag(s, prefix string, env map[string]string) (route, opts string, ok bool) { + // expand $x or ${x} to env[x] or "" + expand := func(s string) string { + return os.Expand(s, func(x string) string { + if env == nil { + return "" + } + return env[x] + }) + } + + s = strings.TrimSpace(s) + if !strings.HasPrefix(s, prefix) { + return "", "", false + } + s = strings.TrimSpace(s[len(prefix):]) + + p := strings.SplitN(s, " ", 2) + if len(p) == 2 { + opts = p[1] + } + s = p[0] + + // prefix is ":port" + if strings.HasPrefix(s, ":") { + return s, opts, true + } + + // prefix is "host/path" + p = strings.SplitN(s, "/", 2) + if len(p) == 1 { + log.Printf("[WARN] consul: Invalid %s tag %q - You need to have a trailing slash!", prefix, s) + return "", "", false + } + host, path := p[0], p[1] + + return strings.ToLower(expand(host)) + "/" + expand(path), opts, true +} diff --git a/registry/consul/parse_test.go b/registry/consul/routecmd_test.go similarity index 64% rename from registry/consul/parse_test.go rename to registry/consul/routecmd_test.go index 6c3054a02..2c906e476 100644 --- a/registry/consul/parse_test.go +++ b/registry/consul/routecmd_test.go @@ -1,6 +1,58 @@ package consul -import "testing" +import ( + "reflect" + "testing" + + "github.com/hashicorp/consul/api" +) + +func TestRouteCmd(t *testing.T) { + cases := []struct { + name string + r routecmd + cfg []string + }{ + { + name: "http", + r: routecmd{ + prefix: "p-", + svc: &api.CatalogService{ + ServiceName: "svc-1", + ServiceAddress: "1.1.1.1", + ServicePort: 2222, + ServiceTags: []string{`p-foo/bar`}, + }, + }, + cfg: []string{ + `route add svc-1 foo/bar http://1.1.1.1:2222/`, + }, + }, + { + name: "tcp", + r: routecmd{ + prefix: "p-", + svc: &api.CatalogService{ + ServiceName: "svc-1", + ServiceAddress: "1.1.1.1", + ServicePort: 2222, + ServiceTags: []string{`p-:1234 proto=tcp`}, + }, + }, + cfg: []string{ + `route add svc-1 :1234 tcp://1.1.1.1:2222`, + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + if got, want := c.r.build(), c.cfg; !reflect.DeepEqual(got, want) { + t.Fatalf("\ngot %#v\nwant %#v", got, want) + } + }) + } +} func TestParseTag(t *testing.T) { prefix := "p-" diff --git a/registry/consul/service.go b/registry/consul/service.go index 18bd4c38d..f9ad1067e 100644 --- a/registry/consul/service.go +++ b/registry/consul/service.go @@ -3,10 +3,7 @@ package consul import ( "fmt" "log" - "net" - "runtime" "sort" - "strconv" "strings" "time" @@ -14,15 +11,30 @@ import ( "github.com/hashicorp/consul/api" ) -// watchServices monitors the consul health checks and creates a new configuration -// on every change. -func watchServices(client *api.Client, config *config.Consul, svcConfig chan string) { - var lastIndex uint64 - var strict bool = strings.EqualFold("all", config.ChecksRequired) +// ServiceMonitor generates fabio configurations from consul state. +type ServiceMonitor struct { + client *api.Client + config *config.Consul + dc string + strict bool +} + +func NewServiceMonitor(client *api.Client, config *config.Consul, dc string) *ServiceMonitor { + return &ServiceMonitor{ + client: client, + config: config, + dc: dc, + strict: config.ChecksRequired == "all", + } +} +// Watch monitors the consul health checks and sends a new +// configuration to the updates channnel on every change. +func (w *ServiceMonitor) Watch(updates chan string) { + var lastIndex uint64 for { q := &api.QueryOptions{RequireConsistent: true, WaitIndex: lastIndex} - checks, meta, err := client.Health().State("any", q) + checks, meta, err := w.client.Health().State("any", q) if err != nil { log.Printf("[WARN] consul: Error fetching health state. %v", err) time.Sleep(time.Second) @@ -30,14 +42,21 @@ func watchServices(client *api.Client, config *config.Consul, svcConfig chan str } log.Printf("[DEBUG] consul: Health changed to #%d", meta.LastIndex) - svcConfig <- servicesConfig(client, passingServices(checks, config.ServiceStatus, strict), config.TagPrefix) + + // determine which services have passing health checks + passing := passingServices(checks, w.config.ServiceStatus, w.strict) + + // build the config for the passing services + updates <- w.makeConfig(passing) + + // remember the last state and wait for the next change lastIndex = meta.LastIndex } } -// servicesConfig determines which service instances have passing health checks +// makeCconfig determines which service instances have passing health checks // and then finds the ones which have tags with the right prefix to build the config from. -func servicesConfig(client *api.Client, checks []*api.HealthCheck, tagPrefix string) string { +func (w *ServiceMonitor) makeConfig(checks []*api.HealthCheck) string { // map service name to list of service passing for which the health check is ok m := map[string]map[string]bool{} for _, check := range checks { @@ -52,9 +71,25 @@ func servicesConfig(client *api.Client, checks []*api.HealthCheck, tagPrefix str m[name][id] = true } - var config []string + n := w.config.ServiceMonitors + if n <= 0 { + n = 1 + } + + sem := make(chan int, n) + cfgs := make(chan []string, len(m)) for name, passing := range m { - cfg := serviceConfig(client, name, passing, tagPrefix) + name, passing := name, passing + go func() { + sem <- 1 + cfgs <- w.serviceConfig(name, passing) + <-sem + }() + } + + var config []string + for i := 0; i < len(m); i++ { + cfg := <-cfgs config = append(config, cfg...) } @@ -65,99 +100,36 @@ func servicesConfig(client *api.Client, checks []*api.HealthCheck, tagPrefix str } // serviceConfig constructs the config for all good instances of a single service. -func serviceConfig(client *api.Client, name string, passing map[string]bool, tagPrefix string) (config []string) { +func (w *ServiceMonitor) serviceConfig(name string, passing map[string]bool) (config []string) { if name == "" || len(passing) == 0 { return nil } - dc, err := datacenter(client) - if err != nil { - log.Printf("[WARN] consul: Error getting datacenter. %s", err) - return nil - } - q := &api.QueryOptions{RequireConsistent: true} - svcs, _, err := client.Catalog().Service(name, "", q) + svcs, _, err := w.client.Catalog().Service(name, "", q) if err != nil { log.Printf("[WARN] consul: Error getting catalog service %s. %v", name, err) return nil } env := map[string]string{ - "DC": dc, + "DC": w.dc, } for _, svc := range svcs { - // check if the instance is in the list of instances - // which passed the health check - if _, ok := passing[fmt.Sprintf("%s.%s", svc.Node, svc.ServiceID)]; !ok { + // check if this instance passed the health check + if _, ok := passing[svc.Node+"."+svc.ServiceID]; !ok { continue } - // get all tags which do not have the tag prefix - var svctags []string - for _, tag := range svc.ServiceTags { - if !strings.HasPrefix(tag, tagPrefix) { - svctags = append(svctags, tag) - } + r := routecmd{ + svc: svc, + env: env, + prefix: w.config.TagPrefix, } + cmds := r.build() - // generate route commands - for _, tag := range svc.ServiceTags { - if route, opts, ok := parseURLPrefixTag(tag, tagPrefix, env); ok { - name, addr, port := svc.ServiceName, svc.ServiceAddress, svc.ServicePort - - // use consul node address if service address is not set - if addr == "" { - addr = svc.Address - } - - // add .local suffix on OSX for simple host names w/o domain - if runtime.GOOS == "darwin" && !strings.Contains(addr, ".") && !strings.HasSuffix(addr, ".local") { - addr += ".local" - } - - // build route command - weight := "" - ropts := []string{} - tags := strings.Join(svctags, ",") - addr = net.JoinHostPort(addr, strconv.Itoa(port)) - dst := "http://" + addr + "/" - for _, o := range strings.Fields(opts) { - switch { - case o == "proto=tcp": - dst = "tcp://" + addr - case o == "proto=https": - dst = "https://" + addr - case strings.HasPrefix(o, "weight="): - weight = o[len("weight="):] - case strings.HasPrefix(o, "redirect="): - redir := strings.Split(o[len("redirect="):], ",") - if len(redir) == 2 { - dst = redir[1] - ropts = append(ropts, fmt.Sprintf("redirect=%s", redir[0])) - } else { - log.Printf("[ERROR] Invalid syntax for redirect: %s. should be redirect=,", o) - continue - } - default: - ropts = append(ropts, o) - } - } - - cfg := "route add " + name + " " + route + " " + dst - if weight != "" { - cfg += " weight " + weight - } - if tags != "" { - cfg += " tags " + strconv.Quote(tags) - } - if len(ropts) > 0 { - cfg += " opts " + strconv.Quote(strings.Join(ropts, " ")) - } - config = append(config, cfg) - } - } + config = append(config, cmds...) } return config }