From 91efd86f2e4762bf26b97bf7f803df591e7a6ac7 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 8 Sep 2015 18:16:03 +0000 Subject: [PATCH] Only fetch weave status report once per tick. --- probe/main.go | 9 +++++++-- probe/overlay/weave.go | 39 ++++++++++++++++++--------------------- probe/process/walker.go | 2 +- probe/tag_report.go | 6 ++++++ 4 files changed, 32 insertions(+), 24 deletions(-) diff --git a/probe/main.go b/probe/main.go index e256905936..c4016bbcf2 100644 --- a/probe/main.go +++ b/probe/main.go @@ -114,6 +114,7 @@ func main() { var ( endpointReporter = endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack) processCache = process.NewCachingWalker(process.NewWalker(*procRoot)) + tickers = []Ticker{processCache} reporters = []Reporter{ endpointReporter, host.NewReporter(hostID, hostName, localNets), @@ -143,6 +144,7 @@ func main() { if err != nil { log.Fatalf("failed to start Weave tagger: %v", err) } + tickers = append(tickers, weave) taggers = append(taggers, weave) reporters = append(reporters, weave) } @@ -188,8 +190,11 @@ func main() { case <-spyTick: start := time.Now() - if err := processCache.Update(); err != nil { - log.Printf("error reading processes: %v", err) + + for _, ticker := range tickers { + if err := ticker.Tick(); err != nil { + log.Printf("error doing ticker: %v", err) + } } r = r.Merge(doReport(reporters)) diff --git a/probe/overlay/weave.go b/probe/overlay/weave.go index b9699cdd33..ea605bf25d 100644 --- a/probe/overlay/weave.go +++ b/probe/overlay/weave.go @@ -43,6 +43,7 @@ var ipMatch = regexp.MustCompile(`([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3 type Weave struct { url string hostID string + status weaveStatus } type weaveStatus struct { @@ -75,24 +76,30 @@ func NewWeave(hostID, weaveRouterAddress string) (*Weave, error) { }, nil } -func (w Weave) update() (weaveStatus, error) { +// Tick implements Ticker +func (w *Weave) Tick() error { var result weaveStatus req, err := http.NewRequest("GET", w.url, nil) if err != nil { - return result, err + return err } req.Header.Add("Accept", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { - return result, err + return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return result, fmt.Errorf("Weave Tagger: got %d", resp.StatusCode) + return fmt.Errorf("Weave Tagger: got %d", resp.StatusCode) + } + + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return err } - return result, json.NewDecoder(resp.Body).Decode(&result) + w.status = result + return nil } type psEntry struct { @@ -101,7 +108,7 @@ type psEntry struct { ips []string } -func (w Weave) ps() ([]psEntry, error) { +func (w *Weave) ps() ([]psEntry, error) { var result []psEntry cmd := exec.Command("weave", "--local", "ps") out, err := cmd.StdoutPipe() @@ -132,7 +139,7 @@ func (w Weave) ps() ([]psEntry, error) { return result, scanner.Err() } -func (w Weave) tagContainer(r report.Report, containerIDPrefix, macAddress string, ips []string) { +func (w *Weave) tagContainer(r report.Report, containerIDPrefix, macAddress string, ips []string) { for nodeid, nmd := range r.Container.Nodes { idPrefix := nmd.Metadata[docker.ContainerID][:12] if idPrefix != containerIDPrefix { @@ -149,13 +156,8 @@ func (w Weave) tagContainer(r report.Report, containerIDPrefix, macAddress strin } // Tag implements Tagger. -func (w Weave) Tag(r report.Report) (report.Report, error) { - status, err := w.update() - if err != nil { - return r, nil - } - - for _, entry := range status.DNS.Entries { +func (w *Weave) Tag(r report.Report) (report.Report, error) { + for _, entry := range w.status.DNS.Entries { if entry.Tombstone > 0 { continue } @@ -181,14 +183,9 @@ func (w Weave) Tag(r report.Report) (report.Report, error) { } // Report implements Reporter. -func (w Weave) Report() (report.Report, error) { +func (w *Weave) Report() (report.Report, error) { r := report.MakeReport() - status, err := w.update() - if err != nil { - return r, err - } - - for _, peer := range status.Router.Peers { + for _, peer := range w.status.Router.Peers { r.Overlay.Nodes[report.MakeOverlayNodeID(peer.Name)] = report.MakeNodeWith(map[string]string{ WeavePeerName: peer.Name, WeavePeerNickName: peer.NickName, diff --git a/probe/process/walker.go b/probe/process/walker.go index 4a88c45cd9..1f89a610b5 100644 --- a/probe/process/walker.go +++ b/probe/process/walker.go @@ -40,7 +40,7 @@ func (c *CachingWalker) Walk(f func(Process)) error { } // Update updates cached copy of process list -func (c *CachingWalker) Update() error { +func (c *CachingWalker) Tick() error { newCache := []Process{} err := c.source.Walk(func(p Process) { newCache = append(newCache, p) diff --git a/probe/tag_report.go b/probe/tag_report.go index 15024d77ae..b220e65800 100644 --- a/probe/tag_report.go +++ b/probe/tag_report.go @@ -16,6 +16,12 @@ type Reporter interface { Report() (report.Report, error) } +// Ticker 'ticks' every spyDuration - useful for updating +// cached state shared between Taggers and Reporters +type Ticker interface { + Tick() error +} + // Apply tags the report with all the taggers. func Apply(r report.Report, taggers []Tagger) report.Report { var err error