Skip to content

Commit

Permalink
Run the Weave integrations regardless of if weave is detected.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwilkie committed Feb 1, 2016
1 parent f64b561 commit dd19cac
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 69 deletions.
1 change: 0 additions & 1 deletion docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ done

if is_running $WEAVE_CONTAINER_NAME; then
container_ip $WEAVE_CONTAINER_NAME
PROBE_ARGS="$PROBE_ARGS -weave.router.addr=$CONTAINER_IP"
weave_expose

DOCKER_BRIDGE_IP=$(docker_bridge_ip)
Expand Down
156 changes: 94 additions & 62 deletions probe/overlay/weave.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ const (
// WeaveMACAddress is the key for the mac address of the container on the
// weave network, to be found in container node metadata
WeaveMACAddress = "weave_mac_address"

initialBackoff = 5 * time.Second
maxBackoff = 60 * time.Second
)

var weavePsMatch = regexp.MustCompile(`^([0-9a-f]{12}) ((?:[0-9a-f][0-9a-f]\:){5}(?:[0-9a-f][0-9a-f]))(.*)$`)
Expand All @@ -45,11 +48,12 @@ type Weave struct {
url string
hostID string

quit chan struct{}
done sync.WaitGroup
mtx sync.RWMutex
status weaveStatus
ps map[string]psEntry
quit chan struct{}
done sync.WaitGroup
mtx sync.RWMutex

statusCache weaveStatus
psCache map[string]psEntry
}

