Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Consul ESM to support Admin Partitions #281

Merged
merged 10 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
ndhanushkodi marked this conversation as resolved.
Show resolved Hide resolved
"version": "0.2.0",
"configurations": [
{
"name": "Launch Consul ESM",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}",
"args": ["--config-file=${workspaceFolder}/config.hcl"]
}
]
}
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ token = ""
// The Consul datacenter to use.
datacenter = "dc1"

// The target Admin Partition to use.
partition = ""

// The CA file to use for talking to Consul over TLS. Can also be provided
// through the CONSUL_CACERT environment variable.
ca_file = ""
Expand Down
63 changes: 54 additions & 9 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (a *Agent) Run() error {
wg.Wait()

// Clean up.
if err := a.client.Agent().ServiceDeregister(a.serviceID()); err != nil {
if err := a.client.Agent().ServiceDeregisterOpts(a.serviceID(), a.ConsulQueryOption()); err != nil {
a.logger.Warn("Failed to deregister service", "error", err)
}

Expand Down Expand Up @@ -200,7 +200,7 @@ func (e *alreadyExistsError) Error() string {
// register is used to register this agent with Consul service discovery.
func (a *Agent) register() error {
// agent ids need to be unique to disambiguate different instances on same host
if existing, _, _ := a.client.Agent().Service(a.serviceID(), nil); existing != nil {
if existing, _, _ := a.client.Agent().Service(a.serviceID(), a.ConsulQueryOption()); existing != nil {
return &alreadyExistsError{a.serviceID()}
}

Expand All @@ -209,6 +209,10 @@ func (a *Agent) register() error {
Name: a.config.Service,
Meta: a.serviceMeta(),
}
a.HasPartition(func(partition string) {
service.Partition = partition
})

if a.config.Tag != "" {
service.Tags = []string{a.config.Tag}
}
Expand Down Expand Up @@ -279,7 +283,7 @@ func (a *Agent) runRegister() {
return

case <-time.After(agentTTL):
services, err := a.client.Agent().Services()
services, err := a.client.Agent().ServicesWithFilterOpts("", a.ConsulQueryOption())
if err != nil {
a.logger.Error("Failed to check services (will retry)", "error", err)
time.Sleep(retryTime)
Expand Down Expand Up @@ -323,7 +327,10 @@ REGISTER:
DeregisterCriticalServiceAfter: deregisterTime.String(),
},
}
if err := a.client.Agent().CheckRegister(check); err != nil {
a.HasPartition(func(partition string) {
srahul3 marked this conversation as resolved.
Show resolved Hide resolved
check.Partition = partition
})
if err := a.client.Agent().CheckRegisterOpts(check, a.ConsulQueryOption()); err != nil {
a.logger.Error("Failed to register TTL check (will retry)", "error", err)
time.Sleep(retryTime)
ndhanushkodi marked this conversation as resolved.
Show resolved Hide resolved
goto REGISTER
Expand All @@ -336,7 +343,7 @@ REGISTER:
return

case <-time.After(agentTTL / 2):
if err := a.client.Agent().UpdateTTL(ttlID, "", api.HealthPassing); err != nil {
if err := a.client.Agent().UpdateTTLOpts(ttlID, "", api.HealthPassing, a.ConsulQueryOption()); err != nil {
a.logger.Error("Failed to refresh agent TTL check (will reregister)", "error", err)
time.Sleep(retryTime)
goto REGISTER
Expand Down Expand Up @@ -501,17 +508,21 @@ func (a *Agent) watchHealthChecks(nodeListCh chan map[string]bool) {
}

func (a *Agent) getHealthChecks(waitIndex uint64, nodes map[string]bool) (api.HealthChecks, uint64) {
namespaces, err := namespacesList(a.client)
namespaces, err := namespacesList(a.client, a.config)
if err != nil {
a.logger.Warn("Error getting namespaces, falling back to default namespace", "error", err)
namespaces = []*api.Namespace{{Name: ""}}
}

ctx, cancelFunc := context.WithCancel(context.Background())
opts := (&api.QueryOptions{
opts := &api.QueryOptions{
NodeMeta: a.config.NodeMeta,
WaitIndex: waitIndex,
}).WithContext(ctx)
}
opts = opts.WithContext(ctx)
a.HasPartition(func(partition string) {
srahul3 marked this conversation as resolved.
Show resolved Hide resolved
opts.Partition = partition
})
defer cancelFunc()
go func() {
select {
Expand Down Expand Up @@ -595,7 +606,7 @@ VERIFYCONSULSERVER:
}

// Fetch server versions
svs, _, err := a.client.Catalog().Service("consul", "", nil)
svs, _, err := a.client.Catalog().Service("consul", "", a.ConsulQueryOption())
if err != nil {
if strings.Contains(err.Error(), "429") {
// 429 is a warning that something is unhealthy. This may occur when ESM
Expand Down Expand Up @@ -635,3 +646,37 @@ VERIFYCONSULSERVER:
a.logger.Debug("Consul agent and all servers are running compatible versions with ESM")
return nil
ndhanushkodi marked this conversation as resolved.
Show resolved Hide resolved
}

// PartitionOrEmpty reports the partition set on the agent's config. It yields
// an empty string when the agent has no config or the config names no
// partition.
func (a *Agent) PartitionOrEmpty() string {
	if cfg := a.config; cfg != nil {
		return cfg.Partition
	}
	return ""
}

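// HasPartition invokes the callback with the agent's configured partition, but only when that partition is non-empty and is not "default" (compared case-insensitively); otherwise the callback is not called.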
func (a *Agent) HasPartition(callback func(partition string)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reads a little strange to me. If this is checking whether a partition is valid, then it should return a bool to indicate that (and should probably be named ValidPartition). But this isn't checking whether the partition is valid; it's just checking whether the partition on the agent is non-empty and non-default. I'd probably get rid of this function entirely and rewrite the places where it's used to just assign the output of PartitionOrEmpty.

partition := a.PartitionOrEmpty()

if partition == "" || strings.ToLower(partition) == "default" {
// Ignore empty or default partitions
return
}

callback(a.config.Partition)
}

// ConsulQueryOption builds a fresh *api.QueryOptions for a Consul API call.
// The Partition field is filled in only when HasPartition invokes its
// callback; otherwise the returned options are left at their zero value.
func (a *Agent) ConsulQueryOption() *api.QueryOptions {
	var queryOpts api.QueryOptions
	a.HasPartition(func(p string) {
		queryOpts.Partition = p
	})
	return &queryOpts
}
174 changes: 173 additions & 1 deletion agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/api"
Expand All @@ -36,7 +37,6 @@ func testAgent(t *testing.T, cb func(*Config)) *Agent {
if cb != nil {
cb(conf)
}

agent, err := NewAgent(conf, logger)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -570,3 +570,175 @@ func TestAgent_getHealthChecks(t *testing.T) {
}
})
}

// TestAgent_PartitionOrEmpty checks that PartitionOrEmpty hands back exactly
// the partition stored on the agent's config, including the literal "default".
func TestAgent_PartitionOrEmpty(t *testing.T) {
	cfg, err := DefaultConfig()
	if err != nil {
		t.Fatal(err)
	}

	type testCase struct {
		name      string
		partition string
		expected  string
	}
	tests := []testCase{
		{name: "No partition", partition: "", expected: ""},
		{name: "default partition", partition: "default", expected: "default"},
		{name: "admin partition", partition: "admin", expected: "admin"},
	}

	for i := range tests {
		tt := tests[i]
		t.Run(tt.name, func(t *testing.T) {
			// The config is shared between sub-tests; each one overwrites
			// the partition before building its agent.
			cfg.Partition = tt.partition
			a := &Agent{config: cfg}

			got := a.PartitionOrEmpty()
			assert.Equal(t, tt.expected, got)
		})
	}
}

// TestAgent_getHealthChecksWithPartition starts a stub Consul HTTP server and
// verifies that requests issued by an agent configured with a partition carry
// that partition in the "partition" query parameter. It also records which
// namespaces the health state endpoint was queried for and checks the health
// checks returned by getHealthChecks.
func TestAgent_getHealthChecksWithPartition(t *testing.T) {
	testPartition := "test-partition"
	notUniqueInstanceID := "not-unique-instance-id"
	partitionQueryParamKey := "partition"
	t.Run("with-partition", func(t *testing.T) {
		listener, err := net.Listen("tcp", ":0")
		require.NoError(t, err)
		port := listener.Addr().(*net.TCPAddr).Port
		addr := fmt.Sprintf("127.0.0.1:%d", port)
		// testNs collects every namespace seen by the health state endpoint.
		testNs := map[string]bool{}
		ts := httptest.NewUnstartedServer(http.HandlerFunc(
			func(w http.ResponseWriter, r *http.Request) {
				switch r.URL.EscapedPath() {
				case "/v1/status/leader":
					assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey))
					fmt.Fprint(w, `"`+addr+`"`)
				case "/v1/namespaces":
					assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey))
					fmt.Fprint(w, testNamespaces())
				case "/v1/health/state/any":
					assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey))
					namespace := r.URL.Query()["ns"][0]
					testNs[namespace] = true
					// NOTE(review): there is no return after writing "[]", so the
					// testHealthChecks output is appended for "default" as well —
					// confirm this is intended.
					if namespace == "default" {
						fmt.Fprint(w, "[]")
					}
					fmt.Fprint(w, testHealthChecks(namespace))
				case "/v1/agent/service/consul-esm:not-unique-instance-id":
					assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey))
					// write status 404, to tell the service is not registered and proceed with registration
					w.WriteHeader(http.StatusNotFound)
				case "/v1/agent/service/register":
					assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey))
					var svc api.AgentServiceRegistration
					err := json.NewDecoder(r.Body).Decode(&svc)
					// Use assert rather than require here: the handler runs on a
					// server goroutine, and FailNow must only be called from the
					// goroutine running the test.
					if !assert.NoError(t, err) {
						return
					}
					assert.Equal(t, testPartition, svc.Partition)
					w.WriteHeader(http.StatusOK)
				case "/v1/kv/consul-esm/agents/consul-esm:not-unique-instance-id":
					assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey))
				case "/v1/session/create":
					assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey))
				case "/v1/agent/check/register":
					assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey))
				case "/v1/agent/check/update/consul-esm:not-unique-instance-id:agent-ttl":
					assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey))
				case "/v1/catalog/service/consul-esm":
					assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey))
				case "/v1/agent/services":
					assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey))

				default:
					t.Log("unhandled:", r.URL.EscapedPath())
				}
			}))
		ts.Listener = listener
		ts.Start()
		defer ts.Close()

		agent := testAgent(t, func(c *Config) {
			c.HTTPAddr = addr
			c.Tag = "test"
			c.Partition = testPartition
			c.InstanceID = notUniqueInstanceID
		})
		defer agent.Shutdown()

		ourNodes := map[string]bool{"foo": true}
		ourChecks, _ := agent.getHealthChecks(0, ourNodes)
		// Stop immediately on a length mismatch; indexing below would
		// otherwise panic when fewer than two checks are returned.
		if len(ourChecks) != 2 {
			t.Fatal("should be 2 checks, got", len(ourChecks))
		}
		ns1check := ourChecks[0]
		ns2check := ourChecks[1]
		if ns1check.CheckID != "ns1_svc_ck" {
			t.Error("Wrong check id:", ns1check.CheckID)
		}
		if ns2check.CheckID != "ns2_svc_ck" {
			t.Error("Wrong check id:", ns2check.CheckID)
		}

		// test the state API is called for each namespace in an agent's partition
		assert.Len(t, testNs, 3)
	})
}

func TestAgent_HasPartition(t *testing.T) {
conf, err := DefaultConfig()
if err != nil {
t.Fatal(err)
}

cases := []struct {
name, partition, expected string
}{
{"No partition", "", ""},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use the struct keys here so it's a bit clearer what each value means?

{"default partition", "default", ""},
{"admin partition", "admin", "admin"},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
conf.Partition = tc.partition

agent := &Agent{
config: conf,
}

actualPartition := ""
agent.HasPartition(func(partition string) {
actualPartition = partition
})

assert.Equal(t, tc.expected, actualPartition)
})
}
}

// TestAgent_ConsulQueryOptions checks the Partition field of the options
// returned by ConsulQueryOption: it is expected to be empty for an unset or
// "default" partition and to carry the configured name otherwise.
func TestAgent_ConsulQueryOptions(t *testing.T) {
	cfg, err := DefaultConfig()
	if err != nil {
		t.Fatal(err)
	}

	type testCase struct {
		name      string
		partition string
		expected  string
	}
	tests := []testCase{
		{name: "No partition", partition: "", expected: ""},
		{name: "default partition", partition: "default", expected: ""},
		{name: "admin partition", partition: "admin", expected: "admin"},
	}

	for i := range tests {
		tt := tests[i]
		t.Run(tt.name, func(t *testing.T) {
			// The config is shared between sub-tests; each one overwrites
			// the partition before building its agent.
			cfg.Partition = tt.partition
			a := &Agent{config: cfg}

			got := a.ConsulQueryOption().Partition
			assert.Equal(t, tt.expected, got)
		})
	}
}
15 changes: 12 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ type Config struct {
SyslogFacility string
LogJSON bool

Service string
Tag string
KVPath string
Partition string
Service string
Tag string
KVPath string

InstanceID string
NodeMeta map[string]string
Expand Down Expand Up @@ -84,6 +85,11 @@ func (c *Config) ClientConfig() *api.Config {
if c.Datacenter != "" {
conf.Datacenter = c.Datacenter
}

if c.Partition != "" {
conf.Partition = c.Partition
}

if c.CAFile != "" {
conf.TLSConfig.CAFile = c.CAFile
}
Expand Down Expand Up @@ -127,6 +133,7 @@ func DefaultConfig() (*Config, error) {
NodeReconnectTimeout: 72 * time.Hour,
PingType: PingTypeUDP,
DisableCoordinateUpdates: false,
Partition: "",
}, nil
}

Expand Down Expand Up @@ -169,6 +176,7 @@ type HumanConfig struct {
Tag flags.StringValue `mapstructure:"consul_service_tag"`
KVPath flags.StringValue `mapstructure:"consul_kv_path"`
NodeMeta []map[string]string `mapstructure:"external_node_meta"`
Partition flags.StringValue `mapstructure:"partition"`

NodeReconnectTimeout flags.DurationValue `mapstructure:"node_reconnect_timeout"`
NodeProbeInterval flags.DurationValue `mapstructure:"node_probe_interval"`
Expand Down Expand Up @@ -461,6 +469,7 @@ func MergeConfig(dst *Config, src *HumanConfig) error {
src.EnableSyslog.Merge(&dst.EnableSyslog)
src.InstanceID.Merge(&dst.InstanceID)
src.Service.Merge(&dst.Service)
src.Partition.Merge(&dst.Partition)
src.Tag.Merge(&dst.Tag)
src.KVPath.Merge(&dst.KVPath)
if len(src.NodeMeta) == 1 {
Expand Down
Loading