Skip to content

Commit

Permalink
fingerprint: add support for fingerprinting multiple Consul clusters
Browse files Browse the repository at this point in the history
Add the fingerprinting needed to support multiple Consul clusters in upcoming
Nomad Enterprise features. The fingerprinter creates a map of Consul clients
keyed by cluster name. In Nomad CE, all but the default cluster are ignored, so
there is no visible behavior change.

Ref: hashicorp/team-nomad#404
  • Loading branch information
tgross committed Sep 7, 2023
1 parent c474433 commit 33a2046
Show file tree
Hide file tree
Showing 3 changed files with 274 additions and 215 deletions.
160 changes: 98 additions & 62 deletions client/fingerprint/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@ import (
"time"

consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
)

const (
consulAvailable = "available"
consulUnavailable = "unavailable"
"github.com/hashicorp/nomad/nomad/structs/config"
)

var (
Expand All @@ -30,10 +28,14 @@ var (

// ConsulFingerprint is used to fingerprint for Consul
type ConsulFingerprint struct {
logger log.Logger
client *consulapi.Client
lastState string
extractors map[string]consulExtractor
logger log.Logger
states map[string]*consulFingerprintState
}

// consulFingerprintState holds the fingerprinting state for a single Consul
// cluster. The fingerprinter keeps one of these per cluster name.
type consulFingerprintState struct {
// client is the Consul API client; it is created by initialize the first
// time the cluster is fingerprinted and reused afterwards.
client *consulapi.Client
// isAvailable records whether the most recent fingerprint reached the
// agent, so availability changes are logged only on transitions.
isAvailable bool
// extractors maps a node attribute name to the function that reads that
// attribute's value out of the agent's self response.
extractors map[string]consulExtractor
}

// consulExtractor is used to parse out one attribute from consulInfo. Returns
Expand All @@ -43,29 +45,48 @@ type consulExtractor func(agentconsul.Self) (string, bool)
// NewConsulFingerprint is used to create a Consul fingerprint
func NewConsulFingerprint(logger log.Logger) Fingerprint {
return &ConsulFingerprint{
logger: logger.Named("consul"),
lastState: consulUnavailable,
logger: logger.Named("consul"),
states: map[string]*consulFingerprintState{},
}
}

// Fingerprint runs the Consul fingerprint once for every configuration
// returned by consulConfigs, writing the results into resp. A failure for one
// cluster does not stop the remaining clusters from being fingerprinted; all
// failures are accumulated and returned together. The result is nil when no
// cluster reported an error.
func (f *ConsulFingerprint) Fingerprint(req *FingerprintRequest, resp *FingerprintResponse) error {
	var failures *multierror.Error

	for _, clusterCfg := range f.consulConfigs(req) {
		if err := f.fingerprintImpl(clusterCfg, resp); err != nil {
			failures = multierror.Append(failures, err)
		}
	}

	// ErrorOrNil tolerates a nil receiver, so this yields a plain nil error
	// when nothing was appended above.
	return failures.ErrorOrNil()
}

func (f *ConsulFingerprint) fingerprintImpl(cfg *config.ConsulConfig, resp *FingerprintResponse) error {

logger := f.logger.With("cluster", cfg.Name)

state, ok := f.states[cfg.Name]
if !ok {
state = &consulFingerprintState{}
f.states[cfg.Name] = state
}

// establish consul client if necessary
if err := f.initialize(req); err != nil {
if err := state.initialize(cfg, logger); err != nil {
return err
}

// query consul for agent self api
info := f.query(resp)
info := state.query(logger)
if len(info) == 0 {
// unable to reach consul, nothing to do this time
return nil
}

// apply the extractor for each attribute
for attr, extractor := range f.extractors {
for attr, extractor := range state.extractors {
if s, ok := extractor(info); !ok {
f.logger.Warn("unable to fingerprint consul", "attribute", attr)
logger.Warn("unable to fingerprint consul", "attribute", attr)
} else {
resp.AddAttribute(attr, s)
}
Expand All @@ -75,11 +96,11 @@ func (f *ConsulFingerprint) Fingerprint(req *FingerprintRequest, resp *Fingerpri
f.link(resp)

// indicate Consul is now available
if f.lastState == consulUnavailable {
f.logger.Info("consul agent is available")
if !state.isAvailable {
logger.Info("consul agent is available")
}

f.lastState = consulAvailable
state.isAvailable = true
resp.Detected = true
return nil
}
Expand All @@ -88,46 +109,61 @@ func (f *ConsulFingerprint) Periodic() (bool, time.Duration) {
return true, 15 * time.Second
}

func (f *ConsulFingerprint) initialize(req *FingerprintRequest) error {
// Only create the Consul client once to avoid creating many connections
if f.client == nil {
consulConfig, err := req.Config.ConsulConfig.ApiConfig()
if err != nil {
return fmt.Errorf("failed to initialize Consul client config: %v", err)
}
func (cfs *consulFingerprintState) initialize(cfg *config.ConsulConfig, logger hclog.Logger) error {
if cfs.client != nil {
return nil // already initialized!
}

f.client, err = consulapi.NewClient(consulConfig)
if err != nil {
return fmt.Errorf("failed to initialize Consul client: %s", err)
}
consulConfig, err := cfg.ApiConfig()
if err != nil {
return fmt.Errorf("failed to initialize Consul client config: %v", err)
}

cfs.client, err = consulapi.NewClient(consulConfig)
if err != nil {
return fmt.Errorf("failed to initialize Consul client: %v", err)
}

f.extractors = map[string]consulExtractor{
"consul.server": f.server,
"consul.version": f.version,
"consul.sku": f.sku,
"consul.revision": f.revision,
"unique.consul.name": f.name,
"consul.datacenter": f.dc,
"consul.segment": f.segment,
"consul.connect": f.connect,
"consul.grpc": f.grpc(consulConfig.Scheme),
"consul.ft.namespaces": f.namespaces,
if cfg.Name == "default" {
cfs.extractors = map[string]consulExtractor{
"consul.server": cfs.server,
"consul.version": cfs.version,
"consul.sku": cfs.sku,
"consul.revision": cfs.revision,
"unique.consul.name": cfs.name, // note: won't have this for non-default clusters
"consul.datacenter": cfs.dc,
"consul.segment": cfs.segment,
"consul.connect": cfs.connect,
"consul.grpc": cfs.grpc(consulConfig.Scheme, logger),
"consul.ft.namespaces": cfs.namespaces,
}
} else {
cfs.extractors = map[string]consulExtractor{
fmt.Sprintf("consul.%s.server", cfg.Name): cfs.server,
fmt.Sprintf("consul.%s.version", cfg.Name): cfs.version,
fmt.Sprintf("consul.%s.sku", cfg.Name): cfs.sku,
fmt.Sprintf("consul.%s.revision", cfg.Name): cfs.revision,
fmt.Sprintf("consul.%s.datacenter", cfg.Name): cfs.dc,
fmt.Sprintf("consul.%s.segment", cfg.Name): cfs.segment,
fmt.Sprintf("consul.%s.connect", cfg.Name): cfs.connect,
fmt.Sprintf("consul.%s.grpc", cfg.Name): cfs.grpc(consulConfig.Scheme, logger),
fmt.Sprintf("consul.%s.ft.namespaces", cfg.Name): cfs.namespaces,
}
}

return nil
}

// query asks the Consul agent for its self description. It returns nil when
// the agent cannot be reached, logging the loss of availability only if the
// agent was previously marked available, and clears the availability flag.
func (f *ConsulFingerprint) query(resp *FingerprintResponse) agentconsul.Self {
func (cfs *consulFingerprintState) query(logger hclog.Logger) agentconsul.Self {
// We'll try to detect consul by making a query to the agent's self API.
// If we can't hit this URL consul is probably not running on this machine.
info, err := f.client.Agent().Self()
info, err := cfs.client.Agent().Self()
if err != nil {
// indicate consul no longer available
if f.lastState == consulAvailable {
f.logger.Info("consul agent is unavailable")
if cfs.isAvailable {
// NOTE(review): the other log calls in this file pass alternating key/value
// arguments rather than printf verbs, so the "%v" here is likely emitted
// verbatim; presumably this should be ("consul agent is unavailable",
// "error", err) — confirm against the hclog Logger documentation.
logger.Info("consul agent is unavailable: %v", err)
}
f.lastState = consulUnavailable
cfs.isAvailable = false
return nil
}
return info
Expand All @@ -143,36 +179,36 @@ func (f *ConsulFingerprint) link(resp *FingerprintResponse) {
}
}

func (f *ConsulFingerprint) server(info agentconsul.Self) (string, bool) {
func (cfs *consulFingerprintState) server(info agentconsul.Self) (string, bool) {
s, ok := info["Config"]["Server"].(bool)
return strconv.FormatBool(s), ok
}

func (f *ConsulFingerprint) version(info agentconsul.Self) (string, bool) {
func (cfs *consulFingerprintState) version(info agentconsul.Self) (string, bool) {
v, ok := info["Config"]["Version"].(string)
return v, ok
}

func (f *ConsulFingerprint) sku(info agentconsul.Self) (string, bool) {
func (cfs *consulFingerprintState) sku(info agentconsul.Self) (string, bool) {
return agentconsul.SKU(info)
}

func (f *ConsulFingerprint) revision(info agentconsul.Self) (string, bool) {
func (cfs *consulFingerprintState) revision(info agentconsul.Self) (string, bool) {
r, ok := info["Config"]["Revision"].(string)
return r, ok
}

func (f *ConsulFingerprint) name(info agentconsul.Self) (string, bool) {
func (cfs *consulFingerprintState) name(info agentconsul.Self) (string, bool) {
n, ok := info["Config"]["NodeName"].(string)
return n, ok
}

func (f *ConsulFingerprint) dc(info agentconsul.Self) (string, bool) {
func (cfs *consulFingerprintState) dc(info agentconsul.Self) (string, bool) {
d, ok := info["Config"]["Datacenter"].(string)
return d, ok
}

func (f *ConsulFingerprint) segment(info agentconsul.Self) (string, bool) {
func (cfs *consulFingerprintState) segment(info agentconsul.Self) (string, bool) {
tags, tagsOK := info["Member"]["Tags"].(map[string]interface{})
if !tagsOK {
return "", false
Expand All @@ -181,12 +217,12 @@ func (f *ConsulFingerprint) segment(info agentconsul.Self) (string, bool) {
return s, ok
}

func (f *ConsulFingerprint) connect(info agentconsul.Self) (string, bool) {
func (cfs *consulFingerprintState) connect(info agentconsul.Self) (string, bool) {
c, ok := info["DebugConfig"]["ConnectEnabled"].(bool)
return strconv.FormatBool(c), ok
}

func (f *ConsulFingerprint) grpc(scheme string) func(info agentconsul.Self) (string, bool) {
func (cfs *consulFingerprintState) grpc(scheme string, logger hclog.Logger) func(info agentconsul.Self) (string, bool) {
return func(info agentconsul.Self) (string, bool) {

// The version is needed in order to understand which config object to
Expand All @@ -199,38 +235,38 @@ func (f *ConsulFingerprint) grpc(scheme string) func(info agentconsul.Self) (str

consulVersion, err := version.NewVersion(strings.TrimSpace(v))
if err != nil {
f.logger.Warn("invalid Consul version", "version", v)
logger.Warn("invalid Consul version", "version", v)
return "", false
}

// If the Consul agent being fingerprinted is running a version less
// than 1.14.0 we use the original single gRPC port.
if consulVersion.Core().LessThan(consulGRPCPortChangeVersion.Core()) {
return f.grpcPort(info)
return cfs.grpcPort(info)
}

// Now that we know we are querying a Consul agent running v1.14.0 or
// greater, we need to select the correct port parameter from the
// config depending on whether we have been asked to speak TLS or not.
switch strings.ToLower(scheme) {
case "https":
return f.grpcTLSPort(info)
return cfs.grpcTLSPort(info)
default:
return f.grpcPort(info)
return cfs.grpcPort(info)
}
}
}

func (f *ConsulFingerprint) grpcPort(info agentconsul.Self) (string, bool) {
func (cfs *consulFingerprintState) grpcPort(info agentconsul.Self) (string, bool) {
p, ok := info["DebugConfig"]["GRPCPort"].(float64)
return fmt.Sprintf("%d", int(p)), ok
}

func (f *ConsulFingerprint) grpcTLSPort(info agentconsul.Self) (string, bool) {
func (cfs *consulFingerprintState) grpcTLSPort(info agentconsul.Self) (string, bool) {
p, ok := info["DebugConfig"]["GRPCTLSPort"].(float64)
return fmt.Sprintf("%d", int(p)), ok
}

func (f *ConsulFingerprint) namespaces(info agentconsul.Self) (string, bool) {
func (cfs *consulFingerprintState) namespaces(info agentconsul.Self) (string, bool) {
return strconv.FormatBool(agentconsul.Namespaces(info)), true
}
23 changes: 23 additions & 0 deletions client/fingerprint/consul_ce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

//go:build !ent

package fingerprint

import "github.com/hashicorp/nomad/nomad/structs/config"

// consulConfigs reports which Consul configurations the fingerprinter should
// check, keyed by cluster name. Nomad CE only ever fingerprints the default
// cluster: when more than one configuration is present a warning is logged and
// the extras are ignored. A nil map is returned when the agent has no default
// Consul configuration.
func (f *ConsulFingerprint) consulConfigs(req *FingerprintRequest) map[string]*config.ConsulConfig {
	defaultCfg := req.Config.ConsulConfig
	if defaultCfg == nil {
		return nil
	}

	if count := len(req.Config.ConsulConfigs); count > 1 {
		f.logger.Warn("multiple Consul configurations are only supported in Nomad Enterprise")
	}

	configs := make(map[string]*config.ConsulConfig, 1)
	configs["default"] = defaultCfg
	return configs
}
Loading

0 comments on commit 33a2046

Please sign in to comment.