Skip to content

Commit

Permalink
Only fetch weave status report once per tick.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Wilkie committed Sep 8, 2015
1 parent 51bd5e0 commit 91efd86
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 24 deletions.
9 changes: 7 additions & 2 deletions probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func main() {
var (
endpointReporter = endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack)
processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
tickers = []Ticker{processCache}
reporters = []Reporter{
endpointReporter,
host.NewReporter(hostID, hostName, localNets),
Expand Down Expand Up @@ -143,6 +144,7 @@ func main() {
if err != nil {
log.Fatalf("failed to start Weave tagger: %v", err)
}
tickers = append(tickers, weave)
taggers = append(taggers, weave)
reporters = append(reporters, weave)
}
Expand Down Expand Up @@ -188,8 +190,11 @@ func main() {

case <-spyTick:
start := time.Now()
if err := processCache.Update(); err != nil {
log.Printf("error reading processes: %v", err)

for _, ticker := range tickers {
if err := ticker.Tick(); err != nil {
log.Printf("error doing ticker: %v", err)
}
}

r = r.Merge(doReport(reporters))
Expand Down
39 changes: 18 additions & 21 deletions probe/overlay/weave.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var ipMatch = regexp.MustCompile(`([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3
type Weave struct {
url string
hostID string
status weaveStatus
}

type weaveStatus struct {
Expand Down Expand Up @@ -75,24 +76,30 @@ func NewWeave(hostID, weaveRouterAddress string) (*Weave, error) {
}, nil
}

func (w Weave) update() (weaveStatus, error) {
// Tick implements Ticker
func (w *Weave) Tick() error {
var result weaveStatus
req, err := http.NewRequest("GET", w.url, nil)
if err != nil {
return result, err
return err
}
req.Header.Add("Accept", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return result, err
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return result, fmt.Errorf("Weave Tagger: got %d", resp.StatusCode)
return fmt.Errorf("Weave Tagger: got %d", resp.StatusCode)
}

if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return err
}

return result, json.NewDecoder(resp.Body).Decode(&result)
w.status = result
return nil
}

type psEntry struct {
Expand All @@ -101,7 +108,7 @@ type psEntry struct {
ips []string
}

func (w Weave) ps() ([]psEntry, error) {
func (w *Weave) ps() ([]psEntry, error) {
var result []psEntry
cmd := exec.Command("weave", "--local", "ps")
out, err := cmd.StdoutPipe()
Expand Down Expand Up @@ -132,7 +139,7 @@ func (w Weave) ps() ([]psEntry, error) {
return result, scanner.Err()
}

func (w Weave) tagContainer(r report.Report, containerIDPrefix, macAddress string, ips []string) {
func (w *Weave) tagContainer(r report.Report, containerIDPrefix, macAddress string, ips []string) {
for nodeid, nmd := range r.Container.Nodes {
idPrefix := nmd.Metadata[docker.ContainerID][:12]
if idPrefix != containerIDPrefix {
Expand All @@ -149,13 +156,8 @@ func (w Weave) tagContainer(r report.Report, containerIDPrefix, macAddress strin
}

// Tag implements Tagger.
func (w Weave) Tag(r report.Report) (report.Report, error) {
status, err := w.update()
if err != nil {
return r, nil
}

for _, entry := range status.DNS.Entries {
func (w *Weave) Tag(r report.Report) (report.Report, error) {
for _, entry := range w.status.DNS.Entries {
if entry.Tombstone > 0 {
continue
}
Expand All @@ -181,14 +183,9 @@ func (w Weave) Tag(r report.Report) (report.Report, error) {
}

// Report implements Reporter.
func (w Weave) Report() (report.Report, error) {
func (w *Weave) Report() (report.Report, error) {
r := report.MakeReport()
status, err := w.update()
if err != nil {
return r, err
}

for _, peer := range status.Router.Peers {
for _, peer := range w.status.Router.Peers {
r.Overlay.Nodes[report.MakeOverlayNodeID(peer.Name)] = report.MakeNodeWith(map[string]string{
WeavePeerName: peer.Name,
WeavePeerNickName: peer.NickName,
Expand Down
2 changes: 1 addition & 1 deletion probe/process/walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (c *CachingWalker) Walk(f func(Process)) error {
}

// Update updates cached copy of process list
func (c *CachingWalker) Update() error {
func (c *CachingWalker) Tick() error {
newCache := []Process{}
err := c.source.Walk(func(p Process) {
newCache = append(newCache, p)
Expand Down
6 changes: 6 additions & 0 deletions probe/tag_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ type Reporter interface {
Report() (report.Report, error)
}

// Ticker 'ticks' every spyDuration - useful for updating
// cached state shared between Taggers and Reporters
type Ticker interface {
Tick() error
}

// Apply tags the report with all the taggers.
func Apply(r report.Report, taggers []Tagger) report.Report {
var err error
Expand Down

0 comments on commit 91efd86

Please sign in to comment.