Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push mini-reports when container states changes #640

Merged
merged 3 commits into from
Nov 11, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion app/api_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ func handleWebsocket(
var (
previousTopo render.RenderableNodes
tick = time.Tick(loop)
wait = make(chan struct{}, 1)
)
rep.WaitOn(wait)
defer rep.UnWait(wait)

This comment was marked as abuse.

This comment was marked as abuse.


for {
newTopo := renderer.Render(rep.Report()).Prune()
diff := render.TopoDiff(previousTopo, newTopo)
Expand All @@ -128,9 +132,10 @@ func handleWebsocket(
}

select {
case <-wait:
case <-tick:
case <-quit:
return
case <-tick:
}
}
}
38 changes: 38 additions & 0 deletions app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
// interface for parts of the app, and several experimental components.
type Reporter interface {
Report() report.Report
WaitOn(chan struct{})
UnWait(chan struct{})
}

// Adder is something that can accept reports. It's a convenient interface for
Expand All @@ -25,12 +27,45 @@ type Collector struct {
mtx sync.Mutex
reports []timestampReport
window time.Duration
waitableCondition
}

type waitableCondition struct {
sync.Mutex
waiters map[chan struct{}]struct{}
}

func (wc *waitableCondition) WaitOn(waiter chan struct{}) {
wc.Lock()
wc.waiters[waiter] = struct{}{}
wc.Unlock()
}

func (wc *waitableCondition) UnWait(waiter chan struct{}) {
wc.Lock()
delete(wc.waiters, waiter)
wc.Unlock()
}

func (wc *waitableCondition) Broadcast() {
wc.Lock()
for waiter := range wc.waiters {
// Non-block write to channel
select {
case waiter <- struct{}{}:
default:
}
}
wc.Unlock()
}

// NewCollector returns a collector ready for use.
func NewCollector(window time.Duration) *Collector {
return &Collector{
window: window,
waitableCondition: waitableCondition{
waiters: map[chan struct{}]struct{}{},
},
}
}

Expand All @@ -42,6 +77,9 @@ func (c *Collector) Add(rpt report.Report) {
defer c.mtx.Unlock()
c.reports = append(c.reports, timestampReport{now(), rpt})
c.reports = clean(c.reports, c.window)
if rpt.Shortcut {
c.Broadcast()
}
}