type weaveStatus struct {
Expand All @@ -73,10 +77,10 @@ type weaveStatus struct {
// address. The address should be an IP or FQDN, no port.
func NewWeave(hostID, weaveRouterAddress string) *Weave {
w := &Weave{
url: sanitize.URL("http://", 6784, "/report")(weaveRouterAddress),
hostID: hostID,
quit: make(chan struct{}),
ps: map[string]psEntry{},
url: sanitize.URL("http://", 6784, "/report")(weaveRouterAddress),
hostID: hostID,
quit: make(chan struct{}),
psCache: map[string]psEntry{},
}
w.done.Add(1)
go w.loop()
Expand All @@ -92,83 +96,51 @@ func (w *Weave) Stop() {
w.done.Wait()
}

func (w *Weave) loop() {
defer w.done.Done()
tick := time.Tick(5 * time.Second)
func (w *Weave) doWithBackoff(f func() error) {
backoff := initialBackoff

for {
psEntries, err := w.getPSEntries()
err := f()
if err != nil {
log.Printf("Error running weave ps: %v", err)
break
}

psEntriesByPrefix := map[string]psEntry{}
for _, entry := range psEntries {
psEntriesByPrefix[entry.containerIDPrefix] = entry
log.Printf("Error gathering weave info: %s", err)
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
} else {
backoff = initialBackoff
}

w.mtx.Lock()
w.ps = psEntriesByPrefix
w.mtx.Unlock()

select {
case <-time.After(backoff):
case <-w.quit:
return
case <-tick:
}
}
}

// Tick implements Ticker
func (w *Weave) Tick() error {
req, err := http.NewRequest("GET", w.url, nil)
if err != nil {
return err
}
req.Header.Add("Accept", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Weave Tagger: got %d", resp.StatusCode)
}

var result weaveStatus
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return err
}

w.mtx.Lock()
defer w.mtx.Unlock()
w.status = result
return nil
}

type psEntry struct {
containerIDPrefix string
macAddress string
ips []string
}

func (w *Weave) getPSEntries() ([]psEntry, error) {
var result []psEntry
func (w *Weave) ps() error {
cmd := exec.Command("weave", "--local", "ps")
out, err := cmd.StdoutPipe()
if err != nil {
return result, err
return err
}
if err := cmd.Start(); err != nil {
return result, err
return err
}
defer func() {
if err := cmd.Wait(); err != nil {
log.Printf("Weave tagger, cmd failed: %v", err)
}
}()

psEntriesByPrefix := map[string]psEntry{}
scanner := bufio.NewScanner(out)
for scanner.Scan() {
line := scanner.Text()
Expand All @@ -180,9 +152,69 @@ func (w *Weave) getPSEntries() ([]psEntry, error) {
for _, ipGroup := range ipMatch.FindAllStringSubmatch(groups[3], -1) {
ips = append(ips, ipGroup[1])
}
result = append(result, psEntry{containerIDPrefix, macAddress, ips})
psEntriesByPrefix[containerIDPrefix] = psEntry{containerIDPrefix, macAddress, ips}
}
return result, scanner.Err()
if err := scanner.Err(); err != nil {
return err
}

w.mtx.Lock()
defer w.mtx.Unlock()
w.psCache = psEntriesByPrefix
return nil
}

func (w *Weave) status() error {
req, err := http.NewRequest("GET", w.url, nil)
if err != nil {
return err
}
req.Header.Add("Accept", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Got %d", resp.StatusCode)
}

var result weaveStatus
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return err
}

w.mtx.Lock()
defer w.mtx.Unlock()
w.statusCache = result
return nil
}

func (w *Weave) loop() {
defer w.done.Done()
w.doWithBackoff(func() (err error) {
// If we fail to get info from weave
// we should wipe away stale data
defer func() {
if err != nil {
w.mtx.Lock()
defer w.mtx.Unlock()
w.statusCache = weaveStatus{}
w.psCache = map[string]psEntry{}
}
}()

if err = w.ps(); err != nil {
return
}

if err = w.status(); err != nil {
return
}

return
})
}

// Tag implements Tagger.
Expand All @@ -191,7 +223,7 @@ func (w *Weave) Tag(r report.Report) (report.Report, error) {
defer w.mtx.RUnlock()

// Put information from weaveDNS on the container nodes
for _, entry := range w.status.DNS.Entries {
for _, entry := range w.statusCache.DNS.Entries {
if entry.Tombstone > 0 {
continue
}
Expand All @@ -212,7 +244,7 @@ func (w *Weave) Tag(r report.Report) (report.Report, error) {
for id, node := range r.Container.Nodes {
prefix, _ := node.Latest.Lookup(docker.ContainerID)
prefix = prefix[:12]
entry, ok := w.ps[prefix]
entry, ok := w.psCache[prefix]
if !ok {
continue
}
Expand All @@ -235,7 +267,7 @@ func (w *Weave) Report() (report.Report, error) {
defer w.mtx.RUnlock()

r := report.MakeReport()
for _, peer := range w.status.Router.Peers {
for _, peer := range w.statusCache.Router.Peers {
r.Overlay.AddNode(report.MakeOverlayNodeID(peer.Name), report.MakeNodeWith(map[string]string{
WeavePeerName: peer.Name,
WeavePeerNickName: peer.NickName,
Expand Down
12 changes: 8 additions & 4 deletions probe/overlay/weave_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/weaveworks/scope/common/exec"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/overlay"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
testExec "github.com/weaveworks/scope/test/exec"
)

func TestWeaveTaggerOverlayTopology(t *testing.T) {
wait := make(chan struct{})
oldExecCmd := exec.Command
defer func() { exec.Command = oldExecCmd }()
exec.Command = func(name string, args ...string) exec.Cmd {
close(wait)
return testExec.NewMockCmdString(fmt.Sprintf("%s %s %s/24\n", mockContainerID, mockContainerMAC, mockContainerIP))
}

Expand All @@ -27,8 +27,12 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) {

w := overlay.NewWeave(mockHostID, s.URL)
defer w.Stop()
w.Tick()
<-wait

// Wait until the reporter reports some nodes
test.Poll(t, 300*time.Millisecond, 1, func() interface{} {
have, _ := w.Report()
return len(have.Overlay.Nodes)
})

{
// Overlay node should include peer name and nickname
Expand Down
3 changes: 1 addition & 2 deletions prog/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func probeMain() {
kubernetesEnabled = flag.Bool("kubernetes", false, "collect kubernetes-related attributes for containers, should only be enabled on the master node")
kubernetesAPI = flag.String("kubernetes.api", "", "Address of kubernetes master api")
kubernetesInterval = flag.Duration("kubernetes.interval", 10*time.Second, "how often to do a full resync of the kubernetes data")
weaveRouterAddr = flag.String("weave.router.addr", "", "IP address or FQDN of the Weave router")
weaveRouterAddr = flag.String("weave.router.addr", "127.0.0.1:6784", "IP address & port of the Weave router")
procRoot = flag.String("proc.root", "/proc", "location of the proc filesystem")
useConntrack = flag.Bool("conntrack", true, "also use conntrack to track connections")
insecure = flag.Bool("insecure", false, "(SSL) explicitly allow \"insecure\" SSL connections and transfers")
Expand Down Expand Up @@ -151,7 +151,6 @@ func probeMain() {
if *weaveRouterAddr != "" {
weave := overlay.NewWeave(hostID, *weaveRouterAddr)
defer weave.Stop()
p.AddTicker(weave)
p.AddTagger(weave)
p.AddReporter(weave)
}
Expand Down

0 comments on commit dd19cac

Please sign in to comment.