Skip to content

Commit

Permalink
Merge pull request #1363 from weaveworks/1072-docker-rm
Browse files Browse the repository at this point in the history
Immediately remove deleted containers from the UI.
  • Loading branch information
paulbellamy committed Apr 22, 2016
2 parents 901f46c + 66868eb commit a4e68b9
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 69 deletions.
30 changes: 18 additions & 12 deletions probe/docker/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,6 @@ const (
CPUUsageInKernelmode = "docker_cpu_usage_in_kernelmode"
CPUSystemCPUUsage = "docker_cpu_system_cpu_usage"

StateCreated = "created"
StateDead = "dead"
StateExited = "exited"
StatePaused = "paused"
StateRestarting = "restarting"
StateRunning = "running"

NetworkModeHost = "host"

LabelPrefix = "docker_label_"
Expand All @@ -71,6 +64,18 @@ const (
stopTimeout = 10
)

// These 'constants' are used for node states.
// We need to take pointers to them, so they are vars...
var (
StateCreated = "created"
StateDead = "dead"
StateExited = "exited"
StatePaused = "paused"
StateRestarting = "restarting"
StateRunning = "running"
StateDeleted = "deleted"
)

// Exported for testing
var (
DialStub = net.Dial
Expand All @@ -95,7 +100,7 @@ type Container interface {
Image() string
PID() int
Hostname() string
GetNode(string, []net.IP) report.Node
GetNode([]net.IP) report.Node
State() string
StateString() string
HasTTY() bool
Expand All @@ -111,12 +116,14 @@ type container struct {
latestStats docker.Stats
pendingStats [20]docker.Stats
numPending int
hostID string
}

// NewContainer creates a new Container
func NewContainer(c *docker.Container) Container {
func NewContainer(c *docker.Container, hostID string) Container {
return &container{
container: c,
hostID: hostID,
}
}

Expand Down Expand Up @@ -338,18 +345,17 @@ func (c *container) env() map[string]string {
return result
}

func (c *container) GetNode(hostID string, localAddrs []net.IP) report.Node {
func (c *container) GetNode(localAddrs []net.IP) report.Node {
c.RLock()
defer c.RUnlock()

ips := c.container.NetworkSettings.SecondaryIPAddresses
if c.container.NetworkSettings.IPAddress != "" {
ips = append(ips, c.container.NetworkSettings.IPAddress)
}
// Treat all Docker IPs as local scoped.
ipsWithScopes := []string{}
for _, ip := range ips {
ipsWithScopes = append(ipsWithScopes, report.MakeScopedAddressNodeID(hostID, ip))
ipsWithScopes = append(ipsWithScopes, report.MakeScopedAddressNodeID(c.hostID, ip))
}

result := report.MakeNodeWith(report.MakeContainerNodeID(c.ID()), map[string]string{
Expand Down
7 changes: 4 additions & 3 deletions probe/docker/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func TestContainer(t *testing.T) {
return connection
}

c := docker.NewContainer(container1)
const hostID = "scope"
c := docker.NewContainer(container1, hostID)
err := c.StartGatheringStats()
if err != nil {
t.Errorf("%v", err)
Expand Down Expand Up @@ -101,7 +102,7 @@ func TestContainer(t *testing.T) {
)

test.Poll(t, 100*time.Millisecond, want, func() interface{} {
node := c.GetNode("scope", []net.IP{})
node := c.GetNode([]net.IP{})
node.Latest.ForEach(func(k, v string) {
if v == "0" || v == "" {
node.Latest = node.Latest.Delete(k)
Expand All @@ -116,7 +117,7 @@ func TestContainer(t *testing.T) {
if c.PID() != 2 {
t.Errorf("%d != 2", c.PID())
}
if have := docker.ExtractContainerIPs(c.GetNode("", []net.IP{})); !reflect.DeepEqual(have, []string{"1.2.3.4"}) {
if have := docker.ExtractContainerIPs(c.GetNode([]net.IP{})); !reflect.DeepEqual(have, []string{"1.2.3.4"}) {
t.Errorf("%v != %v", have, []string{"1.2.3.4"})
}
}
4 changes: 2 additions & 2 deletions probe/docker/controls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func TestControls(t *testing.T) {
mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil, false)
registry, _ := docker.NewRegistry(10*time.Second, nil, false, "")
defer registry.Stop()

for _, tc := range []struct{ command, result string }{
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestPipes(t *testing.T) {

mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil, false)
registry, _ := docker.NewRegistry(10*time.Second, nil, false, "")
defer registry.Stop()

test.Poll(t, 100*time.Millisecond, true, func() interface{} {
Expand Down
43 changes: 35 additions & 8 deletions probe/docker/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
docker_client "github.com/fsouza/go-dockerclient"

"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/report"
)

// Consts exported for testing.
Expand Down Expand Up @@ -39,7 +40,7 @@ type Registry interface {
}

// ContainerUpdateWatcher is the type of functions that get called when containers are updated.
type ContainerUpdateWatcher func(c Container)
type ContainerUpdateWatcher func(report.Node)

type registry struct {
sync.RWMutex
Expand All @@ -48,6 +49,7 @@ type registry struct {
collectStats bool
client Client
pipes controls.PipeClient
hostID string

watchers []ContainerUpdateWatcher
containers map[string]Container
Expand Down Expand Up @@ -78,7 +80,7 @@ func newDockerClient(endpoint string) (Client, error) {
}

// NewRegistry returns a usable Registry. Don't forget to Stop it.
func NewRegistry(interval time.Duration, pipes controls.PipeClient, collectStats bool) (Registry, error) {
func NewRegistry(interval time.Duration, pipes controls.PipeClient, collectStats bool, hostID string) (Registry, error) {
client, err := NewDockerClientStub(endpoint)
if err != nil {
return nil, err
Expand All @@ -93,6 +95,7 @@ func NewRegistry(interval time.Duration, pipes controls.PipeClient, collectStats
pipes: pipes,
interval: interval,
collectStats: collectStats,
hostID: hostID,
quit: make(chan chan struct{}),
}

Expand Down Expand Up @@ -214,7 +217,7 @@ func (r *registry) updateContainers() error {
}

for _, apiContainer := range apiContainers {
r.updateContainerState(apiContainer.ID)
r.updateContainerState(apiContainer.ID, nil)
}

return nil
Expand All @@ -240,11 +243,20 @@ func (r *registry) updateImages() error {
func (r *registry) handleEvent(event *docker_client.APIEvents) {
switch event.Status {
case CreateEvent, RenameEvent, StartEvent, DieEvent, DestroyEvent, PauseEvent, UnpauseEvent:
r.updateContainerState(event.ID)
r.updateContainerState(event.ID, stateAfterEvent(event.Status))
}
}

func (r *registry) updateContainerState(containerID string) {
func stateAfterEvent(event string) *string {
switch event {
case DestroyEvent:
return &StateDeleted
default:
return nil
}
}

func (r *registry) updateContainerState(containerID string, intendedState *string) {
r.Lock()
defer r.Unlock()

Expand All @@ -267,13 +279,24 @@ func (r *registry) updateContainerState(containerID string) {
if r.collectStats {
container.StopGatheringStats()
}

if intendedState != nil {
node := report.MakeNodeWith(report.MakeContainerNodeID(containerID), map[string]string{
ContainerID: containerID,
ContainerState: *intendedState,
})
// Trigger anyone watching for updates
for _, f := range r.watchers {
f(node)
}
}
return
}

// Container exists, ensure we have it
c, ok := r.containers[containerID]
if !ok {
c = NewContainerStub(dockerContainer)
c = NewContainerStub(dockerContainer, r.hostID)
r.containers[containerID] = c
} else {
// potentially remove existing pid mapping.
Expand All @@ -287,8 +310,12 @@ func (r *registry) updateContainerState(containerID string) {
}

// Trigger anyone watching for updates
for _, f := range r.watchers {
f(c)
localAddrs, err := report.LocalAddresses()
if err != nil {
node := c.GetNode(localAddrs)
for _, f := range r.watchers {
f(node)
}
}

// And finally, ensure we gather stats for it
Expand Down
66 changes: 61 additions & 5 deletions probe/docker/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (

client "github.com/fsouza/go-dockerclient"

"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/reflect"
)

type mockContainer struct {
Expand Down Expand Up @@ -52,7 +54,7 @@ func (c *mockContainer) StartGatheringStats() error {

func (c *mockContainer) StopGatheringStats() {}

func (c *mockContainer) GetNode(_ string, _ []net.IP) report.Node {
func (c *mockContainer) GetNode(_ []net.IP) report.Node {
return report.MakeNodeWith(report.MakeContainerNodeID(c.c.ID), map[string]string{
docker.ContainerID: c.c.ID,
docker.ContainerName: c.c.Name,
Expand Down Expand Up @@ -237,7 +239,7 @@ func setupStubs(mdc *mockDockerClient, f func()) {
return mdc, nil
}

docker.NewContainerStub = func(c *client.Container) docker.Container {
docker.NewContainerStub = func(c *client.Container, _ string) docker.Container {
return &mockContainer{c}
}

Expand Down Expand Up @@ -270,7 +272,7 @@ func allImages(r docker.Registry) []*client.APIImages {
func TestRegistry(t *testing.T) {
mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil, true)
registry, _ := docker.NewRegistry(10*time.Second, nil, true, "")
defer registry.Stop()
runtime.Gosched()

Expand All @@ -293,7 +295,7 @@ func TestRegistry(t *testing.T) {
func TestLookupByPID(t *testing.T) {
mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil, true)
registry, _ := docker.NewRegistry(10*time.Second, nil, true, "")
defer registry.Stop()

want := docker.Container(&mockContainer{container1})
Expand All @@ -310,7 +312,7 @@ func TestLookupByPID(t *testing.T) {
func TestRegistryEvents(t *testing.T) {
mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil, true)
registry, _ := docker.NewRegistry(10*time.Second, nil, true, "")
defer registry.Stop()
runtime.Gosched()

Expand Down Expand Up @@ -377,3 +379,57 @@ func TestRegistryEvents(t *testing.T) {
}
})
}

func TestRegistryDelete(t *testing.T) {
mtime.NowForce(mtime.Now())
defer mtime.NowReset()

mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil, true, "")
defer registry.Stop()
runtime.Gosched()

// Collect all the events.
mtx := sync.Mutex{}
nodes := []report.Node{}
registry.WatchContainerUpdates(func(n report.Node) {
mtx.Lock()
defer mtx.Unlock()
nodes = append(nodes, n)
})

check := func(want []docker.Container) {
test.Poll(t, 100*time.Millisecond, want, func() interface{} {
return allContainers(registry)
})
}

want := []docker.Container{&mockContainer{container1}}
check(want)

{
mdc.Lock()
mdc.apiContainers = []client.APIContainers{}
delete(mdc.containers, "ping")
mdc.Unlock()
mdc.send(&client.APIEvents{Status: docker.DestroyEvent, ID: "ping"})
runtime.Gosched()

check([]docker.Container{})

mtx.Lock()
want := []report.Node{
report.MakeNodeWith(report.MakeContainerNodeID("ping"), map[string]string{
docker.ContainerID: "ping",
docker.ContainerState: "deleted",
}),
}
if !reflect.DeepEqual(want, nodes) {
t.Errorf("Didn't get right container updates: %v", test.Diff(want, nodes))
}
nodes = []report.Node{}
mtx.Unlock()
}
})
}
13 changes: 3 additions & 10 deletions probe/docker/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"net"
"strings"

log "github.com/Sirupsen/logrus"
docker_client "github.com/fsouza/go-dockerclient"

"github.com/weaveworks/scope/probe"
Expand Down Expand Up @@ -75,17 +74,11 @@ func NewReporter(registry Registry, hostID string, probeID string, probe *probe.
func (Reporter) Name() string { return "Docker" }

// ContainerUpdated should be called whenever a container is updated.
func (r *Reporter) ContainerUpdated(c Container) {
localAddrs, err := report.LocalAddresses()
if err != nil {
log.Errorf("Error getting local address: %v", err)
return
}

func (r *Reporter) ContainerUpdated(n report.Node) {
// Publish a 'short cut' report container just this container
rpt := report.MakeReport()
rpt.Shortcut = true
rpt.Container.AddNode(c.GetNode(r.hostID, localAddrs))
rpt.Container.AddNode(n)
r.probe.Publish(rpt)
}

Expand Down Expand Up @@ -146,7 +139,7 @@ func (r *Reporter) containerTopology(localAddrs []net.IP) report.Topology {
metadata := map[string]string{report.ControlProbeID: r.probeID}

r.registry.WalkContainers(func(c Container) {
result.AddNode(c.GetNode(r.hostID, localAddrs).WithLatests(metadata))
result.AddNode(c.GetNode(localAddrs).WithLatests(metadata))
})

return result
Expand Down
Loading

0 comments on commit a4e68b9

Please sign in to comment.