Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix consul server resync when server and client point to the same Consul agent #4365

Merged
merged 2 commits into from
Jun 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat

vclient := vaultclient.NewMockVaultClient()
cclient := consul.NewMockAgent()
serviceClient := consul.NewServiceClient(cclient, logger)
serviceClient := consul.NewServiceClient(cclient, logger, true)
go serviceClient.Run()
tr := NewTaskRunner(logger, conf, db, upd.Update, taskDir, alloc, task, vclient, serviceClient)
if !restarts {
Expand Down Expand Up @@ -1860,7 +1860,7 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) {
// backed by a mock consul whose checks are always unhealthy.
consulAgent := consul.NewMockAgent()
consulAgent.SetStatus("critical")
consulClient := consul.NewServiceClient(consulAgent, ctx.tr.logger)
consulClient := consul.NewServiceClient(consulAgent, ctx.tr.logger, true)
go consulClient.Run()
defer consulClient.Shutdown()

Expand Down
2 changes: 1 addition & 1 deletion command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ func (a *Agent) setupConsul(consulConfig *config.ConsulConfig) error {
a.consulCatalog = client.Catalog()

// Create Consul Service client for service advertisement and checks.
a.consulService = consul.NewServiceClient(client.Agent(), a.logger)
a.consulService = consul.NewServiceClient(client.Agent(), a.logger, a.Client() != nil)

// Run the Consul service client's sync'ing main loop
go a.consulService.Run()
Expand Down
25 changes: 21 additions & 4 deletions command/agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,17 @@ type ServiceClient struct {

// checkWatcher restarts checks that are unhealthy.
checkWatcher *checkWatcher

// isClientAgent specifies whether this Consul client is being used
// by a Nomad client.
isClientAgent bool
}

// NewServiceClient creates a new Consul ServiceClient from an existing Consul API
// Client and logger.
func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient {
// Client, logger and takes whether the client is being used by a Nomad Client agent.
// When being used by a Nomad client, this Consul client reconciles all services and
// checks created by Nomad on behalf of running tasks.
func NewServiceClient(consulClient AgentAPI, logger *log.Logger, isNomadClient bool) *ServiceClient {
return &ServiceClient{
client: consulClient,
logger: logger,
Expand All @@ -255,6 +261,7 @@ func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient
agentServices: make(map[string]struct{}),
agentChecks: make(map[string]struct{}),
checkWatcher: newCheckWatcher(logger, consulClient),
isClientAgent: isNomadClient,
}
}

Expand Down Expand Up @@ -433,7 +440,12 @@ func (c *ServiceClient) sync() error {
// Known service, skip
continue
}
if !isNomadService(id) {

// Ignore if this is not a Nomad managed service. Also ignore
// Nomad managed services if this is not a client agent.
// This is to prevent server agents from removing services
// registered by client agents
if !isNomadService(id) || !c.isClientAgent {
// Not managed by Nomad, skip
continue
}
Expand Down Expand Up @@ -470,7 +482,12 @@ func (c *ServiceClient) sync() error {
// Known check, leave it
continue
}
if !isNomadService(check.ServiceID) {

// Ignore if this is not a Nomad managed check. Also ignore
// Nomad managed checks if this is not a client agent.
// This is to prevent server agents from removing checks
// registered by client agents
if !isNomadService(check.ServiceID) || !c.isClientAgent {
// Service not managed by Nomad, skip
continue
}
Expand Down
2 changes: 1 addition & 1 deletion command/agent/consul/int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestConsul_Integration(t *testing.T) {
consulClient, err := consulapi.NewClient(consulConfig)
assert.Nil(err)

serviceClient := consul.NewServiceClient(consulClient.Agent(), logger)
serviceClient := consul.NewServiceClient(consulClient.Agent(), logger, true)
defer serviceClient.Shutdown() // just-in-case cleanup
consulRan := make(chan struct{})
go func() {
Expand Down
2 changes: 1 addition & 1 deletion command/agent/consul/unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func setupFake(t *testing.T) *testFakeCtx {
fc := NewMockAgent()
tt := testTask()
return &testFakeCtx{
ServiceClient: NewServiceClient(fc, testlog.Logger(t)),
ServiceClient: NewServiceClient(fc, testlog.Logger(t), true),
FakeConsul: fc,
Task: tt,
MockExec: tt.DriverExec.(*mockExec),
Expand Down