// Report returns a merged report over all added reports. It implements
Expand Down
5 changes: 3 additions & 2 deletions app/mock_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ import (
type StaticReport struct{}

func (s StaticReport) Report() report.Report { return fixture.Report }

func (s StaticReport) Add(report.Report) {}
func (s StaticReport) Add(report.Report) {}
func (s StaticReport) WaitOn(chan struct{}) {}
func (s StaticReport) UnWait(chan struct{}) {}
18 changes: 18 additions & 0 deletions probe/docker/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,19 @@ type Registry interface {
LockedPIDLookup(f func(func(int) Container))
WalkContainers(f func(Container))
WalkImages(f func(*docker_client.APIImages))
WatchContainerUpdates(ContainerUpdateWatcher)
}

// ContainerUpdateWatcher is the type of functions that get called when containers are updated.
type ContainerUpdateWatcher func(c Container)

type registry struct {
sync.RWMutex
quit chan chan struct{}
interval time.Duration
client Client

watchers []ContainerUpdateWatcher
containers map[string]Container
containersByPID map[int]Container
images map[string]*docker_client.APIImages
Expand Down Expand Up @@ -91,6 +96,14 @@ func (r *registry) Stop() {
<-ch
}

// WatchContainerUpdates registers a callback to be called
// whenever a container is updated.
func (r *registry) WatchContainerUpdates(f ContainerUpdateWatcher) {
r.Lock()
defer r.Unlock()
r.watchers = append(r.watchers, f)
}

func (r *registry) loop() {
for {
// NB listenForEvents blocks.
Expand Down Expand Up @@ -244,6 +257,11 @@ func (r *registry) updateContainerState(containerID string) {
c.UpdateState(dockerContainer)
}

// Trigger anyone watching for updates
for _, f := range r.watchers {
f(c)
}

// And finally, ensure we gather stats for it
if dockerContainer.State.Running {
if err := c.StartGatheringStats(); err != nil {
Expand Down
25 changes: 23 additions & 2 deletions probe/docker/reporter.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package docker

import (
"log"
"net"

docker_client "github.com/fsouza/go-dockerclient"

"github.com/weaveworks/scope/probe"
"github.com/weaveworks/scope/report"
)

Expand All @@ -18,14 +20,33 @@ const (
type Reporter struct {
registry Registry
hostID string
probe *probe.Probe
}

// NewReporter makes a new Reporter
func NewReporter(registry Registry, hostID string) *Reporter {
return &Reporter{
func NewReporter(registry Registry, hostID string, probe *probe.Probe) *Reporter {
reporter := &Reporter{
registry: registry,
hostID: hostID,
probe: probe,
}
registry.WatchContainerUpdates(reporter.ContainerUpdated)
return reporter
}

// ContainerUpdated should be called whenever a container is updated.
func (r *Reporter) ContainerUpdated(c Container) {
localAddrs, err := report.LocalAddresses()
if err != nil {
log.Printf("Error getting local address: %v", err)
return
}

// Publish a 'short cut' report container just this container
rpt := report.MakeReport()
rpt.Shortcut = true
rpt.Container.AddNode(report.MakeContainerNodeID(r.hostID, c.ID()), c.GetNode(r.hostID, localAddrs))
r.probe.Publish(rpt)
}

// Report generates a Report containing Container and ContainerImage topologies
Expand Down
4 changes: 3 additions & 1 deletion probe/docker/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func (r *mockRegistry) WalkImages(f func(*client.APIImages)) {
}
}

func (r *mockRegistry) WatchContainerUpdates(_ docker.ContainerUpdateWatcher) {}

var (
mockRegistryInstance = &mockRegistry{
containersByPID: map[int]docker.Container{
Expand Down Expand Up @@ -95,7 +97,7 @@ func TestReporter(t *testing.T) {
Controls: report.Controls{},
}

reporter := docker.NewReporter(mockRegistryInstance, "")
reporter := docker.NewReporter(mockRegistryInstance, "", nil)
have, _ := reporter.Report()
if !reflect.DeepEqual(want, have) {
t.Errorf("%s", test.Diff(want, have))
Expand Down
57 changes: 41 additions & 16 deletions probe/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,24 @@ import (
"github.com/weaveworks/scope/xfer"
)

const (
reportBufferSize = 16
)

// Probe sits there, generating and publishing reports.
type Probe struct {
spyInterval, publishInterval time.Duration
publisher xfer.Publisher
publisher *xfer.ReportPublisher

tickers []Ticker
reporters []Reporter
taggers []Tagger

quit chan struct{}
done sync.WaitGroup
rpt syncReport

spiedReports chan report.Report
shortcutReports chan report.Report
}

// Tagger tags nodes with value-add node metadata.
Expand All @@ -45,10 +51,11 @@ func New(spyInterval, publishInterval time.Duration, publisher xfer.Publisher) *
result := &Probe{
spyInterval: spyInterval,
publishInterval: publishInterval,
publisher: publisher,
publisher: xfer.NewReportPublisher(publisher),
quit: make(chan struct{}),
spiedReports: make(chan report.Report, reportBufferSize),
shortcutReports: make(chan report.Report, reportBufferSize),
}
result.rpt.swap(report.MakeReport())
return result
}

Expand Down Expand Up @@ -80,6 +87,12 @@ func (p *Probe) Stop() {
p.done.Wait()
}

// Publish will queue a report for immediate publication,
// bypassing the spy tick
func (p *Probe) Publish(rpt report.Report) {
p.shortcutReports <- rpt
}

func (p *Probe) spyLoop() {
defer p.done.Done()
spyTick := time.Tick(p.spyInterval)
Expand All @@ -94,10 +107,9 @@ func (p *Probe) spyLoop() {
}
}

localReport := p.rpt.copy()
localReport = localReport.Merge(p.report())
localReport = p.tag(localReport)
p.rpt.swap(localReport)
rpt := p.report()
rpt = p.tag(rpt)
p.spiedReports <- rpt

if took := time.Since(start); took > p.spyInterval {
log.Printf("report generation took too long (%s)", took)
Expand Down Expand Up @@ -140,20 +152,33 @@ func (p *Probe) tag(r report.Report) report.Report {
return r
}

func (p *Probe) drainAndPublish(rpt report.Report, rs chan report.Report) {
ForLoop:
for {
select {
case r := <-rs:
rpt = rpt.Merge(r)
default:
break ForLoop
}
}

if err := p.publisher.Publish(rpt); err != nil {
log.Printf("publish: %v", err)
}
}

func (p *Probe) publishLoop() {
defer p.done.Done()
var (
pubTick = time.Tick(p.publishInterval)
rptPub = xfer.NewReportPublisher(p.publisher)
)
pubTick := time.Tick(p.publishInterval)

for {
select {
case <-pubTick:
localReport := p.rpt.swap(report.MakeReport())
if err := rptPub.Publish(localReport); err != nil {
log.Printf("publish: %v", err)
}
p.drainAndPublish(report.MakeReport(), p.spiedReports)

case rpt := <-p.shortcutReports:
p.drainAndPublish(rpt, p.shortcutReports)

This comment was marked as abuse.

This comment was marked as abuse.

This comment was marked as abuse.


case <-p.quit:
return
Expand Down
26 changes: 0 additions & 26 deletions probe/sync_report.go

This file was deleted.

2 changes: 1 addition & 1 deletion prog/probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func main() {
if registry, err := docker.NewRegistry(*dockerInterval); err == nil {
defer registry.Stop()
p.AddTagger(docker.NewTagger(registry, processCache))
p.AddReporter(docker.NewReporter(registry, hostID))
p.AddReporter(docker.NewReporter(registry, hostID, p))
} else {
log.Printf("Docker: failed to start registry: %v", err)
}
Expand Down
4 changes: 4 additions & 0 deletions report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ type Report struct {
// such as in the app, we expect the component to overwrite the window
// before serving it to consumers.
Window time.Duration

// Shortcut reports should be propogated to the UI as quickly as possible,
// bypassing the usual spy interval, publish interval and app ws interval.
Shortcut bool
}

// MakeReport makes a clean report, ready to Merge() other reports into.
Expand Down