Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Advertise driver-specific addresses #2709

Merged
merged 21 commits into from
Jul 3, 2017
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,10 @@ func TestJobs_Canonicalize(t *testing.T) {
},
Services: []*Service{
{
Name: "global-redis-check",
Tags: []string{"global", "cache"},
PortLabel: "db",
Name: "global-redis-check",
Tags: []string{"global", "cache"},
PortLabel: "db",
AddressMode: "auto",
Checks: []ServiceCheck{
{
Name: "alive",
Expand Down
16 changes: 11 additions & 5 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,23 @@ type ServiceCheck struct {

// The Service model represents a Consul service definition
type Service struct {
Id string
Name string
Tags []string
PortLabel string `mapstructure:"port"`
Checks []ServiceCheck
Id string
Name string
Tags []string
PortLabel string `mapstructure:"port"`
AddressMode string `mapstructure:"address_mode"`
Checks []ServiceCheck
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Canonicalize below

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

}

func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) {
if s.Name == "" {
s.Name = fmt.Sprintf("%s-%s-%s", *job.Name, *tg.Name, t.Name)
}

// Default to AddressModeAuto
if s.AddressMode == "" {
s.AddressMode = "auto"
}
}

// EphemeralDisk is an ephemeral disk object
Expand Down
5 changes: 3 additions & 2 deletions client/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package client

import (
"github.com/hashicorp/nomad/client/driver"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)

// ConsulServiceAPI is the interface the Nomad Client uses to register and
// remove services and checks from Consul.
type ConsulServiceAPI interface {
RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor) error
RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error
RemoveTask(allocID string, task *structs.Task)
UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor) error
UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error
}
19 changes: 11 additions & 8 deletions client/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"

"github.com/hashicorp/nomad/client/driver"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand All @@ -18,9 +19,10 @@ type mockConsulOp struct {
allocID string
task *structs.Task
exec driver.ScriptExecutor
net *cstructs.DriverNetwork
}

func newMockConsulOp(op, allocID string, task *structs.Task, exec driver.ScriptExecutor) mockConsulOp {
func newMockConsulOp(op, allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) mockConsulOp {
if op != "add" && op != "remove" && op != "update" {
panic(fmt.Errorf("invalid consul op: %s", op))
}
Expand All @@ -29,6 +31,7 @@ func newMockConsulOp(op, allocID string, task *structs.Task, exec driver.ScriptE
allocID: allocID,
task: task,
exec: exec,
net: net,
}
}

Expand All @@ -52,25 +55,25 @@ func newMockConsulServiceClient() *mockConsulServiceClient {
return &m
}

func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.Task, exec driver.ScriptExecutor) error {
func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Printf("[TEST] mock_consul: UpdateTask(%q, %v, %v, %T)", allocID, old, new, exec)
m.ops = append(m.ops, newMockConsulOp("update", allocID, new, exec))
m.logger.Printf("[TEST] mock_consul: UpdateTask(%q, %v, %v, %T, %x)", allocID, old, new, exec, net.Hash())
m.ops = append(m.ops, newMockConsulOp("update", allocID, new, exec, net))
return nil
}

func (m *mockConsulServiceClient) RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor) error {
func (m *mockConsulServiceClient) RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Printf("[TEST] mock_consul: RegisterTask(%q, %q, %T)", allocID, task.Name, exec)
m.ops = append(m.ops, newMockConsulOp("add", allocID, task, exec))
m.logger.Printf("[TEST] mock_consul: RegisterTask(%q, %q, %T, %x)", allocID, task.Name, exec, net.Hash())
m.ops = append(m.ops, newMockConsulOp("add", allocID, task, exec, net))
return nil
}

func (m *mockConsulServiceClient) RemoveTask(allocID string, task *structs.Task) {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Printf("[TEST] mock_consul: RemoveTask(%q, %q)", allocID, task.Name)
m.ops = append(m.ops, newMockConsulOp("remove", allocID, task, nil))
m.ops = append(m.ops, newMockConsulOp("remove", allocID, task, nil, nil))
}
68 changes: 63 additions & 5 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*Prestart
return nil, err
}

// Set state needed by Start()
// Set state needed by Start
d.driverConfig = driverConfig

// Initialize docker API clients
Expand All @@ -485,15 +485,21 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*Prestart
if err != nil {
return nil, err
}
d.imageID = id

resp := NewPrestartResponse()
resp.CreatedResources.Add(dockerImageResKey, id)
resp.PortMap = d.driverConfig.PortMap
d.imageID = id

// Return the PortMap if it's set
if len(driverConfig.PortMap) > 0 {
resp.Network = &cstructs.DriverNetwork{
PortMap: driverConfig.PortMap,
}
}
return resp, nil
}

func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse, error) {

pluginLogFile := filepath.Join(ctx.TaskDir.Dir, "executor.out")
executorConfig := &dstructs.ExecutorConfig{
Expand Down Expand Up @@ -560,6 +566,15 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
pluginClient.Kill()
return nil, fmt.Errorf("Failed to start container %s: %s", container.ID, err)
}
// InspectContainer to get all of the container metadata as
// much of the metadata (eg networking) isn't populated until
// the container is started
if container, err = client.InspectContainer(container.ID); err != nil {
err = fmt.Errorf("failed to inspect started container %s: %s", container.ID, err)
d.logger.Printf("[ERR] driver.docker: %v", err)
pluginClient.Kill()
return nil, structs.NewRecoverableError(err, true)
}
d.logger.Printf("[INFO] driver.docker: started container %s", container.ID)
} else {
d.logger.Printf("[DEBUG] driver.docker: re-attaching to container %s with status %q",
Expand All @@ -585,7 +600,50 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
}
go h.collectStats()
go h.run()
return h, nil

// Detect container address
ip, autoUse := d.detectIP(container)

// Create a response with the driver handle and container network metadata
resp := &StartResponse{
Handle: h,
Network: &cstructs.DriverNetwork{
PortMap: d.driverConfig.PortMap,
IP: ip,
AutoAdvertise: autoUse,
},
}
return resp, nil
}

func (d *DockerDriver) detectIP(c *docker.Container) (string, bool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put a comment on what this does and the fields it returns

if c.NetworkSettings == nil {
// This should only happen if there's been a coding error (such
// as not calling InspetContainer after CreateContainer). Code
// defensively in case the Docker API changes subtly.
d.logger.Printf("[ERROR] driver.docker: no network settings for container %s", c.ID)
return "", false
}
ip, ipName := "", ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add blank spaces between blocks. Here and line 643

auto := false
for name, net := range c.NetworkSettings.Networks {
if net.IPAddress == "" {
// Ignore networks without an IP address
continue
}
ip = net.IPAddress
ipName = name

// Don't auto-advertise bridge IPs
if name != "bridge" {
auto = true
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just break?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

break
}
if n := len(c.NetworkSettings.Networks); n > 1 {
d.logger.Printf("[WARN] driver.docker: multiple (%d) Docker networks for container %q but Nomad only supports 1: choosing %q", n, c.ID, ipName)
}
return ip, auto
}

func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) error {
Expand Down
Loading