From 81e8ba223f9de118853fe108b1d62af1f91aa762 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 22 Jul 2015 17:02:16 +0100 Subject: [PATCH 1/5] Do network attach on docker event rather than intercepting start and restart commands. This means it also works when the docker daemon restarts a container. --- common/docker/client.go | 22 ++++++++++-- ipam/allocator.go | 2 ++ nameserver/nameserver.go | 2 ++ proxy/proxy.go | 51 ++++++++++++++++++++++++---- proxy/start_container_interceptor.go | 47 ------------------------- site/features.md | 7 ++++ 6 files changed, 76 insertions(+), 55 deletions(-) delete mode 100644 proxy/start_container_interceptor.go diff --git a/common/docker/client.go b/common/docker/client.go index 1f2ee0df56..6a4eaff1de 100644 --- a/common/docker/client.go +++ b/common/docker/client.go @@ -7,6 +7,7 @@ import ( // An observer for container events type ContainerObserver interface { + ContainerStarted(ident string) ContainerDied(ident string) } @@ -22,12 +23,26 @@ func NewClient(apiPath string) (*Client, error) { } client := &Client{dc} - env, err := client.Version() + return client, client.checkWorking(apiPath) +} + +func NewVersionedClient(apiPath string, apiVersionString string) (*Client, error) { + dc, err := docker.NewVersionedClient(apiPath, apiVersionString) if err != nil { return nil, err } + client := &Client{dc} + + return client, client.checkWorking(apiPath) +} + +func (c *Client) checkWorking(apiPath string) error { + env, err := c.Version() + if err != nil { + return err + } Log.Infof("[docker] Using Docker API on %s: %v", apiPath, env) - return client, nil + return nil } // AddObserver adds an observer for docker events @@ -41,6 +56,9 @@ func (c *Client) AddObserver(ob ContainerObserver) error { go func() { for event := range events { switch event.Status { + case "start": + id := event.ID + ob.ContainerStarted(id) case "die": id := event.ID ob.ContainerDied(id) diff --git a/ipam/allocator.go b/ipam/allocator.go index 28f2e449a3..e3765aca82 100644 --- a/ipam/allocator.go +++ b/ipam/allocator.go @@ -226,6 +226,8 @@ func (alloc *Allocator) ContainerDied(ident string) { } } +func (alloc *Allocator) ContainerStarted(ident string) {} + // Delete (Sync) - release all IP addresses for container with given name func (alloc *Allocator) Delete(ident string) error { errChan := make(chan error) diff --git a/nameserver/nameserver.go b/nameserver/nameserver.go index dd6bcd6bbf..356e61e0f0 100644 --- a/nameserver/nameserver.go +++ b/nameserver/nameserver.go @@ -130,6 +130,8 @@ func (n *Nameserver) ReverseLookup(ip address.Address) (string, error) { return match.Hostname, nil } +func (n *Nameserver) ContainerStarted(ident string) {} + func (n *Nameserver) ContainerDied(ident string) { n.Lock() entries := n.entries.tombstone(n.ourName, func(e *Entry) bool { diff --git a/proxy/proxy.go b/proxy/proxy.go index beb349152e..88ec508505 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -11,8 +11,9 @@ import ( "strings" "syscall" - "github.com/fsouza/go-dockerclient" + docker "github.com/fsouza/go-dockerclient" . "github.com/weaveworks/weave/common" + weavedocker "github.com/weaveworks/weave/common/docker" ) const ( @@ -25,7 +26,6 @@ const ( var ( containerCreateRegexp = regexp.MustCompile("^(/v[0-9\\.]*)?/containers/create$") - containerStartRegexp = regexp.MustCompile("^(/v[0-9\\.]*)?/containers/[^/]*/(re)?start$") execCreateRegexp = regexp.MustCompile("^(/v[0-9\\.]*)?/containers/[^/]*/exec$") ErrWeaveCIDRNone = errors.New("the container was created with the '-e WEAVE_CIDR=none' option") @@ -64,11 +64,11 @@ func NewProxy(c Config) (*Proxy, error) { // to insulate ourselves from breaking changes to the API, as // happened in 1.20 (Docker 1.8.0) when the presentation of // volumes changed in `inspect`. - client, err := docker.NewVersionedClient(dockerSockUnix, "1.15") + client, err := weavedocker.NewVersionedClient(dockerSockUnix, "1.15") if err != nil { return nil, err } - p.client = client + p.client = client.Client if !p.WithoutDNS { dockerBridgeIP, stderr, err := callWeave("docker-bridge-ip") @@ -88,6 +88,8 @@ func NewProxy(c Config) (*Proxy, error) { return nil, err } + client.AddObserver(p) + return p, nil } @@ -121,8 +123,6 @@ func (proxy *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch { case containerCreateRegexp.MatchString(path): i = &createContainerInterceptor{proxy} - case containerStartRegexp.MatchString(path): - i = &startContainerInterceptor{proxy} case execCreateRegexp.MatchString(path): i = &createExecInterceptor{proxy} default: @@ -224,6 +224,45 @@ func (proxy *Proxy) listen(protoAndAddr string) (net.Listener, string, error) { return listener, fmt.Sprintf("%s://%s", proto, addr), nil } +// weavedocker.ContainerObserver interface +func (proxy *Proxy) ContainerStarted(ident string) { + container, err := proxy.client.InspectContainer(ident) + if err != nil { + Log.Warningf("Error inspecting container %s: %v", ident, err) + return + } + // If this was a container we modified the entrypoint for, attach it to the network + if len(container.Config.Entrypoint) > 0 && container.Config.Entrypoint[0] == weaveWaitEntrypoint[0] { + proxy.attach(container) + } +} + +func (proxy *Proxy) ContainerDied(ident string) { +} + +func (proxy *Proxy) attach(container *docker.Container) error { + cidrs, err := proxy.weaveCIDRsFromConfig(container.Config, container.HostConfig) + if err != nil { + Log.Infof("Leaving container %s alone because %s", container.ID, err) + return nil + } + Log.Infof("Attaching container %s with WEAVE_CIDR \"%s\" to weave network", container.ID, strings.Join(cidrs, " ")) + args := []string{"attach"} + args = append(args, cidrs...) + if !proxy.NoRewriteHosts { + args = append(args, "--rewrite-hosts") + } + args = append(args, "--or-die", container.ID) + if _, stderr, err := callWeave(args...); err != nil { + Log.Warningf("Attaching container %s to weave network failed: %s", container.ID, string(stderr)) + return errors.New(string(stderr)) + } else if len(stderr) > 0 { + Log.Warningf("Attaching container %s to weave network: %s", container.ID, string(stderr)) + } + + return nil +} + func (proxy *Proxy) weaveCIDRsFromConfig(config *docker.Config, hostConfig *docker.HostConfig) ([]string, error) { netMode := "" if hostConfig != nil { diff --git a/proxy/start_container_interceptor.go b/proxy/start_container_interceptor.go deleted file mode 100644 index 267f68c492..0000000000 --- a/proxy/start_container_interceptor.go +++ /dev/null @@ -1,47 +0,0 @@ -package proxy - -import ( - "errors" - "net/http" - "strings" - - . "github.com/weaveworks/weave/common" -) - -type startContainerInterceptor struct{ proxy *Proxy } - -func (i *startContainerInterceptor) InterceptRequest(r *http.Request) error { - return nil -} - -func (i *startContainerInterceptor) InterceptResponse(r *http.Response) error { - if r.StatusCode < 200 || r.StatusCode >= 300 { // Docker didn't do the start - return nil - } - - container, err := inspectContainerInPath(i.proxy.client, r.Request.URL.Path) - if err != nil { - return err - } - - cidrs, err := i.proxy.weaveCIDRsFromConfig(container.Config, container.HostConfig) - if err != nil { - Log.Infof("Leaving container %s alone because %s", container.ID, err) - return nil - } - Log.Infof("Attaching container %s with WEAVE_CIDR \"%s\" to weave network", container.ID, strings.Join(cidrs, " ")) - args := []string{"attach"} - args = append(args, cidrs...) - if !i.proxy.NoRewriteHosts { - args = append(args, "--rewrite-hosts") - } - args = append(args, "--or-die", container.ID) - if _, stderr, err := callWeave(args...); err != nil { - Log.Warningf("Attaching container %s to weave network failed: %s", container.ID, string(stderr)) - return errors.New(string(stderr)) - } else if len(stderr) > 0 { - Log.Warningf("Attaching container %s to weave network: %s", container.ID, string(stderr)) - } - - return nil -} diff --git a/site/features.md b/site/features.md index 189f9365a1..84e295fd8c 100644 --- a/site/features.md +++ b/site/features.md @@ -64,6 +64,10 @@ or [remote API](https://docs.docker.com/reference/api/docker_remote_api/) are attached to the weave network before they begin execution. +Containers started in this way that subsequently restart, either by an +explicit `docker restart` command or by Docker restart policy, are +re-attached to the weave network by the weave Docker API proxy. + ### Address allocation Containers are automatically allocated an IP address that is unique @@ -248,6 +252,9 @@ invocation: host1$ weave detach net:default net:10.2.2.0/24 net:10.2.3.0/24 $C 10.2.1.3 10.2.2.3 10.2.3.1 +Note that addresses added by dynamic attachment are not re-attached +if the container restarts. + ### Security In order to connect containers across untrusted networks, weave peers From 9ae643531767c2276cb16b5b77277a8d6a78751c Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 23 Jul 2015 11:26:48 +0100 Subject: [PATCH 2/5] Attach existing weave containers at startup --- prog/weaveproxy/main.go | 4 +++- proxy/proxy.go | 14 +++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/prog/weaveproxy/main.go b/prog/weaveproxy/main.go index 23073b745f..60d3cc80e4 100644 --- a/prog/weaveproxy/main.go +++ b/prog/weaveproxy/main.go @@ -59,6 +59,8 @@ func main() { Log.Fatalf("Could not start proxy: %s", err) } - go p.ListenAndServe() + listeners := p.Listen() + p.AttachExistingContainers() + go p.Serve(listeners) SignalHandlerLoop() } diff --git a/proxy/proxy.go b/proxy/proxy.go index 88ec508505..cff7f77365 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -93,6 +93,15 @@ func NewProxy(c Config) (*Proxy, error) { return p, nil } +func (proxy *Proxy) AttachExistingContainers() { + containers, _ := proxy.client.ListContainers(docker.ListContainersOptions{}) + for _, cont := range containers { + if strings.HasPrefix(cont.Command, weaveWaitEntrypoint[0]) { + proxy.ContainerStarted(cont.ID) + } + } +} + func (proxy *Proxy) Dial() (net.Conn, error) { return net.Dial("unix", dockerSock) } @@ -131,7 +140,7 @@ func (proxy *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { proxy.Intercept(i, w, r) } -func (proxy *Proxy) ListenAndServe() { +func (proxy *Proxy) Listen() []net.Listener { listeners := []net.Listener{} addrs := []string{} for _, addr := range proxy.ListenAddrs { @@ -146,7 +155,10 @@ func (proxy *Proxy) ListenAndServe() { for _, addr := range addrs { Log.Infoln("proxy listening on", addr) } + return listeners +} +func (proxy *Proxy) Serve(listeners []net.Listener) { errs := make(chan error) for _, listener := range listeners { go func(listener net.Listener) { From b712ba1e04c7425edad0bdfb27be8049cf9ac631 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 23 Jul 2015 12:56:44 +0100 Subject: [PATCH 3/5] Extend test to cover containers restarting automatically --- test/640_proxy_restart_reattaches_test.sh | 25 ++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/test/640_proxy_restart_reattaches_test.sh b/test/640_proxy_restart_reattaches_test.sh index a2c04ebd16..428092f3af 100755 --- a/test/640_proxy_restart_reattaches_test.sh +++ b/test/640_proxy_restart_reattaches_test.sh @@ -6,14 +6,29 @@ C1=10.2.0.78 C2=10.2.0.34 NAME=seetwo.weave.local +check_attached() { + assert_raises "proxy exec_on $HOST1 c2 $CHECK_ETHWE_UP" + assert_dns_record $HOST1 c1 $NAME $C2 +} + start_suite "Proxy restart reattaches networking to containers" weave_on $HOST1 launch -proxy_start_container $HOST1 -e WEAVE_CIDR=$C2/24 -dt --name=c2 -h $NAME -proxy_start_container_with_dns $HOST1 -e WEAVE_CIDR=$C1/24 -dt --name=c1 +proxy_start_container $HOST1 -e WEAVE_CIDR=$C2/24 -di --name=c2 --restart=always -h $NAME +proxy_start_container_with_dns $HOST1 -e WEAVE_CIDR=$C1/24 -di --name=c1 --restart=always + +proxy docker_on $HOST1 restart -t=1 c2 +check_attached -proxy docker_on $HOST1 restart c2 -assert_raises "proxy exec_on $HOST1 c2 $CHECK_ETHWE_UP" -assert_dns_record $HOST1 c1 $NAME $C2 +# Kill outside of Docker so Docker will restart it +run_on $HOST1 sudo kill -KILL $(docker_on $HOST1 inspect --format='{{.State.Pid}}' c2) +sleep 1 +check_attached + +# Restart docker itself, using different commands for systemd- and upstart-managed. +run_on $HOST1 sh -c "command -v systemctl >/dev/null && sudo systemctl restart docker || sudo service docker restart" +sleep 1 +weave_on $HOST1 launch +check_attached end_suite From 1e2344341d92f2cbd282249f7d07512c28d617fa Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 23 Jul 2015 15:05:37 +0100 Subject: [PATCH 4/5] Wait for container to be attached before returning from {re}start request --- proxy/proxy.go | 58 +++++++++++++++++++++++++++- proxy/start_container_interceptor.go | 24 ++++++++++++ 2 files changed, 80 insertions(+), 2 deletions(-) create mode 100644 proxy/start_container_interceptor.go diff --git a/proxy/proxy.go b/proxy/proxy.go index cff7f77365..10d0b72ce7 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -9,6 +9,7 @@ import ( "os" "regexp" "strings" + "sync" "syscall" docker "github.com/fsouza/go-dockerclient" @@ -26,6 +27,7 @@ const ( var ( containerCreateRegexp = regexp.MustCompile("^(/v[0-9\\.]*)?/containers/create$") + containerStartRegexp = regexp.MustCompile("^(/v[0-9\\.]*)?/containers/[^/]*/(re)?start$") execCreateRegexp = regexp.MustCompile("^(/v[0-9\\.]*)?/containers/[^/]*/exec$") ErrWeaveCIDRNone = errors.New("the container was created with the '-e WEAVE_CIDR=none' option") @@ -44,16 +46,23 @@ type Config struct { WithoutDNS bool } +type wait struct { + ident string + ch chan struct{} +} + type Proxy struct { + sync.RWMutex Config client *docker.Client dockerBridgeIP string hostnameMatchRegexp *regexp.Regexp weaveWaitVolume string + waiters map[*http.Request]*wait } func NewProxy(c Config) (*Proxy, error) { - p := &Proxy{Config: c} + p := &Proxy{Config: c, waiters: make(map[*http.Request]*wait)} if err := p.TLSConfig.loadCerts(); err != nil { Log.Fatalf("Could not configure tls for proxy: %s", err) @@ -132,6 +141,8 @@ func (proxy *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch { case containerCreateRegexp.MatchString(path): i = &createContainerInterceptor{proxy} + case containerStartRegexp.MatchString(path): + i = &startContainerInterceptor{proxy} case execCreateRegexp.MatchString(path): i = &createExecInterceptor{proxy} default: @@ -244,9 +255,52 @@ func (proxy *Proxy) ContainerStarted(ident string) { return } // If this was a container we modified the entrypoint for, attach it to the network - if len(container.Config.Entrypoint) > 0 && container.Config.Entrypoint[0] == weaveWaitEntrypoint[0] { + if containerShouldAttach(container) { proxy.attach(container) } + proxy.notifyWaiters(container.ID) +} + +func containerShouldAttach(container *docker.Container) bool { + return len(container.Config.Entrypoint) > 0 && container.Config.Entrypoint[0] == weaveWaitEntrypoint[0] +} + +func (proxy *Proxy) createWait(r *http.Request, ident string) { + proxy.Lock() + ch := make(chan struct{}) + proxy.waiters[r] = &wait{ident: ident, ch: ch} + proxy.Unlock() +} + +func (proxy *Proxy) removeWait(r *http.Request) { + proxy.Lock() + delete(proxy.waiters, r) + proxy.Unlock() +} + +func (proxy *Proxy) notifyWaiters(ident string) { + proxy.Lock() + for _, wait := range proxy.waiters { + if ident == wait.ident && wait.ch != nil { + close(wait.ch) + wait.ch = nil + } + } + proxy.Unlock() +} + +func (proxy *Proxy) waitForStart(r *http.Request) { + var ch chan struct{} + proxy.Lock() + wait, found := proxy.waiters[r] + if found { + ch = wait.ch + } + proxy.Unlock() + if found { + Log.Debugf("Wait for start of container %s", wait.ident) + <-ch + } } func (proxy *Proxy) ContainerDied(ident string) { diff --git a/proxy/start_container_interceptor.go b/proxy/start_container_interceptor.go new file mode 100644 index 0000000000..6d549cf2b2 --- /dev/null +++ b/proxy/start_container_interceptor.go @@ -0,0 +1,24 @@ +package proxy + +import ( + "net/http" +) + +type startContainerInterceptor struct{ proxy *Proxy } + +func (i *startContainerInterceptor) InterceptRequest(r *http.Request) error { + container, err := inspectContainerInPath(i.proxy.client, r.URL.Path) + if err == nil && containerShouldAttach(container) { + i.proxy.createWait(r, container.ID) + } + return err +} + +func (i *startContainerInterceptor) InterceptResponse(r *http.Response) error { + defer i.proxy.removeWait(r.Request) + if r.StatusCode < 200 || r.StatusCode >= 300 { // Docker didn't do the start + return nil + } + i.proxy.waitForStart(r.Request) + return nil +} From 76895c76a1d0038d779bc5beb941b216a50dba72 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 14 Sep 2015 17:11:56 +0100 Subject: [PATCH 5/5] Use Mutex instead of RWMutex, for simplicity --- proxy/proxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/proxy.go b/proxy/proxy.go index 10d0b72ce7..993e6be877 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -52,7 +52,7 @@ type wait struct { } type Proxy struct { - sync.RWMutex + sync.Mutex Config client *docker.Client dockerBridgeIP string