Skip to content

Commit

Permalink
Merge pull request #640 from weaveworks/629-container-state-push
Browse files Browse the repository at this point in the history
Push mini-reports when container states changes
  • Loading branch information
tomwilkie committed Nov 11, 2015
2 parents f274c50 + c610bf0 commit 57e2046
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 49 deletions.
7 changes: 6 additions & 1 deletion app/api_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ func handleWebsocket(
var (
previousTopo render.RenderableNodes
tick = time.Tick(loop)
wait = make(chan struct{}, 1)
)
rep.WaitOn(wait)
defer rep.UnWait(wait)

for {
newTopo := renderer.Render(rep.Report()).Prune()
diff := render.TopoDiff(previousTopo, newTopo)
Expand All @@ -128,9 +132,10 @@ func handleWebsocket(
}

select {
case <-wait:
case <-tick:
case <-quit:
return
case <-tick:
}
}
}
38 changes: 38 additions & 0 deletions app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
// interface for parts of the app, and several experimental components.
type Reporter interface {
Report() report.Report
WaitOn(chan struct{})
UnWait(chan struct{})
}

// Adder is something that can accept reports. It's a convenient interface for
Expand All @@ -25,12 +27,45 @@ type Collector struct {
mtx sync.Mutex
reports []timestampReport
window time.Duration
waitableCondition
}

type waitableCondition struct {
sync.Mutex
waiters map[chan struct{}]struct{}
}

func (wc *waitableCondition) WaitOn(waiter chan struct{}) {
wc.Lock()
wc.waiters[waiter] = struct{}{}
wc.Unlock()
}

func (wc *waitableCondition) UnWait(waiter chan struct{}) {
wc.Lock()
delete(wc.waiters, waiter)
wc.Unlock()
}

func (wc *waitableCondition) Broadcast() {
wc.Lock()
for waiter := range wc.waiters {
// Non-block write to channel
select {
case waiter <- struct{}{}:
default:
}
}
wc.Unlock()
}

// NewCollector returns a collector ready for use.
func NewCollector(window time.Duration) *Collector {
return &Collector{
window: window,
waitableCondition: waitableCondition{
waiters: map[chan struct{}]struct{}{},
},
}
}

Expand All @@ -42,6 +77,9 @@ func (c *Collector) Add(rpt report.Report) {
defer c.mtx.Unlock()
c.reports = append(c.reports, timestampReport{now(), rpt})
c.reports = clean(c.reports, c.window)
if rpt.Shortcut {
c.Broadcast()
}
}

