Skip to content

Commit

Permalink
Shortcut app -> UI ws push for certain reports.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwilkie committed Nov 10, 2015
1 parent 92b2479 commit 47ef1e0
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 3 deletions.
7 changes: 6 additions & 1 deletion app/api_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ func handleWebsocket(
var (
previousTopo render.RenderableNodes
tick = time.Tick(loop)
wait = make(chan struct{}, 1)
)
rep.WaitOn(wait)
defer rep.UnWait(wait)

for {
newTopo := renderer.Render(rep.Report()).Prune()
diff := render.TopoDiff(previousTopo, newTopo)
Expand All @@ -128,9 +132,10 @@ func handleWebsocket(
}

select {
case <-wait:
case <-tick:
case <-quit:
return
case <-tick:
}
}
}
38 changes: 38 additions & 0 deletions app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
// interface for parts of the app, and several experimental components.
type Reporter interface {
Report() report.Report
WaitOn(chan struct{})
UnWait(chan struct{})
}

// Adder is something that can accept reports. It's a convenient interface for
Expand All @@ -25,12 +27,45 @@ type Collector struct {
mtx sync.Mutex
reports []timestampReport
window time.Duration
waitableCondition
}

type waitableCondition struct {
sync.Mutex
waiters map[chan struct{}]struct{}
}

func (wc *waitableCondition) WaitOn(waiter chan struct{}) {
wc.Lock()
wc.waiters[waiter] = struct{}{}
wc.Unlock()
}

func (wc *waitableCondition) UnWait(waiter chan struct{}) {
wc.Lock()
delete(wc.waiters, waiter)
wc.Unlock()
}

func (wc *waitableCondition) Broadcast() {
wc.Lock()
for waiter := range wc.waiters {
// Non-block write to channel
select {
case waiter <- struct{}{}:
default:
}
}
wc.Unlock()
}

// NewCollector returns a collector ready for use.
func NewCollector(window time.Duration) *Collector {
return &Collector{
window: window,
waitableCondition: waitableCondition{
waiters: map[chan struct{}]struct{}{},
},
}
}

Expand All @@ -42,6 +77,9 @@ func (c *Collector) Add(rpt report.Report) {
defer c.mtx.Unlock()
c.reports = append(c.reports, timestampReport{now(), rpt})
c.reports = clean(c.reports, c.window)
if rpt.Shortcut {
c.Broadcast()
}
}

// Report returns a merged report over all added reports. It implements
Expand Down
5 changes: 3 additions & 2 deletions app/mock_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ import (
type StaticReport struct{}

func (s StaticReport) Report() report.Report { return fixture.Report }

func (s StaticReport) Add(report.Report) {}
func (s StaticReport) Add(report.Report) {}
func (s StaticReport) WaitOn(chan struct{}) {}
func (s StaticReport) UnWait(chan struct{}) {}
1 change: 1 addition & 0 deletions probe/docker/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (r *Reporter) ContainerUpdated(c Container) {

// Publish a 'short cut' report container just this container
rpt := report.MakeReport()
rpt.Shortcut = true
rpt.Container.AddNode(report.MakeContainerNodeID(r.hostID, c.ID()), c.GetNode(r.hostID, localAddrs))
r.probe.Publish(rpt)
}
Expand Down
4 changes: 4 additions & 0 deletions report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ type Report struct {
// such as in the app, we expect the component to overwrite the window
// before serving it to consumers.
Window time.Duration

// Shortcut reports should be propogated to the UI as quickly as possible,
// bypassing the usual spy interval, publish interval and app ws interval.
Shortcut bool
}

// MakeReport makes a clean report, ready to Merge() other reports into.
Expand Down

0 comments on commit 47ef1e0

Please sign in to comment.