From dd19cac6d541bd069106cd50fb7d9f25846ea454 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 27 Jan 2016 20:09:24 -0800 Subject: [PATCH] Run the Weave integrations regardless of if weave is detected. --- docker/entrypoint.sh | 1 - probe/overlay/weave.go | 156 ++++++++++++++++++++++-------------- probe/overlay/weave_test.go | 12 ++- prog/probe.go | 3 +- 4 files changed, 103 insertions(+), 69 deletions(-) diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index a58e0f5bf8..574a13956f 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -127,7 +127,6 @@ done if is_running $WEAVE_CONTAINER_NAME; then container_ip $WEAVE_CONTAINER_NAME - PROBE_ARGS="$PROBE_ARGS -weave.router.addr=$CONTAINER_IP" weave_expose DOCKER_BRIDGE_IP=$(docker_bridge_ip) diff --git a/probe/overlay/weave.go b/probe/overlay/weave.go index efc2fbf0c9..4f5ab7809e 100644 --- a/probe/overlay/weave.go +++ b/probe/overlay/weave.go @@ -31,6 +31,9 @@ const ( // WeaveMACAddress is the key for the mac address of the container on the // weave network, to be found in container node metadata WeaveMACAddress = "weave_mac_address" + + initialBackoff = 5 * time.Second + maxBackoff = 60 * time.Second ) var weavePsMatch = regexp.MustCompile(`^([0-9a-f]{12}) ((?:[0-9a-f][0-9a-f]\:){5}(?:[0-9a-f][0-9a-f]))(.*)$`) @@ -45,11 +48,12 @@ type Weave struct { url string hostID string - quit chan struct{} - done sync.WaitGroup - mtx sync.RWMutex - status weaveStatus - ps map[string]psEntry + quit chan struct{} + done sync.WaitGroup + mtx sync.RWMutex + + statusCache weaveStatus + psCache map[string]psEntry } type weaveStatus struct { @@ -73,10 +77,10 @@ type weaveStatus struct { // address. The address should be an IP or FQDN, no port. func NewWeave(hostID, weaveRouterAddress string) *Weave { w := &Weave{ - url: sanitize.URL("http://", 6784, "/report")(weaveRouterAddress), - hostID: hostID, - quit: make(chan struct{}), - ps: map[string]psEntry{}, + url: sanitize.URL("http://", 6784, "/report")(weaveRouterAddress), + hostID: hostID, + quit: make(chan struct{}), + psCache: map[string]psEntry{}, } w.done.Add(1) go w.loop() @@ -92,83 +96,51 @@ func (w *Weave) Stop() { w.done.Wait() } -func (w *Weave) loop() { - defer w.done.Done() - tick := time.Tick(5 * time.Second) +func (w *Weave) doWithBackoff(f func() error) { + backoff := initialBackoff for { - psEntries, err := w.getPSEntries() + err := f() if err != nil { - log.Printf("Error running weave ps: %v", err) - break - } - - psEntriesByPrefix := map[string]psEntry{} - for _, entry := range psEntries { - psEntriesByPrefix[entry.containerIDPrefix] = entry + log.Printf("Error gathering weave info: %s", err) + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } else { + backoff = initialBackoff } - w.mtx.Lock() - w.ps = psEntriesByPrefix - w.mtx.Unlock() - select { + case <-time.After(backoff): case <-w.quit: return - case <-tick: } } } -// Tick implements Ticker -func (w *Weave) Tick() error { - req, err := http.NewRequest("GET", w.url, nil) - if err != nil { - return err - } - req.Header.Add("Accept", "application/json") - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("Weave Tagger: got %d", resp.StatusCode) - } - - var result weaveStatus - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - return err - } - - w.mtx.Lock() - defer w.mtx.Unlock() - w.status = result - return nil -} - type psEntry struct { containerIDPrefix string macAddress string ips []string } -func (w *Weave) getPSEntries() ([]psEntry, error) { - var result []psEntry +func (w *Weave) ps() error { cmd := exec.Command("weave", "--local", "ps") out, err := cmd.StdoutPipe() if err != nil { - return result, err + return err } if err := cmd.Start(); err != nil { - return result, err + return err } defer func() { if err := cmd.Wait(); err != nil { log.Printf("Weave tagger, cmd failed: %v", err) } }() + + psEntriesByPrefix := map[string]psEntry{} scanner := bufio.NewScanner(out) for scanner.Scan() { line := scanner.Text() @@ -180,9 +152,69 @@ func (w *Weave) getPSEntries() ([]psEntry, error) { for _, ipGroup := range ipMatch.FindAllStringSubmatch(groups[3], -1) { ips = append(ips, ipGroup[1]) } - result = append(result, psEntry{containerIDPrefix, macAddress, ips}) + psEntriesByPrefix[containerIDPrefix] = psEntry{containerIDPrefix, macAddress, ips} } - return result, scanner.Err() + if err := scanner.Err(); err != nil { + return err + } + + w.mtx.Lock() + defer w.mtx.Unlock() + w.psCache = psEntriesByPrefix + return nil +} + +func (w *Weave) status() error { + req, err := http.NewRequest("GET", w.url, nil) + if err != nil { + return err + } + req.Header.Add("Accept", "application/json") + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("Got %d", resp.StatusCode) + } + + var result weaveStatus + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return err + } + + w.mtx.Lock() + defer w.mtx.Unlock() + w.statusCache = result + return nil +} + +func (w *Weave) loop() { + defer w.done.Done() + w.doWithBackoff(func() (err error) { + // If we fail to get info from weave + // we should wipe away stale data + defer func() { + if err != nil { + w.mtx.Lock() + defer w.mtx.Unlock() + w.statusCache = weaveStatus{} + w.psCache = map[string]psEntry{} + } + }() + + if err = w.ps(); err != nil { + return + } + + if err = w.status(); err != nil { + return + } + + return + }) } // Tag implements Tagger. @@ -191,7 +223,7 @@ func (w *Weave) Tag(r report.Report) (report.Report, error) { defer w.mtx.RUnlock() // Put information from weaveDNS on the container nodes - for _, entry := range w.status.DNS.Entries { + for _, entry := range w.statusCache.DNS.Entries { if entry.Tombstone > 0 { continue } @@ -212,7 +244,7 @@ func (w *Weave) Tag(r report.Report) (report.Report, error) { for id, node := range r.Container.Nodes { prefix, _ := node.Latest.Lookup(docker.ContainerID) prefix = prefix[:12] - entry, ok := w.ps[prefix] + entry, ok := w.psCache[prefix] if !ok { continue } @@ -235,7 +267,7 @@ func (w *Weave) Report() (report.Report, error) { defer w.mtx.RUnlock() r := report.MakeReport() - for _, peer := range w.status.Router.Peers { + for _, peer := range w.statusCache.Router.Peers { r.Overlay.AddNode(report.MakeOverlayNodeID(peer.Name), report.MakeNodeWith(map[string]string{ WeavePeerName: peer.Name, WeavePeerNickName: peer.NickName, diff --git a/probe/overlay/weave_test.go b/probe/overlay/weave_test.go index cc4d431da9..2a348f955f 100644 --- a/probe/overlay/weave_test.go +++ b/probe/overlay/weave_test.go @@ -5,20 +5,20 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/weaveworks/scope/common/exec" "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/probe/overlay" "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/test" testExec "github.com/weaveworks/scope/test/exec" ) func TestWeaveTaggerOverlayTopology(t *testing.T) { - wait := make(chan struct{}) oldExecCmd := exec.Command defer func() { exec.Command = oldExecCmd }() exec.Command = func(name string, args ...string) exec.Cmd { - close(wait) return testExec.NewMockCmdString(fmt.Sprintf("%s %s %s/24\n", mockContainerID, mockContainerMAC, mockContainerIP)) } @@ -27,8 +27,12 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) { w := overlay.NewWeave(mockHostID, s.URL) defer w.Stop() - w.Tick() - <-wait + + // Wait until the reporter reports some nodes + test.Poll(t, 300*time.Millisecond, 1, func() interface{} { + have, _ := w.Report() + return len(have.Overlay.Nodes) + }) { // Overlay node should include peer name and nickname diff --git a/prog/probe.go b/prog/probe.go index f8aa9ed1b7..ac43f68520 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -45,7 +45,7 @@ func probeMain() { kubernetesEnabled = flag.Bool("kubernetes", false, "collect kubernetes-related attributes for containers, should only be enabled on the master node") kubernetesAPI = flag.String("kubernetes.api", "", "Address of kubernetes master api") kubernetesInterval = flag.Duration("kubernetes.interval", 10*time.Second, "how often to do a full resync of the kubernetes data") - weaveRouterAddr = flag.String("weave.router.addr", "", "IP address or FQDN of the Weave router") + weaveRouterAddr = flag.String("weave.router.addr", "127.0.0.1:6784", "IP address & port of the Weave router") procRoot = flag.String("proc.root", "/proc", "location of the proc filesystem") useConntrack = flag.Bool("conntrack", true, "also use conntrack to track connections") insecure = flag.Bool("insecure", false, "(SSL) explicitly allow \"insecure\" SSL connections and transfers") @@ -151,7 +151,6 @@ func probeMain() { if *weaveRouterAddr != "" { weave := overlay.NewWeave(hostID, *weaveRouterAddr) defer weave.Stop() - p.AddTicker(weave) p.AddTagger(weave) p.AddReporter(weave) }