diff --git a/probe/docker/registry.go b/probe/docker/registry.go index 3c21bd4e76..e8ad506efe 100644 --- a/probe/docker/registry.go +++ b/probe/docker/registry.go @@ -31,14 +31,19 @@ type Registry interface { LockedPIDLookup(f func(func(int) Container)) WalkContainers(f func(Container)) WalkImages(f func(*docker_client.APIImages)) + WatchContainerUpdates(ContainerUpdateWatcher) } +// ContainerUpdateWatcher is the type of functions that get called when containers are updated. +type ContainerUpdateWatcher func(c Container) + type registry struct { sync.RWMutex quit chan chan struct{} interval time.Duration client Client + watchers []ContainerUpdateWatcher containers map[string]Container containersByPID map[int]Container images map[string]*docker_client.APIImages @@ -91,6 +96,14 @@ func (r *registry) Stop() { <-ch } +// WatchContainerUpdates registers a callback to be called +// whenever a container is updated. +func (r *registry) WatchContainerUpdates(f ContainerUpdateWatcher) { + r.Lock() + defer r.Unlock() + r.watchers = append(r.watchers, f) +} + func (r *registry) loop() { for { // NB listenForEvents blocks. @@ -244,6 +257,11 @@ func (r *registry) updateContainerState(containerID string) { c.UpdateState(dockerContainer) } + // Trigger anyone watching for updates + for _, f := range r.watchers { + f(c) + } + // And finally, ensure we gather stats for it if dockerContainer.State.Running { if err := c.StartGatheringStats(); err != nil { diff --git a/probe/docker/reporter.go b/probe/docker/reporter.go index 8f7561efc0..a2c10c19e6 100644 --- a/probe/docker/reporter.go +++ b/probe/docker/reporter.go @@ -1,10 +1,12 @@ package docker import ( + "log" "net" docker_client "github.com/fsouza/go-dockerclient" + "github.com/weaveworks/scope/probe" "github.com/weaveworks/scope/report" ) @@ -18,16 +20,32 @@ const ( type Reporter struct { registry Registry hostID string + probe *probe.Probe } // NewReporter makes a new Reporter -func NewReporter(registry Registry, hostID string) *Reporter { +func NewReporter(registry Registry, hostID string, probe *probe.Probe) *Reporter { return &Reporter{ registry: registry, hostID: hostID, + probe: probe, } } +// ContainerUpdated should be called whenever a container is updated. +func (r *Reporter) ContainerUpdated(c Container) { + localAddrs, err := report.LocalAddresses() + if err != nil { + log.Printf("Error getting local address: %v", err) + return + } + + // Publish a 'short cut' report container just this container + rpt := report.MakeReport() + rpt.Container.AddNode(report.MakeContainerNodeID(r.hostID, c.ID()), c.GetNode(r.hostID, localAddrs)) + r.probe.Publish(rpt) +} + // Report generates a Report containing Container and ContainerImage topologies func (r *Reporter) Report() (report.Report, error) { localAddrs, err := report.LocalAddresses() diff --git a/probe/docker/reporter_test.go b/probe/docker/reporter_test.go index cbabba81bb..598773b693 100644 --- a/probe/docker/reporter_test.go +++ b/probe/docker/reporter_test.go @@ -36,6 +36,8 @@ func (r *mockRegistry) WalkImages(f func(*client.APIImages)) { } } +func (r *mockRegistry) WatchContainerUpdates(_ docker.ContainerUpdateWatcher) {} + var ( mockRegistryInstance = &mockRegistry{ containersByPID: map[int]docker.Container{ @@ -95,7 +97,7 @@ func TestReporter(t *testing.T) { Controls: report.Controls{}, } - reporter := docker.NewReporter(mockRegistryInstance, "") + reporter := docker.NewReporter(mockRegistryInstance, "", nil) have, _ := reporter.Report() if !reflect.DeepEqual(want, have) { t.Errorf("%s", test.Diff(want, have)) diff --git a/probe/probe.go b/probe/probe.go index d429e7cdfb..57edba6450 100644 --- a/probe/probe.go +++ b/probe/probe.go @@ -9,6 +9,10 @@ import ( "github.com/weaveworks/scope/xfer" ) +const ( + reportBufferSize = 16 +) + // Probe sits there, generating and publishing reports. type Probe struct { spyInterval, publishInterval time.Duration @@ -20,7 +24,9 @@ type Probe struct { quit chan struct{} done sync.WaitGroup - rpt syncReport + + spiedReports chan report.Report + shortcutReports chan report.Report } // Tagger tags nodes with value-add node metadata. @@ -47,8 +53,9 @@ func New(spyInterval, publishInterval time.Duration, publisher xfer.Publisher) * publishInterval: publishInterval, publisher: publisher, quit: make(chan struct{}), + spiedReports: make(chan report.Report, reportBufferSize), + shortcutReports: make(chan report.Report, reportBufferSize), } - result.rpt.swap(report.MakeReport()) return result } @@ -80,6 +87,12 @@ func (p *Probe) Stop() { p.done.Wait() } +// Publish will queue a report for immediate publication, +// bypassing the spy tick +func (p *Probe) Publish(rpt report.Report) { + p.shortcutReports <- rpt +} + func (p *Probe) spyLoop() { defer p.done.Done() spyTick := time.Tick(p.spyInterval) @@ -94,10 +107,9 @@ func (p *Probe) spyLoop() { } } - localReport := p.rpt.copy() - localReport = localReport.Merge(p.report()) - localReport = p.tag(localReport) - p.rpt.swap(localReport) + rpt := p.report() + rpt = p.tag(rpt) + p.spiedReports <- rpt if took := time.Since(start); took > p.spyInterval { log.Printf("report generation took too long (%s)", took) @@ -140,6 +152,17 @@ func (p *Probe) tag(r report.Report) report.Report { return r } +func condense(rpt report.Report, rs chan report.Report) report.Report { + for { + select { + case r := <-rs: + rpt = rpt.Merge(r) + default: + return rpt + } + } +} + func (p *Probe) publishLoop() { defer p.done.Done() var ( @@ -150,8 +173,14 @@ func (p *Probe) publishLoop() { for { select { case <-pubTick: - localReport := p.rpt.swap(report.MakeReport()) - if err := rptPub.Publish(localReport); err != nil { + rpt := condense(report.MakeReport(), p.spiedReports) + if err := rptPub.Publish(rpt); err != nil { + log.Printf("publish: %v", err) + } + + case rpt := <-p.shortcutReports: + rpt = condense(rpt, p.shortcutReports) + if err := rptPub.Publish(rpt); err != nil { log.Printf("publish: %v", err) } diff --git a/probe/sync_report.go b/probe/sync_report.go deleted file mode 100644 index 830214920c..0000000000 --- a/probe/sync_report.go +++ /dev/null @@ -1,26 +0,0 @@ -package probe - -import ( - "sync" - - "github.com/weaveworks/scope/report" -) - -type syncReport struct { - mtx sync.RWMutex - rpt report.Report -} - -func (r *syncReport) swap(other report.Report) report.Report { - r.mtx.Lock() - defer r.mtx.Unlock() - old := r.rpt - r.rpt = other - return old -} - -func (r *syncReport) copy() report.Report { - r.mtx.RLock() - defer r.mtx.RUnlock() - return r.rpt.Copy() -} diff --git a/prog/probe/main.go b/prog/probe/main.go index 4b4e19400f..536e0e572c 100644 --- a/prog/probe/main.go +++ b/prog/probe/main.go @@ -134,7 +134,10 @@ func main() { if registry, err := docker.NewRegistry(*dockerInterval); err == nil { defer registry.Stop() p.AddTagger(docker.NewTagger(registry, processCache)) - p.AddReporter(docker.NewReporter(registry, hostID)) + + reporter := docker.NewReporter(registry, hostID, p) + registry.WatchContainerUpdates(reporter.ContainerUpdated) + p.AddReporter(reporter) } else { log.Printf("Docker: failed to start registry: %v", err) }