Skip to content

Commit

Permalink
Move probe main.go to prog/probe/, break out a probe struct with appr…
Browse files Browse the repository at this point in the history
…opriate responsibilities.
  • Loading branch information
tomwilkie committed Nov 9, 2015
1 parent 132b447 commit 761b83e
Show file tree
Hide file tree
Showing 12 changed files with 259 additions and 228 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ app/app
app/scope-app
probe/probe
probe/scope-probe
prog/probe/scope-probe
docker/scope-app
docker/scope-probe
docker/docker*
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
SUDO=sudo -E
DOCKERHUB_USER=weaveworks
APP_EXE=app/scope-app
PROBE_EXE=probe/scope-probe
PROBE_EXE=prog/probe/scope-probe
FIXPROBE_EXE=experimental/fixprobe/fixprobe
SCOPE_IMAGE=$(DOCKERHUB_USER)/scope
SCOPE_EXPORT=scope.tar
Expand Down
5 changes: 3 additions & 2 deletions probe/hostname.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package main
package probe

import "os"

func hostname() string {
// Hostname returns the hostname of this host.
func Hostname() string {
if hostname := os.Getenv("SCOPE_HOSTNAME"); hostname != "" {
return hostname
}
Expand Down
27 changes: 0 additions & 27 deletions probe/instrumentation.go

This file was deleted.

163 changes: 163 additions & 0 deletions probe/probe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package probe

import (
"log"
"sync"
"time"

"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)

// Probe sits there, generating and publishing reports.
type Probe struct {
spyInterval, publishInterval time.Duration
publisher xfer.Publisher

tickers []Ticker
reporters []Reporter
taggers []Tagger

quit chan struct{}
done sync.WaitGroup
rpt syncReport
}

// Tagger tags nodes with value-add node metadata.
type Tagger interface {
Tag(r report.Report) (report.Report, error)
}

// Reporter generates Reports.
type Reporter interface {
Report() (report.Report, error)
}

// Ticker is something which will be invoked every spyDuration.
// It's useful for things that should be updated on that interval.
// For example, cached shared state between Taggers and Reporters.
type Ticker interface {
Tick() error
}

// New makes a new Probe.
func New(spyInterval, publishInterval time.Duration, publisher xfer.Publisher) *Probe {
result := &Probe{
spyInterval: spyInterval,
publishInterval: publishInterval,
publisher: publisher,
quit: make(chan struct{}),
}
result.rpt.swap(report.MakeReport())
return result
}

// AddTagger adds a new Tagger to the Probe
func (p *Probe) AddTagger(ts ...Tagger) {
p.taggers = append(p.taggers, ts...)
}

// AddReporter adds a new Reported to the Probe
func (p *Probe) AddReporter(rs ...Reporter) {
p.reporters = append(p.reporters, rs...)
}

// AddTicker adds a new Ticker to the Probe
func (p *Probe) AddTicker(ts ...Ticker) {
p.tickers = append(p.tickers, ts...)
}

// Start starts the probe
func (p *Probe) Start() {
p.done.Add(2)
go p.spyLoop()
go p.publishLoop()
}

// Stop stops the probe
func (p *Probe) Stop() {
close(p.quit)
p.done.Wait()
}

func (p *Probe) spyLoop() {
defer p.done.Done()
spyTick := time.Tick(p.spyInterval)

for {
select {
case <-spyTick:
start := time.Now()
for _, ticker := range p.tickers {
if err := ticker.Tick(); err != nil {
log.Printf("error doing ticker: %v", err)
}
}

localReport := p.rpt.copy()
localReport = localReport.Merge(p.report())
localReport = p.tag(localReport)
p.rpt.swap(localReport)

if took := time.Since(start); took > p.spyInterval {
log.Printf("report generation took too long (%s)", took)
}

case <-p.quit:
return
}
}
}

func (p *Probe) report() report.Report {
reports := make(chan report.Report, len(p.reporters))
for _, rep := range p.reporters {
go func(rep Reporter) {
newReport, err := rep.Report()
if err != nil {
log.Printf("error generating report: %v", err)
newReport = report.MakeReport() // empty is OK to merge
}
reports <- newReport
}(rep)
}

result := report.MakeReport()
for i := 0; i < cap(reports); i++ {
result = result.Merge(<-reports)
}
return result
}

func (p *Probe) tag(r report.Report) report.Report {
var err error
for _, tagger := range p.taggers {
r, err = tagger.Tag(r)
if err != nil {
log.Printf("error applying tagger: %v", err)
}
}
return r
}

func (p *Probe) publishLoop() {
defer p.done.Done()
var (
pubTick = time.Tick(p.publishInterval)
rptPub = xfer.NewReportPublisher(p.publisher)
)

for {
select {
case <-pubTick:
localReport := p.rpt.swap(report.MakeReport())
localReport.Window = p.publishInterval
if err := rptPub.Publish(localReport); err != nil {
log.Printf("publish: %v", err)
}

case <-p.quit:
return
}
}
}
20 changes: 5 additions & 15 deletions probe/tag_report_test.go → probe/probe_internal_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package main
package probe

import (
"reflect"
"testing"

"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
)

func TestApply(t *testing.T) {
Expand All @@ -16,10 +15,13 @@ func TestApply(t *testing.T) {
addressNode = report.MakeNodeWith(map[string]string{"7": "8"})
)

p := New(0, 0, nil)
p.AddTagger(NewTopologyTagger())

r := report.MakeReport()
r.Endpoint.AddNode(endpointNodeID, endpointNode)
r.Address.AddNode(addressNodeID, addressNode)
r = Apply(r, []Tagger{newTopologyTagger()})
r = p.tag(r)

for _, tuple := range []struct {
want report.Node
Expand All @@ -34,15 +36,3 @@ func TestApply(t *testing.T) {
}
}
}

func TestTagMissingID(t *testing.T) {
const nodeID = "not-found"
r := report.MakeReport()
want := report.MakeNode()
rpt, _ := newTopologyTagger().Tag(r)
have := rpt.Endpoint.Nodes[nodeID].Copy()
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
t.Error("TopologyTagger erroneously tagged a missing node ID")
}
}
26 changes: 26 additions & 0 deletions probe/sync_report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package probe

import (
"sync"

"github.com/weaveworks/scope/report"
)

type syncReport struct {
mtx sync.RWMutex
rpt report.Report
}

func (r *syncReport) swap(other report.Report) report.Report {
r.mtx.Lock()
defer r.mtx.Unlock()
old := r.rpt
r.rpt = other
return old
}

func (r *syncReport) copy() report.Report {
r.mtx.RLock()
defer r.mtx.RUnlock()
return r.rpt.Copy()
}
35 changes: 2 additions & 33 deletions probe/tag_report.go → probe/topology_tagger.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,17 @@
package main
package probe

import (
"log"

"github.com/weaveworks/scope/report"
)

// Tagger tags nodes with value-add node metadata.
type Tagger interface {
Tag(r report.Report) (report.Report, error)
}

// Reporter generates Reports.
type Reporter interface {
Report() (report.Report, error)
}

// Ticker is something which will be invoked every spyDuration.
// It's useful for things that should be updated on that interval.
// For example, cached shared state between Taggers and Reporters.
type Ticker interface {
Tick() error
}

// Apply tags the report with all the taggers.
func Apply(r report.Report, taggers []Tagger) report.Report {
var err error
for _, tagger := range taggers {
r, err = tagger.Tag(r)
if err != nil {
log.Printf("error applying tagger: %v", err)
}
}
return r
}

// Topology is the Node key for the origin topology.
const Topology = "topology"

type topologyTagger struct{}

// NewTopologyTagger tags each node with the topology that it comes from. It's
// kind of a proof-of-concept tagger, useful primarily for debugging.
func newTopologyTagger() Tagger {
func NewTopologyTagger() Tagger {
return &topologyTagger{}
}

Expand Down
21 changes: 21 additions & 0 deletions probe/topology_tagger_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package probe

import (
"reflect"
"testing"

"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
)

func TestTagMissingID(t *testing.T) {
const nodeID = "not-found"
r := report.MakeReport()
want := report.MakeNode()
rpt, _ := NewTopologyTagger().Tag(r)
have := rpt.Endpoint.Nodes[nodeID].Copy()
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
t.Error("TopologyTagger erroneously tagged a missing node ID")
}
}
Loading

0 comments on commit 761b83e

Please sign in to comment.