From 1f752196d76d31c6d1cf7bdb3d907e3444f1e6aa Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 27 Jan 2016 20:40:49 -0800 Subject: [PATCH] Have the app periodically register with weaveDNS, and the probe do lookups there. --- app/weave.go | 149 ++++++++++++++++++ app/weave_test.go | 103 +++++++++++++ common/backoff/backoff.go | 90 +++++++++++ common/exec/exec.go | 2 + common/weave/client.go | 171 +++++++++++++++++++++ common/weave/client_test.go | 152 ++++++++++++++++++ docker/entrypoint.sh | 89 +---------- probe/appclient/resolver.go | 45 ++++-- probe/appclient/resolver_internal_test.go | 6 +- probe/overlay/weave.go | 179 +++++----------------- probe/overlay/weave_test.go | 83 +++------- prog/app.go | 36 +++++ prog/probe.go | 67 ++++++-- test/exec/exec.go | 8 + test/weave/mock.go | 71 +++++++++ 15 files changed, 930 insertions(+), 321 deletions(-) create mode 100644 app/weave.go create mode 100644 app/weave_test.go create mode 100644 common/backoff/backoff.go create mode 100644 common/weave/client.go create mode 100644 common/weave/client_test.go create mode 100644 test/weave/mock.go diff --git a/app/weave.go b/app/weave.go new file mode 100644 index 0000000000..3ae1b40d5c --- /dev/null +++ b/app/weave.go @@ -0,0 +1,149 @@ +package app + +import ( + "fmt" + "net" + "strings" + + fsouza "github.com/fsouza/go-dockerclient" + + "github.com/weaveworks/scope/common/backoff" +) + +// Default values for weave app integration +const ( + DefaultHostname = "scope.weave.local." + DefaultWeaveURL = "http://127.0.0.1:6784" + DefaultContainerName = "weavescope" + DefaultDockerEndpoint = "unix:///var/run/docker.sock" +) + +// WeavePublisher is a thing which periodically registers this app with WeaveDNS. +type WeavePublisher struct { + containerName string + hostname string + dockerClient DockerClient + weaveClient WeaveClient + backoff backoff.Interface + interfaces InterfaceFunc +} + +// DockerClient is the little bit of the docker client we need. +type DockerClient interface { + ListContainers(fsouza.ListContainersOptions) ([]fsouza.APIContainers, error) +} + +// WeaveClient is the little bit of the weave clent we need. +type WeaveClient interface { + AddDNSEntry(hostname, containerid string, ip net.IP) error + Expose() error +} + +// Interface is because net.Interface isn't mockable. +type Interface struct { + Name string + Addrs []net.Addr +} + +// InterfaceFunc is the type of Interfaces() +type InterfaceFunc func() ([]Interface, error) + +// Interfaces returns the list of Interfaces on the machine. +func Interfaces() ([]Interface, error) { + ifaces, err := net.Interfaces() + if err != nil { + return nil, err + } + result := []Interface{} + for _, i := range ifaces { + addrs, err := i.Addrs() + if err != nil { + continue + } + result = append(result, Interface{ + Name: i.Name, + Addrs: addrs, + }) + } + return result, nil +} + +// NewWeavePublisher makes a new Weave. +func NewWeavePublisher(weaveClient WeaveClient, dockerClient DockerClient, interfaces InterfaceFunc, hostname, containerName string) *WeavePublisher { + w := &WeavePublisher{ + containerName: containerName, + hostname: hostname, + dockerClient: dockerClient, + weaveClient: weaveClient, + interfaces: interfaces, + } + w.backoff = backoff.New(w.updateDNS, "updating weaveDNS") + go w.backoff.Start() + return w +} + +// Stop the Weave. +func (w *WeavePublisher) Stop() { + w.backoff.Stop() +} + +func (w *WeavePublisher) updateDNS() (bool, error) { + // 0. expose this host + if err := w.weaveClient.Expose(); err != nil { + return false, err + } + + // 1. work out my IP addresses + ifaces, err := w.interfaces() + if err != nil { + return false, err + } + ips := []net.IP{} + for _, i := range ifaces { + if strings.HasPrefix(i.Name, "lo") || + strings.HasPrefix(i.Name, "docker") || + strings.HasPrefix(i.Name, "veth") { + continue + } + + for _, addr := range i.Addrs { + var ip net.IP + switch v := addr.(type) { + case *net.IPAddr: + ip = v.IP + case *net.IPNet: + ip = v.IP + } + if ip != nil && ip.To4() != nil { + ips = append(ips, ip) + } + } + } + + // 2. work out my container name + containers, err := w.dockerClient.ListContainers(fsouza.ListContainersOptions{}) + if err != nil { + return false, err + } + containerID := "" +outer: + for _, container := range containers { + for _, name := range container.Names { + if name == "/"+w.containerName { + containerID = container.ID + break outer + } + } + } + if containerID == "" { + return false, fmt.Errorf("Container %s not found", w.containerName) + } + + // 3. Register these with weave dns + for _, ip := range ips { + if err := w.weaveClient.AddDNSEntry(w.hostname, containerID, ip); err != nil { + return false, err + } + } + return false, nil +} diff --git a/app/weave_test.go b/app/weave_test.go new file mode 100644 index 0000000000..2d9e88a2af --- /dev/null +++ b/app/weave_test.go @@ -0,0 +1,103 @@ +package app_test + +import ( + "net" + "sync" + "testing" + "time" + + fsouza "github.com/fsouza/go-dockerclient" + + "github.com/weaveworks/scope/app" + "github.com/weaveworks/scope/test" +) + +type mockDockerClient struct{} + +func (mockDockerClient) ListContainers(fsouza.ListContainersOptions) ([]fsouza.APIContainers, error) { + return []fsouza.APIContainers{ + { + Names: []string{"/" + containerName}, + ID: containerID, + }, + { + Names: []string{"/notme"}, + ID: "1234abcd", + }, + }, nil +} + +type entry struct { + containerid string + ip net.IP +} + +type mockWeaveClient struct { + sync.Mutex + published map[string]entry +} + +func (m *mockWeaveClient) AddDNSEntry(hostname, containerid string, ip net.IP) error { + m.Lock() + defer m.Unlock() + m.published[hostname] = entry{containerid, ip} + return nil +} + +func (m *mockWeaveClient) Expose() error { + return nil +} + +const ( + hostname = "foo.weave" + containerName = "bar" + containerID = "a1b2c3d4" +) + +var ( + ip = net.ParseIP("1.2.3.4") +) + +func TestWeave(t *testing.T) { + weaveClient := &mockWeaveClient{ + published: map[string]entry{}, + } + dockerClient := mockDockerClient{} + interfaces := func() ([]app.Interface, error) { + return []app.Interface{ + { + Name: "eth0", + Addrs: []net.Addr{ + &net.IPAddr{ + IP: ip, + }, + }, + }, + { + Name: "docker0", + Addrs: []net.Addr{ + &net.IPAddr{ + IP: net.ParseIP("4.3.2.1"), + }, + }, + }, + }, nil + } + publisher := app.NewWeavePublisher( + weaveClient, dockerClient, interfaces, + hostname, containerName) + defer publisher.Stop() + + want := map[string]entry{ + hostname: {containerID, ip}, + } + test.Poll(t, 100*time.Millisecond, want, func() interface{} { + weaveClient.Lock() + defer weaveClient.Unlock() + result := map[string]entry{} + for k, v := range weaveClient.published { + result[k] = v + } + return result + }) +} diff --git a/common/backoff/backoff.go b/common/backoff/backoff.go new file mode 100644 index 0000000000..6349147a63 --- /dev/null +++ b/common/backoff/backoff.go @@ -0,0 +1,90 @@ +package backoff + +import ( + "log" + "time" +) + +type backoff struct { + f func() (bool, error) + quit, done chan struct{} + msg string + initialBackoff, maxBackoff time.Duration +} + +// Interface does f in a loop, sleeping for initialBackoff between +// each iterations. If it hits an error, it exponentially backs +// off to maxBackoff. Backoff will log when it backs off, but +// will stop logging when it reaches maxBackoff. It will also +// log on first success. +type Interface interface { + Start() + Stop() + SetInitialBackoff(time.Duration) +} + +// New makes a new Interface +func New(f func() (bool, error), msg string) Interface { + return &backoff{ + f: f, + quit: make(chan struct{}), + done: make(chan struct{}), + msg: msg, + initialBackoff: 10 * time.Second, + maxBackoff: 60 * time.Second, + } +} + +func (b *backoff) SetInitialBackoff(d time.Duration) { + b.initialBackoff = d +} + +// Stop the backoff, and waits for it to stop. +func (b *backoff) Stop() { + close(b.quit) + <-b.done +} + +// Start the backoff. Can only be called once. +func (b *backoff) Start() { + defer close(b.done) + backoff := b.initialBackoff + shouldLog := true + + for { + done, err := b.f() + if done { + return + } + + if err != nil { + backoff *= 2 + if backoff > b.maxBackoff { + backoff = b.maxBackoff + } + } else if backoff > b.initialBackoff { + backoff = b.initialBackoff + shouldLog = true + } + + if shouldLog { + if err != nil { + log.Printf("Error %s, backing off %s: %s", + b.msg, backoff, err) + } else { + log.Printf("Success %s", b.msg) + } + } + + if backoff >= b.maxBackoff || err == nil { + shouldLog = false + } + + select { + case <-time.After(backoff): + case <-b.quit: + return + } + } + +} diff --git a/common/exec/exec.go b/common/exec/exec.go index 6514578626..fc0b27ad1a 100644 --- a/common/exec/exec.go +++ b/common/exec/exec.go @@ -12,6 +12,8 @@ type Cmd interface { Start() error Wait() error Kill() error + Output() ([]byte, error) + Run() error } // Command is a hook for mocking diff --git a/common/weave/client.go b/common/weave/client.go new file mode 100644 index 0000000000..fd735f06ca --- /dev/null +++ b/common/weave/client.go @@ -0,0 +1,171 @@ +package weave + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "log" + "net" + "net/http" + "net/url" + "regexp" + "strconv" + + "github.com/weaveworks/scope/common/exec" +) + +// Client for Weave Net API +type Client interface { + Status() (Status, error) + AddDNSEntry(fqdn, containerid string, ip net.IP) error + PS() (map[string]PSEntry, error) // on the interface for mocking + Expose() error // on the interface for mocking +} + +// Status describes whats happen in the Weave Net router. +type Status struct { + Router Router + DNS DNS +} + +// Router describes the status of the Weave Router +type Router struct { + Peers []struct { + Name string + NickName string + } +} + +// DNS descirbes the status of Weave DNS +type DNS struct { + Entries []struct { + Hostname string + ContainerID string + Tombstone int64 + } +} + +var weavePsMatch = regexp.MustCompile(`^([0-9a-f]{12}) ((?:[0-9a-f][0-9a-f]\:){5}(?:[0-9a-f][0-9a-f]))(.*)$`) +var ipMatch = regexp.MustCompile(`([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})(/[0-9]+)`) + +// PSEntry is a row from the output of `weave ps` +type PSEntry struct { + ContainerIDPrefix string + MACAddress string + IPs []string +} + +type client struct { + url string +} + +// NewClient makes a new Client +func NewClient(url string) Client { + return &client{ + url: url, + } +} + +func (c *client) Status() (Status, error) { + req, err := http.NewRequest("GET", c.url+"/report", nil) + if err != nil { + return Status{}, err + } + req.Header.Add("Accept", "application/json") + resp, err := http.DefaultClient.Do(req) + if err != nil { + return Status{}, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return Status{}, fmt.Errorf("Got %d", resp.StatusCode) + } + + var status Status + if err := json.NewDecoder(resp.Body).Decode(&status); err != nil { + return Status{}, err + } + return status, nil +} + +func (c *client) AddDNSEntry(fqdn, containerID string, ip net.IP) error { + data := url.Values{ + "fqdn": []string{fqdn}, + } + url := fmt.Sprintf("%s/name/%s/%s", c.url, containerID, ip.String()) + req, err := http.NewRequest("PUT", url, bytes.NewBufferString(data.Encode())) + if err != nil { + return err + } + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode()))) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + if err := resp.Body.Close(); err != nil { + return err + } + if resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("Got %d", resp.StatusCode) + } + return nil +} + +func (c *client) PS() (map[string]PSEntry, error) { + cmd := exec.Command("weave", "--local", "ps") + out, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + if err := cmd.Start(); err != nil { + return nil, err + } + defer func() { + if err := cmd.Wait(); err != nil { + log.Printf("'weave ps' cmd failed: %v", err) + } + }() + + psEntriesByPrefix := map[string]PSEntry{} + scanner := bufio.NewScanner(out) + for scanner.Scan() { + line := scanner.Text() + groups := weavePsMatch.FindStringSubmatch(line) + if len(groups) == 0 { + continue + } + containerIDPrefix, macAddress, ips := groups[1], groups[2], []string{} + for _, ipGroup := range ipMatch.FindAllStringSubmatch(groups[3], -1) { + ips = append(ips, ipGroup[1]) + } + psEntriesByPrefix[containerIDPrefix] = PSEntry{ + ContainerIDPrefix: containerIDPrefix, + MACAddress: macAddress, + IPs: ips, + } + } + if err := scanner.Err(); err != nil { + return nil, err + } + + return psEntriesByPrefix, nil +} + +func (c *client) Expose() error { + output, err := exec.Command("weave", "--local", "ps", "weave:expose").Output() + if err != nil { + return err + } + ips := ipMatch.FindAllSubmatch(output, -1) + if ips != nil { + // Alread exposed! + return nil + } + if err := exec.Command("weave", "expose").Run(); err != nil { + return fmt.Errorf("Error running weave expose: %v", err) + } + return nil +} diff --git a/common/weave/client_test.go b/common/weave/client_test.go new file mode 100644 index 0000000000..01128e969b --- /dev/null +++ b/common/weave/client_test.go @@ -0,0 +1,152 @@ +package weave_test + +import ( + "fmt" + "net" + "net/http" + "net/http/httptest" + "reflect" + "strings" + "sync" + "testing" + + "github.com/weaveworks/scope/common/exec" + "github.com/weaveworks/scope/common/weave" + "github.com/weaveworks/scope/test" + testExec "github.com/weaveworks/scope/test/exec" +) + +const ( + mockHostID = "host1" + mockWeavePeerName = "winnebago" + mockWeavePeerNickName = "winny" + mockContainerID = "83183a667c01" + mockContainerMAC = "d6:f2:5a:12:36:a8" + mockContainerIP = "10.0.0.123" + mockContainerIPWithScope = ";10.0.0.123" + mockHostname = "hostname.weave.local" +) + +var ( + mockResponse = fmt.Sprintf(`{ + "Router": { + "Peers": [{ + "Name": "%s", + "Nickname": "%s" + }] + }, + "DNS": { + "Entries": [{ + "ContainerID": "%s", + "Hostname": "%s.", + "Tombstone": 0 + }] + } + }`, mockWeavePeerName, mockWeavePeerNickName, mockContainerID, mockHostname) + mockIP = net.ParseIP("1.2.3.4") +) + +func mockWeaveRouter(w http.ResponseWriter, r *http.Request) { + if _, err := w.Write([]byte(mockResponse)); err != nil { + panic(err) + } +} + +func TestStatus(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(mockWeaveRouter)) + defer s.Close() + + client := weave.NewClient(s.URL) + status, err := client.Status() + if err != nil { + t.Fatal(err) + } + + want := weave.Status{ + Router: weave.Router{ + Peers: []struct { + Name string + NickName string + }{ + { + Name: mockWeavePeerName, + NickName: mockWeavePeerNickName, + }, + }, + }, + DNS: weave.DNS{ + Entries: []struct { + Hostname string + ContainerID string + Tombstone int64 + }{ + { + Hostname: mockHostname + ".", + ContainerID: mockContainerID, + Tombstone: 0, + }, + }, + }, + } + if !reflect.DeepEqual(status, want) { + t.Fatal(test.Diff(status, want)) + } +} + +type entry struct { + containerid string + ip net.IP +} + +func TestDNSAdd(t *testing.T) { + mtx := sync.Mutex{} + published := map[string]entry{} + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mtx.Lock() + defer mtx.Unlock() + parts := strings.SplitN(r.URL.Path, "/", 4) + containerID, ip := parts[2], net.ParseIP(parts[3]) + fqdn := r.FormValue("fqdn") + published[fqdn] = entry{containerID, ip} + w.WriteHeader(http.StatusNoContent) + })) + defer s.Close() + + client := weave.NewClient(s.URL) + err := client.AddDNSEntry(mockHostname, mockContainerID, mockIP) + if err != nil { + t.Fatal(err) + } + + want := map[string]entry{ + mockHostname: {mockContainerID, mockIP}, + } + if !reflect.DeepEqual(published, want) { + t.Fatal(test.Diff(published, want)) + } +} + +func TestPS(t *testing.T) { + oldExecCmd := exec.Command + defer func() { exec.Command = oldExecCmd }() + exec.Command = func(name string, args ...string) exec.Cmd { + return testExec.NewMockCmdString(fmt.Sprintf("%s %s %s/24\n", mockContainerID, mockContainerMAC, mockContainerIP)) + } + + client := weave.NewClient("") + entries, err := client.PS() + if err != nil { + t.Fatal(err) + } + + want := map[string]weave.PSEntry{ + mockContainerID: { + ContainerIDPrefix: mockContainerID, + MACAddress: mockContainerMAC, + IPs: []string{mockContainerIP}, + }, + } + if !reflect.DeepEqual(entries, want) { + t.Fatal(test.Diff(entries, want)) + } +} diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 574a13956f..db058bd211 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -5,65 +5,6 @@ usage() { exit 1 } -# This script exists to modify the network settings in the scope containers -# as docker doesn't allow it when started with --net=host - -WEAVE_CONTAINER_NAME=weave -DOCKER_BRIDGE=docker0 -HOSTNAME=scope -DOMAIN=weave.local -IP_REGEXP="[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}" - -container_ip() { - if ! status=$(docker inspect --format='{{.State.Running}} {{.HostConfig.NetworkMode}}' $1 2>/dev/null); then - echo "Container $1 not found" >&2 - return 1 - fi - case "$status" in - "true host") - CONTAINER_IP="127.0.0.1" - return 0 - ;; - "true default" | "true bridge") - CONTAINER_IP="$(docker inspect --format='{{.NetworkSettings.IPAddress}}' $1 2>/dev/null)" - return 0 - ;; - *) - echo "Container $1 not running" >&2 - return 1 - ;; - esac -} - -is_running() { - status=$(docker inspect --format='{{.State.Running}}' $1 2>/dev/null) && [ "$status" = "true" ] - return $? -} - -docker_bridge_ip() { - local DOCKER_BRIDGE_IP=$(ip -f inet address show dev $DOCKER_BRIDGE | grep -m1 -o 'inet \([.0-9]\)*') - echo ${DOCKER_BRIDGE_IP#inet } -} - -# Run `weave` in the weave exec container -weave() { - WEAVEXEC_IMAGE=$(docker inspect --format='{{.Config.Image}}' weave | sed 's/\/weave/\/weaveexec/') - docker run -t --rm --privileged --net=host \ - -v /var/run/docker.sock:/var/run/docker.sock \ - -v /proc:/hostproc \ - -e PROCFS=/hostproc \ - $WEAVEXEC_IMAGE --local "$@" -} - -# Run `weave expose` if it's not already exposed. -weave_expose() { - status=$(weave ps weave:expose | awk '{print $3}' 2>/dev/null) - if [ "$status" = "" ]; then - echo "Exposing host to weave network." - weave expose - fi -} - mkdir -p /etc/weave APP_ARGS="" PROBE_ARGS="" @@ -112,10 +53,10 @@ while true; do TOKEN_PROVIDED=true touch /etc/service/app/down ;; - --no-app) + --no-app|--app-only) touch /etc/service/app/down ;; - --no-probe) + --no-probe|--probe-only) touch /etc/service/probe/down ;; *) @@ -125,30 +66,6 @@ while true; do shift done -if is_running $WEAVE_CONTAINER_NAME; then - container_ip $WEAVE_CONTAINER_NAME - weave_expose - - DOCKER_BRIDGE_IP=$(docker_bridge_ip) - echo "Weave container detected at $CONTAINER_IP, Docker bridge at $DOCKER_BRIDGE_IP" - - echo "domain $DOMAIN" >/etc/resolv.conf - echo "search $DOMAIN" >>/etc/resolv.conf - echo "nameserver $DOCKER_BRIDGE_IP" >>/etc/resolv.conf - - IP_ADDRS=$(find /sys/class/net -type l | xargs -n1 basename | grep -vE 'docker|veth|lo' | \ - xargs -n1 ip addr show | grep inet | awk '{ print $2 }' | grep -oE "$IP_REGEXP") - CONTAINER=$(docker inspect --format='{{.Id}}' weavescope) - if [ -z "$IP_ADDRS" ]; then - echo "Could not determine local IP address; Weave DNS integration will not work correctly." - exit 1 - else - for ip in $IP_ADDRS; do - weave dns-add $ip $CONTAINER -h $HOSTNAME.$DOMAIN - done - fi -fi - echo "$APP_ARGS" >/etc/weave/scope-app.args echo "$PROBE_ARGS" >/etc/weave/scope-probe.args @@ -166,6 +83,4 @@ fi echo "$MANUAL_APPS" >>/etc/weave/apps - exec /home/weave/runsvinit - diff --git a/probe/appclient/resolver.go b/probe/appclient/resolver.go index f37909adae..c707bf8975 100644 --- a/probe/appclient/resolver.go +++ b/probe/appclient/resolver.go @@ -7,6 +7,8 @@ import ( "strings" "time" + "github.com/miekg/dns" + "github.com/weaveworks/scope/common/xfer" ) @@ -15,8 +17,7 @@ const ( ) var ( - tick = time.Tick - lookupIP = net.LookupIP + tick = time.Tick ) type setter func(string, []string) @@ -30,25 +31,49 @@ type staticResolver struct { setters []setter targets []target quit chan struct{} + lookup LookupIP } +// LookupIP type is used for looking up IPs. +type LookupIP func(host string) (ips []net.IP, err error) + type target struct{ host, port string } func (t target) String() string { return net.JoinHostPort(t.host, t.port) } -// NewStaticResolver periodically resolves the targets, and calls the set +// NewResolver periodically resolves the targets, and calls the set // function with all the resolved IPs. It explictiy supports targets which -// resolve to multiple IPs. -func NewStaticResolver(targets []string, setters ...setter) Resolver { +// resolve to multiple IPs. It uses the supplied DNS server name. +func NewResolver(targets []string, lookup LookupIP, setters ...setter) Resolver { r := staticResolver{ targets: prepare(targets), setters: setters, quit: make(chan struct{}), + lookup: lookup, } go r.loop() return r } +// LookupUsing produces a LookupIP function for the given DNS server. +func LookupUsing(dnsServer string) func(host string) (ips []net.IP, err error) { + return func(host string) (ips []net.IP, err error) { + m := &dns.Msg{} + m.SetQuestion(dns.Fqdn(host), dns.TypeA) + in, err := dns.Exchange(m, dnsServer) + if err != nil { + return nil, err + } + result := []net.IP{} + for _, answer := range in.Answer { + if a, ok := answer.(*dns.A); ok { + result = append(result, a.A) + } + } + return result, nil + } +} + func (r staticResolver) loop() { r.resolve() t := tick(dnsPollInterval) @@ -86,28 +111,28 @@ func prepare(strs []string) []target { } func (r staticResolver) resolve() { - for t, endpoints := range resolveMany(r.targets) { + for t, endpoints := range r.resolveMany(r.targets) { for _, setter := range r.setters { setter(t.String(), endpoints) } } } -func resolveMany(targets []target) map[target][]string { +func (r staticResolver) resolveMany(targets []target) map[target][]string { result := map[target][]string{} for _, t := range targets { - result[t] = resolveOne(t) + result[t] = r.resolveOne(t) } return result } -func resolveOne(t target) []string { +func (r staticResolver) resolveOne(t target) []string { var addrs []net.IP if addr := net.ParseIP(t.host); addr != nil { addrs = []net.IP{addr} } else { var err error - addrs, err = lookupIP(t.host) + addrs, err = r.lookup(t.host) if err != nil { return []string{} } diff --git a/probe/appclient/resolver_internal_test.go b/probe/appclient/resolver_internal_test.go index f86abfd00a..a3b2397379 100644 --- a/probe/appclient/resolver_internal_test.go +++ b/probe/appclient/resolver_internal_test.go @@ -17,11 +17,9 @@ func TestResolver(t *testing.T) { c := make(chan time.Time) tick = func(_ time.Duration) <-chan time.Time { return c } - oldLookupIP := lookupIP - defer func() { lookupIP = oldLookupIP }() ipsLock := sync.Mutex{} ips := map[string][]net.IP{} - lookupIP = func(host string) ([]net.IP, error) { + lookupIP := func(host string) ([]net.IP, error) { ipsLock.Lock() defer ipsLock.Unlock() addrs, ok := ips[host] @@ -46,7 +44,7 @@ func TestResolver(t *testing.T) { } } - r := NewStaticResolver([]string{"symbolic.name" + port, "namewithnoport", ip1 + port, ip2}, set) + r := NewResolver([]string{"symbolic.name" + port, "namewithnoport", ip1 + port, ip2}, lookupIP, set) assertAdd := func(want ...string) { remaining := map[string]struct{}{} diff --git a/probe/overlay/weave.go b/probe/overlay/weave.go index c366c55579..f908bd535f 100644 --- a/probe/overlay/weave.go +++ b/probe/overlay/weave.go @@ -1,18 +1,12 @@ package overlay import ( - "bufio" - "encoding/json" - "fmt" - "log" - "net/http" - "regexp" "strings" "sync" "time" - "github.com/weaveworks/scope/common/exec" - "github.com/weaveworks/scope/common/sanitize" + "github.com/weaveworks/scope/common/backoff" + "github.com/weaveworks/scope/common/weave" "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/report" ) @@ -31,59 +25,36 @@ const ( // WeaveMACAddress is the key for the mac address of the container on the // weave network, to be found in container node metadata WeaveMACAddress = "weave_mac_address" - - initialBackoff = 5 * time.Second - maxBackoff = 60 * time.Second ) -var weavePsMatch = regexp.MustCompile(`^([0-9a-f]{12}) ((?:[0-9a-f][0-9a-f]\:){5}(?:[0-9a-f][0-9a-f]))(.*)$`) -var ipMatch = regexp.MustCompile(`([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})(/[0-9]+)`) - // Weave represents a single Weave router, presumably on the same host // as the probe. It is both a Reporter and a Tagger: it produces an Overlay // topology, and (in theory) can tag existing topologies with foreign keys to // overlay -- though I'm not sure what that would look like in practice right // now. type Weave struct { - url string + client weave.Client hostID string - quit chan struct{} - done sync.WaitGroup - mtx sync.RWMutex - - statusCache weaveStatus - psCache map[string]psEntry -} - -type weaveStatus struct { - Router struct { - Peers []struct { - Name string - NickName string - } - } + mtx sync.RWMutex + statusCache weave.Status + psCache map[string]weave.PSEntry - DNS struct { - Entries []struct { - Hostname string - ContainerID string - Tombstone int64 - } - } + backoff backoff.Interface } // NewWeave returns a new Weave tagger based on the Weave router at // address. The address should be an IP or FQDN, no port. -func NewWeave(hostID, weaveRouterAddress string) *Weave { +func NewWeave(hostID string, client weave.Client) *Weave { w := &Weave{ - url: sanitize.URL("http://", 6784, "/report")(weaveRouterAddress), + client: client, hostID: hostID, - quit: make(chan struct{}), - psCache: map[string]psEntry{}, + psCache: map[string]weave.PSEntry{}, } - w.done.Add(1) - go w.loop() + + w.backoff = backoff.New(w.collect, "collecting weave info") + w.backoff.SetInitialBackoff(5 * time.Second) + go w.backoff.Start() return w } @@ -92,71 +63,36 @@ func (*Weave) Name() string { return "Weave" } // Stop gathering weave ps output. func (w *Weave) Stop() { - close(w.quit) - w.done.Wait() + w.backoff.Stop() } -func (w *Weave) doWithBackoff(f func() error) { - backoff := initialBackoff - - for { - err := f() +func (w *Weave) collect() (done bool, err error) { + // If we fail to get info from weave + // we should wipe away stale data + defer func() { if err != nil { - log.Printf("Error gathering weave info: %s", err) - backoff *= 2 - if backoff > maxBackoff { - backoff = maxBackoff - } - } else { - backoff = initialBackoff + w.mtx.Lock() + defer w.mtx.Unlock() + w.statusCache = weave.Status{} + w.psCache = map[string]weave.PSEntry{} } + }() - select { - case <-time.After(backoff): - case <-w.quit: - return - } + if err = w.ps(); err != nil { + return + } + if err = w.status(); err != nil { + return } -} -type psEntry struct { - containerIDPrefix string - macAddress string - ips []string + return } func (w *Weave) ps() error { - cmd := exec.Command("weave", "--local", "ps") - out, err := cmd.StdoutPipe() + psEntriesByPrefix, err := w.client.PS() if err != nil { return err } - if err := cmd.Start(); err != nil { - return err - } - defer func() { - if err := cmd.Wait(); err != nil { - log.Printf("Weave tagger, cmd failed: %v", err) - } - }() - - psEntriesByPrefix := map[string]psEntry{} - scanner := bufio.NewScanner(out) - for scanner.Scan() { - line := scanner.Text() - groups := weavePsMatch.FindStringSubmatch(line) - if len(groups) == 0 { - continue - } - containerIDPrefix, macAddress, ips := groups[1], groups[2], []string{} - for _, ipGroup := range ipMatch.FindAllStringSubmatch(groups[3], -1) { - ips = append(ips, ipGroup[1]) - } - psEntriesByPrefix[containerIDPrefix] = psEntry{containerIDPrefix, macAddress, ips} - } - if err := scanner.Err(); err != nil { - return err - } w.mtx.Lock() defer w.mtx.Unlock() @@ -165,58 +101,17 @@ func (w *Weave) ps() error { } func (w *Weave) status() error { - req, err := http.NewRequest("GET", w.url, nil) - if err != nil { - return err - } - req.Header.Add("Accept", "application/json") - resp, err := http.DefaultClient.Do(req) + status, err := w.client.Status() if err != nil { return err } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("Got %d", resp.StatusCode) - } - - var result weaveStatus - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - return err - } w.mtx.Lock() defer w.mtx.Unlock() - w.statusCache = result + w.statusCache = status return nil } -func (w *Weave) loop() { - defer w.done.Done() - w.doWithBackoff(func() (err error) { - // If we fail to get info from weave - // we should wipe away stale data - defer func() { - if err != nil { - w.mtx.Lock() - defer w.mtx.Unlock() - w.statusCache = weaveStatus{} - w.psCache = map[string]psEntry{} - } - }() - - if err = w.ps(); err != nil { - return - } - - if err = w.status(); err != nil { - return - } - - return - }) -} - // Tag implements Tagger. func (w *Weave) Tag(r report.Report) (report.Report, error) { w.mtx.RLock() @@ -248,12 +143,14 @@ func (w *Weave) Tag(r report.Report) (report.Report, error) { } ipsWithScope := report.MakeStringSet() - for _, ip := range entry.ips { + for _, ip := range entry.IPs { ipsWithScope = ipsWithScope.Add(report.MakeAddressNodeID("", ip)) } - node = node.WithSet(docker.ContainerIPs, report.MakeStringSet(entry.ips...)) + node = node.WithSet(docker.ContainerIPs, report.MakeStringSet(entry.IPs...)) node = node.WithSet(docker.ContainerIPsWithScopes, ipsWithScope) - node = node.WithLatests(map[string]string{WeaveMACAddress: entry.macAddress}) + node = node.WithLatests(map[string]string{ + WeaveMACAddress: entry.MACAddress, + }) r.Container.Nodes[id] = node } return r, nil diff --git a/probe/overlay/weave_test.go b/probe/overlay/weave_test.go index 2a348f955f..6b28c40b25 100644 --- a/probe/overlay/weave_test.go +++ b/probe/overlay/weave_test.go @@ -1,31 +1,23 @@ package overlay_test import ( - "fmt" - "net/http" - "net/http/httptest" "testing" "time" - "github.com/weaveworks/scope/common/exec" "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/probe/overlay" "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test" - testExec "github.com/weaveworks/scope/test/exec" + "github.com/weaveworks/scope/test/weave" ) -func TestWeaveTaggerOverlayTopology(t *testing.T) { - oldExecCmd := exec.Command - defer func() { exec.Command = oldExecCmd }() - exec.Command = func(name string, args ...string) exec.Cmd { - return testExec.NewMockCmdString(fmt.Sprintf("%s %s %s/24\n", mockContainerID, mockContainerMAC, mockContainerIP)) - } - - s := httptest.NewServer(http.HandlerFunc(mockWeaveRouter)) - defer s.Close() +const ( + mockHostID = "host1" + mockContainerIPWithScope = ";" + weave.MockContainerIP +) - w := overlay.NewWeave(mockHostID, s.URL) +func TestWeaveTaggerOverlayTopology(t *testing.T) { + w := overlay.NewWeave(mockHostID, weave.MockClient{}) defer w.Stop() // Wait until the reporter reports some nodes @@ -41,25 +33,25 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) { t.Fatal(err) } - nodeID := report.MakeOverlayNodeID(mockWeavePeerName) + nodeID := report.MakeOverlayNodeID(weave.MockWeavePeerName) node, ok := have.Overlay.Nodes[nodeID] if !ok { t.Errorf("Expected overlay node %q, but not found", nodeID) } - if peerName, ok := node.Latest.Lookup(overlay.WeavePeerName); !ok || peerName != mockWeavePeerName { - t.Errorf("Expected weave peer name %q, got %q", mockWeavePeerName, peerName) + if peerName, ok := node.Latest.Lookup(overlay.WeavePeerName); !ok || peerName != weave.MockWeavePeerName { + t.Errorf("Expected weave peer name %q, got %q", weave.MockWeavePeerName, peerName) } - if peerNick, ok := node.Latest.Lookup(overlay.WeavePeerNickName); !ok || peerNick != mockWeavePeerNickName { - t.Errorf("Expected weave peer nickname %q, got %q", mockWeavePeerNickName, peerNick) + if peerNick, ok := node.Latest.Lookup(overlay.WeavePeerNickName); !ok || peerNick != weave.MockWeavePeerNickName { + t.Errorf("Expected weave peer nickname %q, got %q", weave.MockWeavePeerNickName, peerNick) } } { // Container nodes should be tagged with their overlay info - nodeID := report.MakeContainerNodeID(mockContainerID) + nodeID := report.MakeContainerNodeID(weave.MockContainerID) have, err := w.Tag(report.Report{ Container: report.MakeTopology().AddNode(nodeID, report.MakeNodeWith(map[string]string{ - docker.ContainerID: mockContainerID, + docker.ContainerID: weave.MockContainerID, })), }) if err != nil { @@ -72,16 +64,16 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) { } // Should have Weave DNS Hostname - if have, ok := node.Latest.Lookup(overlay.WeaveDNSHostname); !ok || have != mockHostname { - t.Errorf("Expected weave dns hostname %q, got %q", mockHostname, have) + if have, ok := node.Latest.Lookup(overlay.WeaveDNSHostname); !ok || have != weave.MockHostname { + t.Errorf("Expected weave dns hostname %q, got %q", weave.MockHostname, have) } // Should have Weave MAC Address - if have, ok := node.Latest.Lookup(overlay.WeaveMACAddress); !ok || have != mockContainerMAC { - t.Errorf("Expected weave mac address %q, got %q", mockContainerMAC, have) + if have, ok := node.Latest.Lookup(overlay.WeaveMACAddress); !ok || have != weave.MockContainerMAC { + t.Errorf("Expected weave mac address %q, got %q", weave.MockContainerMAC, have) } // Should have Weave container ip - if have, ok := node.Sets.Lookup(docker.ContainerIPs); !ok || !have.Contains(mockContainerIP) { - t.Errorf("Expected container ips to include the weave IP %q, got %q", mockContainerIP, have) + if have, ok := node.Sets.Lookup(docker.ContainerIPs); !ok || !have.Contains(weave.MockContainerIP) { + t.Errorf("Expected container ips to include the weave IP %q, got %q", weave.MockContainerIP, have) } // Should have Weave container ip (with scope) if have, ok := node.Sets.Lookup(docker.ContainerIPsWithScopes); !ok || !have.Contains(mockContainerIPWithScope) { @@ -89,38 +81,3 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) { } } } - -const ( - mockHostID = "host1" - mockWeavePeerName = "winnebago" - mockWeavePeerNickName = "winny" - mockContainerID = "83183a667c01" - mockContainerMAC = "d6:f2:5a:12:36:a8" - mockContainerIP = "10.0.0.123" - mockContainerIPWithScope = ";10.0.0.123" - mockHostname = "hostname.weave.local" -) - -var ( - mockResponse = fmt.Sprintf(`{ - "Router": { - "Peers": [{ - "Name": "%s", - "Nickname": "%s" - }] - }, - "DNS": { - "Entries": [{ - "ContainerID": "%s", - "Hostname": "%s.", - "Tombstone": 0 - }] - } - }`, mockWeavePeerName, mockWeavePeerNickName, mockContainerID, mockHostname) -) - -func mockWeaveRouter(w http.ResponseWriter, r *http.Request) { - if _, err := w.Write([]byte(mockResponse)); err != nil { - panic(err) - } -} diff --git a/prog/app.go b/prog/app.go index 2251300e14..26d3eb65dd 100644 --- a/prog/app.go +++ b/prog/app.go @@ -14,7 +14,9 @@ import ( "github.com/weaveworks/weave/common" "github.com/weaveworks/scope/app" + "github.com/weaveworks/scope/common/weave" "github.com/weaveworks/scope/common/xfer" + "github.com/weaveworks/scope/probe/docker" ) // Router creates the mux for all the various app components. @@ -32,6 +34,11 @@ func appMain() { window = flag.Duration("window", 15*time.Second, "window") listen = flag.String("http.address", ":"+strconv.Itoa(xfer.AppPort), "webserver listen address") logPrefix = flag.String("log.prefix", "", "prefix for each log line") + + weaveAddr = flag.String("weave.addr", app.DefaultWeaveURL, "Address on which to contact WeaveDNS") + weaveHostname = flag.String("weave.hostname", app.DefaultHostname, "Hostname to advertise in WeaveDNS") + containerName = flag.String("container.name", app.DefaultContainerName, "Name of this container (to lookup container ID)") + dockerEndpoint = flag.String("docker", app.DefaultDockerEndpoint, "Location of docker endpoint (to lookup container ID)") ) flag.Parse() @@ -46,6 +53,20 @@ func appMain() { app.UniqueID = strconv.FormatInt(rand.Int63(), 16) app.Version = version log.Printf("app starting, version %s, ID %s", app.Version, app.UniqueID) + + // If user supplied a weave router address, periodically try and register + // out IP address in WeaveDNS. + if *weaveAddr != "" { + weave, err := newWeavePublisher( + *dockerEndpoint, *weaveAddr, + *weaveHostname, *containerName) + if err != nil { + log.Println("Failed to start weave integration:", err) + } else { + defer weave.Stop() + } + } + handler := router(app.NewCollector(*window)) go func() { log.Printf("listening on %s", *listen) @@ -54,3 +75,18 @@ func appMain() { common.SignalHandlerLoop() } + +func newWeavePublisher(dockerEndpoint, weaveAddr, weaveHostname, containerName string) (*app.WeavePublisher, error) { + dockerClient, err := docker.NewDockerClientStub(dockerEndpoint) + if err != nil { + return nil, err + } + weaveClient := weave.NewClient(weaveAddr) + return app.NewWeavePublisher( + weaveClient, + dockerClient, + app.Interfaces, + weaveHostname, + containerName, + ), nil +} diff --git a/prog/probe.go b/prog/probe.go index ac43f68520..dc36aaec8b 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -17,6 +17,8 @@ import ( "github.com/weaveworks/weave/common" "github.com/weaveworks/scope/common/hostname" + "github.com/weaveworks/scope/common/sanitize" + "github.com/weaveworks/scope/common/weave" "github.com/weaveworks/scope/common/xfer" "github.com/weaveworks/scope/probe" "github.com/weaveworks/scope/probe/appclient" @@ -33,23 +35,27 @@ import ( // Main runs the probe func probeMain() { var ( - targets = []string{fmt.Sprintf("localhost:%d", xfer.AppPort), fmt.Sprintf("scope.weave.local:%d", xfer.AppPort)} - token = flag.String("token", "default-token", "probe token") - httpListen = flag.String("http.listen", "", "listen address for HTTP profiling and instrumentation server") - publishInterval = flag.Duration("publish.interval", 3*time.Second, "publish (output) interval") - spyInterval = flag.Duration("spy.interval", time.Second, "spy (scan) interval") - spyProcs = flag.Bool("processes", true, "report processes (needs root)") - dockerEnabled = flag.Bool("docker", false, "collect Docker-related attributes for processes") - dockerInterval = flag.Duration("docker.interval", 10*time.Second, "how often to update Docker attributes") - dockerBridge = flag.String("docker.bridge", "docker0", "the docker bridge name") + targets = []string{fmt.Sprintf("localhost:%d", xfer.AppPort)} + token = flag.String("token", "default-token", "probe token") + httpListen = flag.String("http.listen", "", "listen address for HTTP profiling and instrumentation server") + publishInterval = flag.Duration("publish.interval", 3*time.Second, "publish (output) interval") + spyInterval = flag.Duration("spy.interval", time.Second, "spy (scan) interval") + spyProcs = flag.Bool("processes", true, "report processes (needs root)") + procRoot = flag.String("proc.root", "/proc", "location of the proc filesystem") + useConntrack = flag.Bool("conntrack", true, "also use conntrack to track connections") + insecure = flag.Bool("insecure", false, "(SSL) explicitly allow \"insecure\" SSL connections and transfers") + logPrefix = flag.String("log.prefix", "", "prefix for each log line") + + dockerEnabled = flag.Bool("docker", false, "collect Docker-related attributes for processes") + dockerInterval = flag.Duration("docker.interval", 10*time.Second, "how often to update Docker attributes") + dockerBridge = flag.String("docker.bridge", "docker0", "the docker bridge name") + kubernetesEnabled = flag.Bool("kubernetes", false, "collect kubernetes-related attributes for containers, should only be enabled on the master node") kubernetesAPI = flag.String("kubernetes.api", "", "Address of kubernetes master api") kubernetesInterval = flag.Duration("kubernetes.interval", 10*time.Second, "how often to do a full resync of the kubernetes data") - weaveRouterAddr = flag.String("weave.router.addr", "127.0.0.1:6784", "IP address & port of the Weave router") - procRoot = flag.String("proc.root", "/proc", "location of the proc filesystem") - useConntrack = flag.Bool("conntrack", true, "also use conntrack to track connections") - insecure = flag.Bool("insecure", false, "(SSL) explicitly allow \"insecure\" SSL connections and transfers") - logPrefix = flag.String("log.prefix", "", "prefix for each log line") + + weaveRouterAddr = flag.String("weave.router.addr", "127.0.0.1:6784", "IP address & port of the Weave router") + weaveDNSTarget = flag.String("weave.hostname", fmt.Sprintf("scope.weave.local:%d", xfer.AppPort), "Hostname to lookup in weaveDNS") ) flag.Parse() @@ -108,7 +114,7 @@ func probeMain() { }) defer clients.Stop() - resolver := appclient.NewStaticResolver(targets, clients.Set) + resolver := appclient.NewResolver(targets, net.LookupIP, clients.Set) defer resolver.Stop() processCache := process.NewCachingWalker(process.NewWalker(*procRoot)) @@ -149,10 +155,20 @@ func probeMain() { } if *weaveRouterAddr != "" { - weave := overlay.NewWeave(hostID, *weaveRouterAddr) + client := weave.NewClient(sanitize.URL("http://", 6784, "")(*weaveRouterAddr)) + weave := overlay.NewWeave(hostID, client) defer weave.Stop() p.AddTagger(weave) p.AddReporter(weave) + + dockerBridgeIP, err := getFirstAddressOf(*dockerBridge) + if err != nil { + log.Println("Error getting docker bridge ip:", err) + } else { + weaveDNSLookup := appclient.LookupUsing(dockerBridgeIP + ":53") + weaveResolver := appclient.NewResolver([]string{*weaveDNSTarget}, weaveDNSLookup, clients.Set) + defer weaveResolver.Stop() + } } if *httpListen != "" { @@ -168,3 +184,22 @@ func probeMain() { common.SignalHandlerLoop() } + +func getFirstAddressOf(name string) (string, error) { + inf, err := net.InterfaceByName(name) + if err != nil { + return "", err + } + + addrs, err := inf.Addrs() + if err != nil { + return "", err + } + + switch v := addrs[0].(type) { + case *net.IPNet: + return v.IP.String(), nil + default: + return "", fmt.Errorf("No IP on docker bridge!") + } +} diff --git a/test/exec/exec.go b/test/exec/exec.go index a1464bf13a..7e1dbe9dc9 100644 --- a/test/exec/exec.go +++ b/test/exec/exec.go @@ -60,6 +60,14 @@ func (c *mockCmd) Kill() error { return nil } +func (c *mockCmd) Output() ([]byte, error) { + return ioutil.ReadAll(c.ReadCloser) +} + +func (c *mockCmd) Run() error { + return nil +} + func (b *blockingReader) Read(p []byte) (n int, err error) { <-b.quit return 0, nil diff --git a/test/weave/mock.go b/test/weave/mock.go new file mode 100644 index 0000000000..96719265e3 --- /dev/null +++ b/test/weave/mock.go @@ -0,0 +1,71 @@ +package weave + +import ( + "net" + + "github.com/weaveworks/scope/common/weave" +) + +// Constants used for testing +const ( + MockWeavePeerName = "winnebago" + MockWeavePeerNickName = "winny" + MockContainerID = "83183a667c01" + MockContainerMAC = "d6:f2:5a:12:36:a8" + MockContainerIP = "10.0.0.123" + MockHostname = "hostname.weave.local" +) + +// MockClient is a mock version of weave.Client +type MockClient struct{} + +// Status implements weave.Client +func (MockClient) Status() (weave.Status, error) { + return weave.Status{ + Router: weave.Router{ + Peers: []struct { + Name string + NickName string + }{ + { + Name: MockWeavePeerName, + NickName: MockWeavePeerNickName, + }, + }, + }, + DNS: weave.DNS{ + Entries: []struct { + Hostname string + ContainerID string + Tombstone int64 + }{ + { + Hostname: MockHostname + ".", + ContainerID: MockContainerID, + Tombstone: 0, + }, + }, + }, + }, nil +} + +// AddDNSEntry implements weave.Client +func (MockClient) AddDNSEntry(fqdn, containerid string, ip net.IP) error { + return nil +} + +// PS implements weave.Client +func (MockClient) PS() (map[string]weave.PSEntry, error) { + return map[string]weave.PSEntry{ + MockContainerID: { + ContainerIDPrefix: MockContainerID, + MACAddress: MockContainerMAC, + IPs: []string{MockContainerIP}, + }, + }, nil +} + +// Expose implements weave.Client +func (MockClient) Expose() error { + return nil +}