// Report returns a merged report over all added reports. It implements
Expand Down
5 changes: 3 additions & 2 deletions app/mock_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ import (
type StaticReport struct{}

func (s StaticReport) Report() report.Report { return fixture.Report }

func (s StaticReport) Add(report.Report) {}
func (s StaticReport) Add(report.Report) {}
func (s StaticReport) WaitOn(chan struct{}) {}
func (s StaticReport) UnWait(chan struct{}) {}
18 changes: 18 additions & 0 deletions probe/docker/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,19 @@ type Registry interface {
LockedPIDLookup(f func(func(int) Container))
WalkContainers(f func(Container))
WalkImages(f func(*docker_client.APIImages))
WatchContainerUpdates(ContainerUpdateWatcher)
}

// ContainerUpdateWatcher is the type of functions that get called when containers are updated.
type ContainerUpdateWatcher func(c Container)

type registry struct {
sync.RWMutex
quit chan chan struct{}
interval time.Duration
client Client

watchers []ContainerUpdateWatcher
containers map[string]Container
containersByPID map[int]Container
images map[string]*docker_client.APIImages
Expand Down Expand Up @@ -91,6 +96,14 @@ func (r *registry) Stop() {
<-ch
}

// WatchContainerUpdates registers a callback to be called
// whenever a container is updated.
func (r *registry) WatchContainerUpdates(f ContainerUpdateWatcher) {
r.Lock()
defer r.Unlock()
r.watchers = append(r.watchers, f)
}

func (r *registry) loop() {
for {
// NB listenForEvents blocks.
Expand Down Expand Up @@ -244,6 +257,11 @@ func (r *registry) updateContainerState(containerID string) {
c.UpdateState(dockerContainer)
}

// Trigger anyone watching for updates
for _, f := range r.watchers {
f(c)
}

// And finally, ensure we gather stats for it
if dockerContainer.State.Running {
if err := c.StartGatheringStats(); err != nil {
Expand Down
25 changes: 23 additions & 2 deletions probe/docker/reporter.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package docker

import (
"log"
"net"

docker_client "github.com/fsouza/go-dockerclient"

"github.com/weaveworks/scope/probe"
"github.com/weaveworks/scope/report"
)

Expand All @@ -18,14 +20,33 @@ const (
type Reporter struct {
registry Registry
hostID string
probe *probe.Probe
}

// NewReporter makes a new Reporter
func NewReporter(registry Registry, hostID string) *Reporter {
return &Reporter{
func NewReporter(registry Registry, hostID string, probe *probe.Probe) *Reporter {
reporter := &Reporter{
registry: registry,
hostID: hostID,
probe: probe,
}
registry.WatchContainerUpdates(reporter.ContainerUpdated)
return reporter
}

// ContainerUpdated should be called whenever a container is updated.
func (r *Reporter) ContainerUpdated(c Container) {
localAddrs, err := report.LocalAddresses()
if err != nil {
log.Printf("Error getting local address: %v", err)
return
}

// Publish a 'short cut' report container just this container
rpt := report.MakeReport()
rpt.Shortcut = true
rpt.Container.AddNode(report.MakeContainerNodeID(r.hostID, c.ID()), c.GetNode(r.hostID, localAddrs))
r.probe.Publish(rpt)
}

// Report generates a Report containing Container and ContainerImage topologies
Expand Down
4 changes: 3 additions & 1 deletion probe/docker/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func (r *mockRegistry) WalkImages(f func(*client.APIImages)) {
}
}

func (r *mockRegistry) WatchContainerUpdates(_ docker.ContainerUpdateWatcher) {}

var (
mockRegistryInstance = &mockRegistry{
containersByPID: map[int]docker.Container{
Expand Down Expand Up @@ -95,7 +97,7 @@ func TestReporter(t *testing.T) {
Controls: report.Controls{},
}

reporter := docker.NewReporter(mockRegistryInstance, "")
reporter := docker.NewReporter(mockRegistryInstance, "", nil)
have, _ := reporter.Report()
if !reflect.DeepEqual(want, have) {
t.Errorf("%s", test.Diff(want, have))
Expand Down
57 changes: 41 additions & 16 deletions probe/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,24 @@ import (
"github.com/weaveworks/scope/xfer"
)

const (
reportBufferSize = 16
)

// Probe sits there, generating and publishing reports.
type Probe struct {
spyInterval, publishInterval time.Duration
publisher xfer.Publisher
publisher *xfer.ReportPublisher

tickers []Ticker
reporters []Reporter
taggers []Tagger

quit chan struct{}
done sync.WaitGroup
rpt syncReport

spiedReports chan report.Report
shortcutReports chan report.Report
}

// Tagger tags nodes with value-add node metadata.
Expand All @@ -45,10 +51,11 @@ func New(spyInterval, publishInterval time.Duration, publisher xfer.Publisher) *
result := &Probe{
spyInterval: spyInterval,
publishInterval: publishInterval,
publisher: publisher,
publisher: xfer.NewReportPublisher(publisher),
quit: make(chan struct{}),
spiedReports: make(chan report.Report, reportBufferSize),
shortcutReports: make(chan report.Report, reportBufferSize),
}
result.rpt.swap(report.MakeReport())
return result
}

Expand Down Expand Up @@ -80,6 +87,12 @@ func (p *Probe) Stop() {
p.done.Wait()
}

// Publish will queue a report for immediate publication,
// bypassing the spy tick
func (p *Probe) Publish(rpt report.Report) {
p.shortcutReports <- rpt
}

func (p *Probe) spyLoop() {
defer p.done.Done()
spyTick := time.Tick(p.spyInterval)
Expand All @@ -94,10 +107,9 @@ func (p *Probe) spyLoop() {
}
}

localReport := p.rpt.copy()
localReport = localReport.Merge(p.report())
localReport = p.tag(localReport)
p.rpt.swap(localReport)
rpt := p.report()
rpt = p.tag(rpt)
p.spiedReports <- rpt

if took := time.Since(start); took > p.spyInterval {
log.Printf("report generation took too long (%s)", took)
Expand Down Expand Up @@ -140,20 +152,33 @@ func (p *Probe) tag(r report.Report) report.Report {
return r
}

func (p *Probe) drainAndPublish(rpt report.Report, rs chan report.Report) {
ForLoop:
for {
select {
case r := <-rs:
rpt = rpt.Merge(r)
default:
break ForLoop
}
}

if err := p.publisher.Publish(rpt); err != nil {
log.Printf("publish: %v", err)
}
}

func (p *Probe) publishLoop() {
defer p.done.Done()
var (
pubTick = time.Tick(p.publishInterval)
rptPub = xfer.NewReportPublisher(p.publisher)
)
pubTick := time.Tick(p.publishInterval)

for {
select {
case <-pubTick:
localReport := p.rpt.swap(report.MakeReport())
if err := rptPub.Publish(localReport); err != nil {
log.Printf("publish: %v", err)
}
p.drainAndPublish(report.MakeReport(), p.spiedReports)

case rpt := <-p.shortcutReports:
p.drainAndPublish(rpt, p.shortcutReports)

case <-p.quit:
return
Expand Down
26 changes: 0 additions & 26 deletions probe/sync_report.go

This file was deleted.

2 changes: 1 addition & 1 deletion prog/probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func main() {
if registry, err := docker.NewRegistry(*dockerInterval); err == nil {
defer registry.Stop()
p.AddTagger(docker.NewTagger(registry, processCache))
p.AddReporter(docker.NewReporter(registry, hostID))
p.AddReporter(docker.NewReporter(registry, hostID, p))
} else {
log.Printf("Docker: failed to start registry: %v", err)
}
Expand Down
4 changes: 4 additions & 0 deletions report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ type Report struct {
// such as in the app, we expect the component to overwrite the window
// before serving it to consumers.
Window time.Duration

// Shortcut reports should be propogated to the UI as quickly as possible,
// bypassing the usual spy interval, publish interval and app ws interval.
Shortcut bool
}

// MakeReport makes a clean report, ready to Merge() other reports into.
Expand Down

0 comments on commit 57e2046

Please sign in to comment.