diff --git a/app/api_topology.go b/app/api_topology.go index 315f7ce8e4..00d86c85ab 100644 --- a/app/api_topology.go +++ b/app/api_topology.go @@ -114,7 +114,11 @@ func handleWebsocket( var ( previousTopo render.RenderableNodes tick = time.Tick(loop) + wait = make(chan struct{}, 1) ) + rep.WaitOn(wait) + defer rep.UnWait(wait) + for { newTopo := renderer.Render(rep.Report()).Prune() diff := render.TopoDiff(previousTopo, newTopo) @@ -128,9 +132,10 @@ func handleWebsocket( } select { + case <-wait: + case <-tick: case <-quit: return - case <-tick: } } } diff --git a/app/collector.go b/app/collector.go index d97846ab26..394043c523 100644 --- a/app/collector.go +++ b/app/collector.go @@ -11,6 +11,8 @@ import ( // interface for parts of the app, and several experimental components. type Reporter interface { Report() report.Report + WaitOn(chan struct{}) + UnWait(chan struct{}) } // Adder is something that can accept reports. It's a convenient interface for @@ -25,12 +27,45 @@ type Collector struct { mtx sync.Mutex reports []timestampReport window time.Duration + waitableCondition +} + +type waitableCondition struct { + sync.Mutex + waiters map[chan struct{}]struct{} +} + +func (wc *waitableCondition) WaitOn(waiter chan struct{}) { + wc.Lock() + wc.waiters[waiter] = struct{}{} + wc.Unlock() +} + +func (wc *waitableCondition) UnWait(waiter chan struct{}) { + wc.Lock() + delete(wc.waiters, waiter) + wc.Unlock() +} + +func (wc *waitableCondition) Broadcast() { + wc.Lock() + for waiter := range wc.waiters { + // Non-block write to channel + select { + case waiter <- struct{}{}: + default: + } + } + wc.Unlock() } // NewCollector returns a collector ready for use. func NewCollector(window time.Duration) *Collector { return &Collector{ window: window, + waitableCondition: waitableCondition{ + waiters: map[chan struct{}]struct{}{}, + }, } } @@ -42,6 +77,9 @@ func (c *Collector) Add(rpt report.Report) { defer c.mtx.Unlock() c.reports = append(c.reports, timestampReport{now(), rpt}) c.reports = clean(c.reports, c.window) + if rpt.Shortcut { + c.Broadcast() + } } // Report returns a merged report over all added reports. It implements diff --git a/app/mock_reporter_test.go b/app/mock_reporter_test.go index 99439e9a0d..9e27941c19 100644 --- a/app/mock_reporter_test.go +++ b/app/mock_reporter_test.go @@ -9,5 +9,6 @@ import ( type StaticReport struct{} func (s StaticReport) Report() report.Report { return fixture.Report } - -func (s StaticReport) Add(report.Report) {} +func (s StaticReport) Add(report.Report) {} +func (s StaticReport) WaitOn(chan struct{}) {} +func (s StaticReport) UnWait(chan struct{}) {} diff --git a/probe/docker/reporter.go b/probe/docker/reporter.go index a2c10c19e6..afd8e565a0 100644 --- a/probe/docker/reporter.go +++ b/probe/docker/reporter.go @@ -42,6 +42,7 @@ func (r *Reporter) ContainerUpdated(c Container) { // Publish a 'short cut' report container just this container rpt := report.MakeReport() + rpt.Shortcut = true rpt.Container.AddNode(report.MakeContainerNodeID(r.hostID, c.ID()), c.GetNode(r.hostID, localAddrs)) r.probe.Publish(rpt) } diff --git a/report/report.go b/report/report.go index aca0ad2fde..88876ea6d8 100644 --- a/report/report.go +++ b/report/report.go @@ -63,6 +63,10 @@ type Report struct { // such as in the app, we expect the component to overwrite the window // before serving it to consumers. Window time.Duration + + // Shortcut reports should be propogated to the UI as quickly as possible, + // bypassing the usual spy interval, publish interval and app ws interval. + Shortcut bool } // MakeReport makes a clean report, ready to Merge() other reports into.