diff --git a/app/api_topology.go b/app/api_topology.go index 315f7ce8e4..00d86c85ab 100644 --- a/app/api_topology.go +++ b/app/api_topology.go @@ -114,7 +114,11 @@ func handleWebsocket( var ( previousTopo render.RenderableNodes tick = time.Tick(loop) + wait = make(chan struct{}, 1) ) + rep.WaitOn(wait) + defer rep.UnWait(wait) + for { newTopo := renderer.Render(rep.Report()).Prune() diff := render.TopoDiff(previousTopo, newTopo) @@ -128,9 +132,10 @@ func handleWebsocket( } select { + case <-wait: + case <-tick: case <-quit: return - case <-tick: } } } diff --git a/app/collector.go b/app/collector.go index d97846ab26..394043c523 100644 --- a/app/collector.go +++ b/app/collector.go @@ -11,6 +11,8 @@ import ( // interface for parts of the app, and several experimental components. type Reporter interface { Report() report.Report + WaitOn(chan struct{}) + UnWait(chan struct{}) } // Adder is something that can accept reports. It's a convenient interface for @@ -25,12 +27,45 @@ type Collector struct { mtx sync.Mutex reports []timestampReport window time.Duration + waitableCondition +} + +type waitableCondition struct { + sync.Mutex + waiters map[chan struct{}]struct{} +} + +func (wc *waitableCondition) WaitOn(waiter chan struct{}) { + wc.Lock() + wc.waiters[waiter] = struct{}{} + wc.Unlock() +} + +func (wc *waitableCondition) UnWait(waiter chan struct{}) { + wc.Lock() + delete(wc.waiters, waiter) + wc.Unlock() +} + +func (wc *waitableCondition) Broadcast() { + wc.Lock() + for waiter := range wc.waiters { + // Non-block write to channel + select { + case waiter <- struct{}{}: + default: + } + } + wc.Unlock() } // NewCollector returns a collector ready for use. func NewCollector(window time.Duration) *Collector { return &Collector{ window: window, + waitableCondition: waitableCondition{ + waiters: map[chan struct{}]struct{}{}, + }, } } @@ -42,6 +77,9 @@ func (c *Collector) Add(rpt report.Report) { defer c.mtx.Unlock() c.reports = append(c.reports, timestampReport{now(), rpt}) c.reports = clean(c.reports, c.window) + if rpt.Shortcut { + c.Broadcast() + } } // Report returns a merged report over all added reports. It implements diff --git a/app/mock_reporter_test.go b/app/mock_reporter_test.go index 99439e9a0d..9e27941c19 100644 --- a/app/mock_reporter_test.go +++ b/app/mock_reporter_test.go @@ -9,5 +9,6 @@ import ( type StaticReport struct{} func (s StaticReport) Report() report.Report { return fixture.Report } - -func (s StaticReport) Add(report.Report) {} +func (s StaticReport) Add(report.Report) {} +func (s StaticReport) WaitOn(chan struct{}) {} +func (s StaticReport) UnWait(chan struct{}) {} diff --git a/probe/docker/registry.go b/probe/docker/registry.go index 3c21bd4e76..e8ad506efe 100644 --- a/probe/docker/registry.go +++ b/probe/docker/registry.go @@ -31,14 +31,19 @@ type Registry interface { LockedPIDLookup(f func(func(int) Container)) WalkContainers(f func(Container)) WalkImages(f func(*docker_client.APIImages)) + WatchContainerUpdates(ContainerUpdateWatcher) } +// ContainerUpdateWatcher is the type of functions that get called when containers are updated. +type ContainerUpdateWatcher func(c Container) + type registry struct { sync.RWMutex quit chan chan struct{} interval time.Duration client Client + watchers []ContainerUpdateWatcher containers map[string]Container containersByPID map[int]Container images map[string]*docker_client.APIImages @@ -91,6 +96,14 @@ func (r *registry) Stop() { <-ch } +// WatchContainerUpdates registers a callback to be called +// whenever a container is updated. +func (r *registry) WatchContainerUpdates(f ContainerUpdateWatcher) { + r.Lock() + defer r.Unlock() + r.watchers = append(r.watchers, f) +} + func (r *registry) loop() { for { // NB listenForEvents blocks. @@ -244,6 +257,11 @@ func (r *registry) updateContainerState(containerID string) { c.UpdateState(dockerContainer) } + // Trigger anyone watching for updates + for _, f := range r.watchers { + f(c) + } + // And finally, ensure we gather stats for it if dockerContainer.State.Running { if err := c.StartGatheringStats(); err != nil { diff --git a/probe/docker/reporter.go b/probe/docker/reporter.go index 8f7561efc0..6d1e088daf 100644 --- a/probe/docker/reporter.go +++ b/probe/docker/reporter.go @@ -1,10 +1,12 @@ package docker import ( + "log" "net" docker_client "github.com/fsouza/go-dockerclient" + "github.com/weaveworks/scope/probe" "github.com/weaveworks/scope/report" ) @@ -18,14 +20,33 @@ const ( type Reporter struct { registry Registry hostID string + probe *probe.Probe } // NewReporter makes a new Reporter -func NewReporter(registry Registry, hostID string) *Reporter { - return &Reporter{ +func NewReporter(registry Registry, hostID string, probe *probe.Probe) *Reporter { + reporter := &Reporter{ registry: registry, hostID: hostID, + probe: probe, } + registry.WatchContainerUpdates(reporter.ContainerUpdated) + return reporter +} + +// ContainerUpdated should be called whenever a container is updated. +func (r *Reporter) ContainerUpdated(c Container) { + localAddrs, err := report.LocalAddresses() + if err != nil { + log.Printf("Error getting local address: %v", err) + return + } + + // Publish a 'short cut' report container just this container + rpt := report.MakeReport() + rpt.Shortcut = true + rpt.Container.AddNode(report.MakeContainerNodeID(r.hostID, c.ID()), c.GetNode(r.hostID, localAddrs)) + r.probe.Publish(rpt) } // Report generates a Report containing Container and ContainerImage topologies diff --git a/probe/docker/reporter_test.go b/probe/docker/reporter_test.go index cbabba81bb..598773b693 100644 --- a/probe/docker/reporter_test.go +++ b/probe/docker/reporter_test.go @@ -36,6 +36,8 @@ func (r *mockRegistry) WalkImages(f func(*client.APIImages)) { } } +func (r *mockRegistry) WatchContainerUpdates(_ docker.ContainerUpdateWatcher) {} + var ( mockRegistryInstance = &mockRegistry{ containersByPID: map[int]docker.Container{ @@ -95,7 +97,7 @@ func TestReporter(t *testing.T) { Controls: report.Controls{}, } - reporter := docker.NewReporter(mockRegistryInstance, "") + reporter := docker.NewReporter(mockRegistryInstance, "", nil) have, _ := reporter.Report() if !reflect.DeepEqual(want, have) { t.Errorf("%s", test.Diff(want, have)) diff --git a/probe/probe.go b/probe/probe.go index d429e7cdfb..a0700ba2ff 100644 --- a/probe/probe.go +++ b/probe/probe.go @@ -9,10 +9,14 @@ import ( "github.com/weaveworks/scope/xfer" ) +const ( + reportBufferSize = 16 +) + // Probe sits there, generating and publishing reports. type Probe struct { spyInterval, publishInterval time.Duration - publisher xfer.Publisher + publisher *xfer.ReportPublisher tickers []Ticker reporters []Reporter @@ -20,7 +24,9 @@ type Probe struct { quit chan struct{} done sync.WaitGroup - rpt syncReport + + spiedReports chan report.Report + shortcutReports chan report.Report } // Tagger tags nodes with value-add node metadata. @@ -45,10 +51,11 @@ func New(spyInterval, publishInterval time.Duration, publisher xfer.Publisher) * result := &Probe{ spyInterval: spyInterval, publishInterval: publishInterval, - publisher: publisher, + publisher: xfer.NewReportPublisher(publisher), quit: make(chan struct{}), + spiedReports: make(chan report.Report, reportBufferSize), + shortcutReports: make(chan report.Report, reportBufferSize), } - result.rpt.swap(report.MakeReport()) return result } @@ -80,6 +87,12 @@ func (p *Probe) Stop() { p.done.Wait() } +// Publish will queue a report for immediate publication, +// bypassing the spy tick +func (p *Probe) Publish(rpt report.Report) { + p.shortcutReports <- rpt +} + func (p *Probe) spyLoop() { defer p.done.Done() spyTick := time.Tick(p.spyInterval) @@ -94,10 +107,9 @@ func (p *Probe) spyLoop() { } } - localReport := p.rpt.copy() - localReport = localReport.Merge(p.report()) - localReport = p.tag(localReport) - p.rpt.swap(localReport) + rpt := p.report() + rpt = p.tag(rpt) + p.spiedReports <- rpt if took := time.Since(start); took > p.spyInterval { log.Printf("report generation took too long (%s)", took) @@ -140,20 +152,33 @@ func (p *Probe) tag(r report.Report) report.Report { return r } +func (p *Probe) drainAndPublish(rpt report.Report, rs chan report.Report) { +ForLoop: + for { + select { + case r := <-rs: + rpt = rpt.Merge(r) + default: + break ForLoop + } + } + + if err := p.publisher.Publish(rpt); err != nil { + log.Printf("publish: %v", err) + } +} + func (p *Probe) publishLoop() { defer p.done.Done() - var ( - pubTick = time.Tick(p.publishInterval) - rptPub = xfer.NewReportPublisher(p.publisher) - ) + pubTick := time.Tick(p.publishInterval) for { select { case <-pubTick: - localReport := p.rpt.swap(report.MakeReport()) - if err := rptPub.Publish(localReport); err != nil { - log.Printf("publish: %v", err) - } + p.drainAndPublish(report.MakeReport(), p.spiedReports) + + case rpt := <-p.shortcutReports: + p.drainAndPublish(rpt, p.shortcutReports) case <-p.quit: return diff --git a/probe/sync_report.go b/probe/sync_report.go deleted file mode 100644 index 830214920c..0000000000 --- a/probe/sync_report.go +++ /dev/null @@ -1,26 +0,0 @@ -package probe - -import ( - "sync" - - "github.com/weaveworks/scope/report" -) - -type syncReport struct { - mtx sync.RWMutex - rpt report.Report -} - -func (r *syncReport) swap(other report.Report) report.Report { - r.mtx.Lock() - defer r.mtx.Unlock() - old := r.rpt - r.rpt = other - return old -} - -func (r *syncReport) copy() report.Report { - r.mtx.RLock() - defer r.mtx.RUnlock() - return r.rpt.Copy() -} diff --git a/prog/probe/main.go b/prog/probe/main.go index 4b4e19400f..6e2d242228 100644 --- a/prog/probe/main.go +++ b/prog/probe/main.go @@ -134,7 +134,7 @@ func main() { if registry, err := docker.NewRegistry(*dockerInterval); err == nil { defer registry.Stop() p.AddTagger(docker.NewTagger(registry, processCache)) - p.AddReporter(docker.NewReporter(registry, hostID)) + p.AddReporter(docker.NewReporter(registry, hostID, p)) } else { log.Printf("Docker: failed to start registry: %v", err) } diff --git a/report/report.go b/report/report.go index aca0ad2fde..88876ea6d8 100644 --- a/report/report.go +++ b/report/report.go @@ -63,6 +63,10 @@ type Report struct { // such as in the app, we expect the component to overwrite the window // before serving it to consumers. Window time.Duration + + // Shortcut reports should be propogated to the UI as quickly as possible, + // bypassing the usual spy interval, publish interval and app ws interval. + Shortcut bool } // MakeReport makes a clean report, ready to Merge() other reports into.