Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Service Discovery Integration #219

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
df6ddc0
agent: consul has settings for clients
ryanuber Oct 1, 2015
06185d8
client/fingerprint: use consul client defaults
ryanuber Oct 1, 2015
1327ea1
client: first pass at consul integration
ryanuber Oct 2, 2015
500f468
client: adding discovery context
ryanuber Oct 2, 2015
b6fda97
client: service registration works
ryanuber Oct 2, 2015
a542d44
client: dynamic ports register in discovery
ryanuber Oct 2, 2015
7385abf
client: handle deregister for discovery
ryanuber Oct 2, 2015
1b6f96c
client: move discovery into task runner
ryanuber Oct 2, 2015
13e709a
command: remove unused config
ryanuber Oct 2, 2015
7db8deb
client: simplify consul discovery
ryanuber Oct 2, 2015
436ae98
client: testing consul discovery
ryanuber Oct 2, 2015
7353697
client/discovery: simplify discovery layer
ryanuber Oct 3, 2015
56219f0
client/discovery: testing discovery layer
ryanuber Oct 3, 2015
a47457b
client/discovery: restore builtins between tests
ryanuber Oct 3, 2015
9647476
client: thread job ID and task group name through to task runner
ryanuber Oct 5, 2015
2ef0462
website: brain dumping initial discovery docs
ryanuber Oct 5, 2015
0122144
client/discovery: providers determine format of service name
ryanuber Oct 5, 2015
29096fc
client/discovery: cleanup
ryanuber Oct 5, 2015
fd46a84
client/discovery: changes for external testability
ryanuber Oct 5, 2015
8b53f16
website: update docs for jobspec
ryanuber Oct 5, 2015
8336aa1
client: pass through alloc ID to uniquely identify multiple task inst…
ryanuber Oct 6, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (g *TaskGroup) AddTask(t *Task) *TaskGroup {
// Task is a single process in a task group.
type Task struct {
Name string
Discover string
Driver string
Config map[string]string
Constraints []*Constraint
Expand Down
16 changes: 13 additions & 3 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/discovery"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand All @@ -37,6 +38,8 @@ type AllocRunner struct {
updater AllocStateUpdater
logger *log.Logger

discovery *discovery.DiscoveryLayer

alloc *structs.Allocation

dirtyCh chan struct{}
Expand All @@ -63,11 +66,14 @@ type allocRunnerState struct {
}

// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater, alloc *structs.Allocation) *AllocRunner {
func NewAllocRunner(
logger *log.Logger, config *config.Config, updater AllocStateUpdater,
disc *discovery.DiscoveryLayer, alloc *structs.Allocation) *AllocRunner {
ar := &AllocRunner{
config: config,
updater: updater,
logger: logger,
discovery: disc,
alloc: alloc,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
Expand Down Expand Up @@ -100,7 +106,9 @@ func (r *AllocRunner) RestoreState() error {
var mErr multierror.Error
for name := range r.taskStatus {
task := &structs.Task{Name: name}
tr := NewTaskRunner(r.logger, r.config, r.setTaskStatus, r.ctx, r.alloc.ID, task)
tr := NewTaskRunner(
r.logger, r.config, r.setTaskStatus, r.ctx, r.alloc.ID,
r.alloc.JobID, r.alloc.TaskGroup, task, r.discovery)
r.tasks[name] = tr
if err := tr.RestoreState(); err != nil {
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task '%s': %v", r.alloc.ID, name, err)
Expand Down Expand Up @@ -294,7 +302,9 @@ func (r *AllocRunner) Run() {
// Merge in the task resources
task.Resources = alloc.TaskResources[task.Name]

tr := NewTaskRunner(r.logger, r.config, r.setTaskStatus, r.ctx, r.alloc.ID, task)
tr := NewTaskRunner(
r.logger, r.config, r.setTaskStatus, r.ctx, r.alloc.ID,
r.alloc.JobID, r.alloc.TaskGroup, task, r.discovery)
r.tasks[task.Name] = tr
go tr.Run()
}
Expand Down
2 changes: 1 addition & 1 deletion client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func testAllocRunner() (*MockAllocStateUpdater, *AllocRunner) {
conf.AllocDir = os.TempDir()
upd := &MockAllocStateUpdater{}
alloc := mock.Alloc()
ar := NewAllocRunner(logger, conf, upd.Update, alloc)
ar := NewAllocRunner(logger, conf, upd.Update, nil, alloc)
return upd, ar
}

Expand Down
38 changes: 31 additions & 7 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/discovery"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/nomad"
Expand Down Expand Up @@ -86,6 +87,9 @@ type Client struct {
allocs map[string]*AllocRunner
allocLock sync.RWMutex

// discovery is the set of enabled discovery subsystems
discovery *discovery.DiscoveryLayer

shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
Expand All @@ -111,16 +115,21 @@ func NewClient(cfg *config.Config) (*Client, error) {
return nil, fmt.Errorf("failed intializing client: %v", err)
}

// Restore the state
if err := c.restoreState(); err != nil {
return nil, fmt.Errorf("failed to restore state: %v", err)
}

// Setup the node
if err := c.setupNode(); err != nil {
return nil, fmt.Errorf("node setup failed: %v", err)
}

// Set up service discovery
if err := c.setupDiscovery(); err != nil {
return nil, fmt.Errorf("discovery setup failed: %v", err)
}

// Restore the state
if err := c.restoreState(); err != nil {
return nil, fmt.Errorf("failed to restore state: %v", err)
}

// Fingerprint the node
if err := c.fingerprint(); err != nil {
return nil, fmt.Errorf("fingerprinting failed: %v", err)
Expand Down Expand Up @@ -319,7 +328,7 @@ func (c *Client) restoreState() error {
for _, entry := range list {
id := entry.Name()
alloc := &structs.Allocation{ID: id}
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc)
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, c.discovery, alloc)
c.allocs[id] = ar
if err := ar.RestoreState(); err != nil {
c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v",
Expand Down Expand Up @@ -460,6 +469,21 @@ func (c *Client) setupDrivers() error {
return nil
}

// setupDiscovery sets up the discovery layers and initializes them.
func (c *Client) setupDiscovery() error {
// Create the discovery layer
disc, err := discovery.NewDiscoveryLayer(discovery.Builtins, c.config,
c.logger, c.config.Node)
if err != nil {
return err
}
c.discovery = disc

avail := disc.EnabledProviders()
c.logger.Printf("[DEBUG] client: available discovery layers: %v", avail)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can probably collapse these as avail is never used other than in the log.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I was just trying to stay consistent with what drivers and fingerprints do. I can strings.Join() them or something, if you think that's better, but we should do the others too in that case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just suggesting:
c.logger.Printf("[DEBUG] client: available discovery layers: %v", disc.EnabledProviders())

return nil
}

// retryIntv calculates a retry interval value given the base
func (c *Client) retryIntv(base time.Duration) time.Duration {
if c.config.DevMode {
Expand Down Expand Up @@ -712,7 +736,7 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
func (c *Client) addAlloc(alloc *structs.Allocation) error {
c.allocLock.Lock()
defer c.allocLock.Unlock()
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc)
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, c.discovery, alloc)
c.allocs[alloc.ID] = ar
go ar.Run()
return nil
Expand Down
82 changes: 82 additions & 0 deletions client/discovery/consul.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package discovery

import (
"fmt"
"strings"

"github.com/hashicorp/consul/api"
)

// ConsulDiscovery is a back-end for service discovery which can be used
// to populate a local Consul agent with service information. Because
// Consul already has information about the local node, some shortcuts
// can be taken in this back-end. Specifically, the IP address of the Nomad
// agent does not need to be used, because Consul has this information
// already and may even be configured to expose services on an alternate
// advertise address.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be misreading this but I'm not sure this is a safe assumption to make. Since nomad may be managing tasks across multiple IPs and those services will be bound to a specific interface, the IP is actually important. Even if nomad or consul is listening on a particular IP, tasks are not necessarily registered on the same IP.

type ConsulDiscovery struct {
// client is the Consul API client; its Agent() endpoint is used to
// register and deregister services.
client *api.Client
// Context is embedded so the provider can reach the client config,
// logger and node directly (e.g. c.config). NOTE: the constructor builds
// this struct with a positional literal, so field order must not change.
Context
}

// NewConsulDiscovery creates a new Consul discovery provider using the
// configuration provided in the client options.
//
// The Consul API configuration starts from api.DefaultConfig() and is then
// adjusted:
//   - Datacenter is always taken from the node carried in ctx.
//   - "discovery.consul.address", "discovery.consul.scheme" and
//     "discovery.consul.token" replace the corresponding default only when
//     the option yields a non-empty value, so an unset option never blanks
//     out a default.
//
// An error is returned if the Consul API client cannot be constructed.
func NewConsulDiscovery(ctx Context) (Provider, error) {
	// Build the config on top of the library defaults.
	conf := api.DefaultConfig()
	conf.Datacenter = ctx.node.Datacenter

	// Only override a default when the operator actually supplied a value.
	if addr := ctx.config.Read("discovery.consul.address"); addr != "" {
		conf.Address = addr
	}
	if scheme := ctx.config.Read("discovery.consul.scheme"); scheme != "" {
		conf.Scheme = scheme
	}
	if token := ctx.config.Read("discovery.consul.token"); token != "" {
		conf.Token = token
	}

	// Create the client
	client, err := api.NewClient(conf)
	if err != nil {
		return nil, err
	}

	// Create and return the discovery provider. The literal is positional:
	// client first, then the embedded Context.
	return &ConsulDiscovery{client, ctx}, nil
}

// Name reports the identifier of this discovery provider, which is always
// "consul".
func (c *ConsulDiscovery) Name() string {
	const providerName = "consul"
	return providerName
}

// Enabled reports whether the Consul layer is switched on. The decision is
// based solely on the "discovery.consul.enable" client option being exactly
// "true"; no connection to Consul is attempted, since Consul may or may not
// be available when Nomad starts.
func (c *ConsulDiscovery) Enabled() bool {
	flag := c.config.Read("discovery.consul.enable")
	return flag == "true"
}

// Register registers a service name into a Consul agent. The agent will then
// sync this definition into the service catalog.
func (c *ConsulDiscovery) Register(allocID, name string, port int) error {
// Build the service definition
serviceID := fmt.Sprintf("%s:%s", name, allocID)
svc := &api.AgentServiceRegistration{
ID: serviceID,
Name: name,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's inject the IP from the NetworkResource of the task if any.

Port: port,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think as mentioned, we wanted a richer DiscoverConfig struct, so that here we can also specify things like tags, and health checks. For example, it would be really nice to support HTTP health checks out of the gate.

}

// Attempt to register
return c.client.Agent().ServiceRegister(svc)
}

// Deregister asks the Consul agent to drop the service identified by
// "<name>:<allocID>", the same ID format used at registration time.
// Anti-entropy then takes care of removing the service from the catalog.
func (c *ConsulDiscovery) Deregister(allocID, name string) error {
	id := fmt.Sprintf("%s:%s", name, allocID)
	agent := c.client.Agent()
	return agent.ServiceDeregister(id)
}

// DiscoverName builds the Consul service name from its component parts by
// joining them with hyphens, which keeps the result easy to use in Consul
// DNS lookups.
func (c *ConsulDiscovery) DiscoverName(parts []string) string {
	const sep = "-"
	return strings.Join(parts, sep)
}
104 changes: 104 additions & 0 deletions client/discovery/consul_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package discovery

import (
"bytes"
"io"
"log"
"os"
"testing"

consulapi "github.com/hashicorp/consul/api"
consultest "github.com/hashicorp/consul/testutil"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/nomad/structs"
)

// TestConsulDiscovery_Register exercises the Consul provider end to end
// against a Consul test server: the provider must report disabled until the
// enable option is set, then register a service under "<name>:<allocID>"
// with the given port, and finally remove it again on deregister.
func TestConsulDiscovery_Register(t *testing.T) {
	srv := consultest.NewTestServerConfig(t, func(c *consultest.TestServerConfig) {
		c.Bootstrap = false
	})
	defer srv.Stop()

	// Independent Consul client used only to verify what the provider did.
	apiConf := consulapi.DefaultConfig()
	apiConf.Address = srv.HTTPAddr
	consulClient, err := consulapi.NewClient(apiConf)
	if err != nil {
		t.Fatalf("err: %s", err)
	}

	// Assemble the provider context. conf is shared by pointer, so options
	// set on it later are visible through ctx.
	conf := &config.Config{}
	logBuf := new(bytes.Buffer)
	ctx := Context{
		config: conf,
		logger: log.New(io.MultiWriter(logBuf, os.Stdout), "", log.LstdFlags),
		node:   &structs.Node{},
	}

	// Without the enable option the provider must report disabled.
	disc, err := NewConsulDiscovery(ctx)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	if disc.Enabled() {
		t.Fatalf("should not be enabled")
	}

	// Turn the provider on and point it at the test server.
	conf.Options = map[string]string{
		"discovery.consul.enable":  "true",
		"discovery.consul.address": srv.HTTPAddr,
	}
	disc, err = NewConsulDiscovery(ctx)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	if !disc.Enabled() {
		t.Fatalf("should be enabled")
	}

	// Register a service through the provider.
	if err := disc.Register("alloc1", "foobar", 123); err != nil {
		t.Fatalf("err: %s", err)
	}

	// The agent must now list the service with the expected ID and port.
	services, err := consulClient.Agent().Services()
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	registered := false
	for _, svc := range services {
		if svc.Service != "foobar" {
			continue
		}
		if svc.ID != "foobar:alloc1" {
			t.Fatalf("bad id: %s", svc.ID)
		}
		if svc.Port != 123 {
			t.Fatalf("bad port: %d", svc.Port)
		}
		registered = true
		break
	}
	if !registered {
		t.Fatalf("missing service")
	}

	// Deregister and confirm the agent no longer knows the service.
	if err := disc.Deregister("alloc1", "foobar"); err != nil {
		t.Fatalf("err: %s", err)
	}
	services, err = consulClient.Agent().Services()
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	for _, svc := range services {
		if svc.Service == "foobar" {
			t.Fatalf("foobar service should be deregistered")
		}
	}
}
Loading