Skip to content

Commit

Permalink
Push mini-reports when container's state changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwilkie committed Nov 10, 2015
1 parent f274c50 commit 92b2479
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 37 deletions.
18 changes: 18 additions & 0 deletions probe/docker/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,19 @@ type Registry interface {
LockedPIDLookup(f func(func(int) Container))
WalkContainers(f func(Container))
WalkImages(f func(*docker_client.APIImages))
WatchContainerUpdates(ContainerUpdateWatcher)
}

// ContainerUpdateWatcher is the type of functions that get called when containers are updated.
type ContainerUpdateWatcher func(c Container)

type registry struct {
sync.RWMutex
quit chan chan struct{}
interval time.Duration
client Client

watchers []ContainerUpdateWatcher
containers map[string]Container
containersByPID map[int]Container
images map[string]*docker_client.APIImages
Expand Down Expand Up @@ -91,6 +96,14 @@ func (r *registry) Stop() {
<-ch
}

// WatchContainerUpdates registers a callback to be called
// whenever a container is updated.
func (r *registry) WatchContainerUpdates(f ContainerUpdateWatcher) {
r.Lock()
defer r.Unlock()
r.watchers = append(r.watchers, f)
}

func (r *registry) loop() {
for {
// NB listenForEvents blocks.
Expand Down Expand Up @@ -244,6 +257,11 @@ func (r *registry) updateContainerState(containerID string) {
c.UpdateState(dockerContainer)
}

// Trigger anyone watching for updates
for _, f := range r.watchers {
f(c)
}

// And finally, ensure we gather stats for it
if dockerContainer.State.Running {
if err := c.StartGatheringStats(); err != nil {
Expand Down
20 changes: 19 additions & 1 deletion probe/docker/reporter.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package docker

import (
"log"
"net"

docker_client "github.com/fsouza/go-dockerclient"

"github.com/weaveworks/scope/probe"
"github.com/weaveworks/scope/report"
)

Expand All @@ -18,16 +20,32 @@ const (
type Reporter struct {
registry Registry
hostID string
probe *probe.Probe
}

// NewReporter makes a new Reporter
func NewReporter(registry Registry, hostID string) *Reporter {
func NewReporter(registry Registry, hostID string, probe *probe.Probe) *Reporter {
return &Reporter{
registry: registry,
hostID: hostID,
probe: probe,
}
}

// ContainerUpdated should be called whenever a container is updated.
func (r *Reporter) ContainerUpdated(c Container) {
localAddrs, err := report.LocalAddresses()
if err != nil {
log.Printf("Error getting local address: %v", err)
return
}

// Publish a 'short cut' report container just this container
rpt := report.MakeReport()
rpt.Container.AddNode(report.MakeContainerNodeID(r.hostID, c.ID()), c.GetNode(r.hostID, localAddrs))
r.probe.Publish(rpt)
}

// Report generates a Report containing Container and ContainerImage topologies
func (r *Reporter) Report() (report.Report, error) {
localAddrs, err := report.LocalAddresses()
Expand Down
4 changes: 3 additions & 1 deletion probe/docker/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func (r *mockRegistry) WalkImages(f func(*client.APIImages)) {
}
}

func (r *mockRegistry) WatchContainerUpdates(_ docker.ContainerUpdateWatcher) {}

var (
mockRegistryInstance = &mockRegistry{
containersByPID: map[int]docker.Container{
Expand Down Expand Up @@ -95,7 +97,7 @@ func TestReporter(t *testing.T) {
Controls: report.Controls{},
}

reporter := docker.NewReporter(mockRegistryInstance, "")
reporter := docker.NewReporter(mockRegistryInstance, "", nil)
have, _ := reporter.Report()
if !reflect.DeepEqual(want, have) {
t.Errorf("%s", test.Diff(want, have))
Expand Down
45 changes: 37 additions & 8 deletions probe/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"github.com/weaveworks/scope/xfer"
)

const (
reportBufferSize = 16
)

// Probe sits there, generating and publishing reports.
type Probe struct {
spyInterval, publishInterval time.Duration
Expand All @@ -20,7 +24,9 @@ type Probe struct {

quit chan struct{}
done sync.WaitGroup
rpt syncReport

spiedReports chan report.Report
shortcutReports chan report.Report
}

// Tagger tags nodes with value-add node metadata.
Expand All @@ -47,8 +53,9 @@ func New(spyInterval, publishInterval time.Duration, publisher xfer.Publisher) *
publishInterval: publishInterval,
publisher: publisher,
quit: make(chan struct{}),
spiedReports: make(chan report.Report, reportBufferSize),
shortcutReports: make(chan report.Report, reportBufferSize),
}
result.rpt.swap(report.MakeReport())
return result
}

Expand Down Expand Up @@ -80,6 +87,12 @@ func (p *Probe) Stop() {
p.done.Wait()
}

// Publish will queue a report for immediate publication,
// bypassing the spy tick
func (p *Probe) Publish(rpt report.Report) {
p.shortcutReports <- rpt
}

func (p *Probe) spyLoop() {
defer p.done.Done()
spyTick := time.Tick(p.spyInterval)
Expand All @@ -94,10 +107,9 @@ func (p *Probe) spyLoop() {
}
}

localReport := p.rpt.copy()
localReport = localReport.Merge(p.report())
localReport = p.tag(localReport)
p.rpt.swap(localReport)
rpt := p.report()
rpt = p.tag(rpt)
p.spiedReports <- rpt

if took := time.Since(start); took > p.spyInterval {
log.Printf("report generation took too long (%s)", took)
Expand Down Expand Up @@ -140,6 +152,17 @@ func (p *Probe) tag(r report.Report) report.Report {
return r
}

func condense(rpt report.Report, rs chan report.Report) report.Report {
for {
select {
case r := <-rs:
rpt = rpt.Merge(r)
default:
return rpt
}
}
}

func (p *Probe) publishLoop() {
defer p.done.Done()
var (
Expand All @@ -150,8 +173,14 @@ func (p *Probe) publishLoop() {
for {
select {
case <-pubTick:
localReport := p.rpt.swap(report.MakeReport())
if err := rptPub.Publish(localReport); err != nil {
rpt := condense(report.MakeReport(), p.spiedReports)
if err := rptPub.Publish(rpt); err != nil {
log.Printf("publish: %v", err)
}

case rpt := <-p.shortcutReports:
rpt = condense(rpt, p.shortcutReports)
if err := rptPub.Publish(rpt); err != nil {
log.Printf("publish: %v", err)
}

Expand Down
26 changes: 0 additions & 26 deletions probe/sync_report.go

This file was deleted.

5 changes: 4 additions & 1 deletion prog/probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ func main() {
if registry, err := docker.NewRegistry(*dockerInterval); err == nil {
defer registry.Stop()
p.AddTagger(docker.NewTagger(registry, processCache))
p.AddReporter(docker.NewReporter(registry, hostID))

reporter := docker.NewReporter(registry, hostID, p)
registry.WatchContainerUpdates(reporter.ContainerUpdated)
p.AddReporter(reporter)
} else {
log.Printf("Docker: failed to start registry: %v", err)
}
Expand Down

0 comments on commit 92b2479

Please sign in to comment.