Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

make proxy attach network on docker start event #1210

Merged
merged 5 commits into from
Sep 15, 2015
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions common/docker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

// An observer for container events
type ContainerObserver interface {
ContainerStarted(ident string)
ContainerDied(ident string)
}

Expand All @@ -22,12 +23,26 @@ func NewClient(apiPath string) (*Client, error) {
}
client := &Client{dc}

env, err := client.Version()
return client, client.checkWorking(apiPath)
}

func NewVersionedClient(apiPath string, apiVersionString string) (*Client, error) {
dc, err := docker.NewVersionedClient(apiPath, apiVersionString)
if err != nil {
return nil, err
}
client := &Client{dc}

return client, client.checkWorking(apiPath)
}

func (c *Client) checkWorking(apiPath string) error {
env, err := c.Version()
if err != nil {
return err
}
Log.Infof("[docker] Using Docker API on %s: %v", apiPath, env)
return client, nil
return nil
}

// AddObserver adds an observer for docker events
Expand All @@ -41,6 +56,9 @@ func (c *Client) AddObserver(ob ContainerObserver) error {
go func() {
for event := range events {
switch event.Status {
case "start":
id := event.ID
ob.ContainerStarted(id)
case "die":
id := event.ID
ob.ContainerDied(id)
Expand Down
2 changes: 2 additions & 0 deletions ipam/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ func (alloc *Allocator) ContainerDied(ident string) {
}
}

func (alloc *Allocator) ContainerStarted(ident string) {}

// Delete (Sync) - release all IP addresses for container with given name
func (alloc *Allocator) Delete(ident string) error {
errChan := make(chan error)
Expand Down
2 changes: 2 additions & 0 deletions nameserver/nameserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ func (n *Nameserver) ReverseLookup(ip address.Address) (string, error) {
return match.Hostname, nil
}

func (n *Nameserver) ContainerStarted(ident string) {}

func (n *Nameserver) ContainerDied(ident string) {
n.Lock()
entries := n.entries.tombstone(n.ourName, func(e *Entry) bool {
Expand Down
4 changes: 3 additions & 1 deletion prog/weaveproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func main() {
Log.Fatalf("Could not start proxy: %s", err)
}

go p.ListenAndServe()
listeners := p.Listen()
p.AttachExistingContainers()
go p.Serve(listeners)
SignalHandlerLoop()
}
115 changes: 110 additions & 5 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
"os"
"regexp"
"strings"
"sync"
"syscall"

"github.com/fsouza/go-dockerclient"
docker "github.com/fsouza/go-dockerclient"
. "github.com/weaveworks/weave/common"
weavedocker "github.com/weaveworks/weave/common/docker"
)

const (
Expand Down Expand Up @@ -44,16 +46,23 @@ type Config struct {
WithoutDNS bool
}

type wait struct {
ident string
ch chan struct{}
}

type Proxy struct {
sync.RWMutex
Config
client *docker.Client
dockerBridgeIP string
hostnameMatchRegexp *regexp.Regexp
weaveWaitVolume string
waiters map[*http.Request]*wait
}

func NewProxy(c Config) (*Proxy, error) {
p := &Proxy{Config: c}
p := &Proxy{Config: c, waiters: make(map[*http.Request]*wait)}

if err := p.TLSConfig.loadCerts(); err != nil {
Log.Fatalf("Could not configure tls for proxy: %s", err)
Expand All @@ -64,11 +73,11 @@ func NewProxy(c Config) (*Proxy, error) {
// to insulate ourselves from breaking changes to the API, as
// happened in 1.20 (Docker 1.8.0) when the presentation of
// volumes changed in `inspect`.
client, err := docker.NewVersionedClient(dockerSockUnix, "1.15")
client, err := weavedocker.NewVersionedClient(dockerSockUnix, "1.15")
if err != nil {
return nil, err
}
p.client = client
p.client = client.Client

if !p.WithoutDNS {
dockerBridgeIP, stderr, err := callWeave("docker-bridge-ip")
Expand All @@ -88,9 +97,20 @@ func NewProxy(c Config) (*Proxy, error) {
return nil, err
}

client.AddObserver(p)

return p, nil
}

func (proxy *Proxy) AttachExistingContainers() {
containers, _ := proxy.client.ListContainers(docker.ListContainersOptions{})
for _, cont := range containers {
if strings.HasPrefix(cont.Command, weaveWaitEntrypoint[0]) {
proxy.ContainerStarted(cont.ID)
}
}
}

func (proxy *Proxy) Dial() (net.Conn, error) {
return net.Dial("unix", dockerSock)
}
Expand Down Expand Up @@ -131,7 +151,7 @@ func (proxy *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
proxy.Intercept(i, w, r)
}

func (proxy *Proxy) ListenAndServe() {
func (proxy *Proxy) Listen() []net.Listener {
listeners := []net.Listener{}
addrs := []string{}
for _, addr := range proxy.ListenAddrs {
Expand All @@ -146,7 +166,10 @@ func (proxy *Proxy) ListenAndServe() {
for _, addr := range addrs {
Log.Infoln("proxy listening on", addr)
}
return listeners
}

func (proxy *Proxy) Serve(listeners []net.Listener) {
errs := make(chan error)
for _, listener := range listeners {
go func(listener net.Listener) {
Expand Down Expand Up @@ -224,6 +247,88 @@ func (proxy *Proxy) listen(protoAndAddr string) (net.Listener, string, error) {
return listener, fmt.Sprintf("%s://%s", proto, addr), nil
}

// weavedocker.ContainerObserver interface
func (proxy *Proxy) ContainerStarted(ident string) {
container, err := proxy.client.InspectContainer(ident)
if err != nil {
Log.Warningf("Error inspecting container %s: %v", ident, err)
return
}
// If this was a container we modified the entrypoint for, attach it to the network
if containerShouldAttach(container) {
proxy.attach(container)
}
proxy.notifyWaiters(container.ID)
}

func containerShouldAttach(container *docker.Container) bool {
return len(container.Config.Entrypoint) > 0 && container.Config.Entrypoint[0] == weaveWaitEntrypoint[0]
}

func (proxy *Proxy) createWait(r *http.Request, ident string) {
proxy.Lock()
ch := make(chan struct{})
proxy.waiters[r] = &wait{ident: ident, ch: ch}
proxy.Unlock()
}

func (proxy *Proxy) removeWait(r *http.Request) {
proxy.Lock()
delete(proxy.waiters, r)
proxy.Unlock()
}

func (proxy *Proxy) notifyWaiters(ident string) {
proxy.Lock()
for _, wait := range proxy.waiters {
if ident == wait.ident && wait.ch != nil {
close(wait.ch)
wait.ch = nil
}
}
proxy.Unlock()
}

func (proxy *Proxy) waitForStart(r *http.Request) {
var ch chan struct{}
proxy.Lock()

This comment was marked as abuse.

This comment was marked as abuse.

This comment was marked as abuse.

wait, found := proxy.waiters[r]
if found {
ch = wait.ch
}
proxy.Unlock()
if found {
Log.Debugf("Wait for start of container %s", wait.ident)
<-ch
}
}

func (proxy *Proxy) ContainerDied(ident string) {
}

func (proxy *Proxy) attach(container *docker.Container) error {
cidrs, err := proxy.weaveCIDRsFromConfig(container.Config, container.HostConfig)
if err != nil {
Log.Infof("Leaving container %s alone because %s", container.ID, err)
return nil
}
Log.Infof("Attaching container %s with WEAVE_CIDR \"%s\" to weave network", container.ID, strings.Join(cidrs, " "))
args := []string{"attach"}
args = append(args, cidrs...)
if !proxy.NoRewriteHosts {
args = append(args, "--rewrite-hosts")
}
args = append(args, "--or-die", container.ID)
if _, stderr, err := callWeave(args...); err != nil {
Log.Warningf("Attaching container %s to weave network failed: %s", container.ID, string(stderr))
return errors.New(string(stderr))
} else if len(stderr) > 0 {
Log.Warningf("Attaching container %s to weave network: %s", container.ID, string(stderr))
}

return nil
}

func (proxy *Proxy) weaveCIDRsFromConfig(config *docker.Config, hostConfig *docker.HostConfig) ([]string, error) {
netMode := ""
if hostConfig != nil {
Expand Down
37 changes: 7 additions & 30 deletions proxy/start_container_interceptor.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,24 @@
package proxy

import (
"errors"
"net/http"
"strings"

. "github.com/weaveworks/weave/common"
)

type startContainerInterceptor struct{ proxy *Proxy }

func (i *startContainerInterceptor) InterceptRequest(r *http.Request) error {
return nil
container, err := inspectContainerInPath(i.proxy.client, r.URL.Path)
if err == nil && containerShouldAttach(container) {
i.proxy.createWait(r, container.ID)
}
return err
}

func (i *startContainerInterceptor) InterceptResponse(r *http.Response) error {
defer i.proxy.removeWait(r.Request)
if r.StatusCode < 200 || r.StatusCode >= 300 { // Docker didn't do the start
return nil
}

container, err := inspectContainerInPath(i.proxy.client, r.Request.URL.Path)
if err != nil {
return err
}

cidrs, err := i.proxy.weaveCIDRsFromConfig(container.Config, container.HostConfig)
if err != nil {
Log.Infof("Leaving container %s alone because %s", container.ID, err)
return nil
}
Log.Infof("Attaching container %s with WEAVE_CIDR \"%s\" to weave network", container.ID, strings.Join(cidrs, " "))
args := []string{"attach"}
args = append(args, cidrs...)
if !i.proxy.NoRewriteHosts {
args = append(args, "--rewrite-hosts")
}
args = append(args, "--or-die", container.ID)
if _, stderr, err := callWeave(args...); err != nil {
Log.Warningf("Attaching container %s to weave network failed: %s", container.ID, string(stderr))
return errors.New(string(stderr))
} else if len(stderr) > 0 {
Log.Warningf("Attaching container %s to weave network: %s", container.ID, string(stderr))
}

i.proxy.waitForStart(r.Request)
return nil
}
7 changes: 7 additions & 0 deletions site/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ or
[remote API](https://docs.docker.com/reference/api/docker_remote_api/)
are attached to the weave network before they begin execution.

Containers started in this way that subsequently restart, either by an
explicit `docker restart` command or by Docker restart policy, are
re-attached to the weave network by the weave Docker API proxy.

### <a name="addressing"></a>Address allocation

Containers are automatically allocated an IP address that is unique
Expand Down Expand Up @@ -248,6 +252,9 @@ invocation:
host1$ weave detach net:default net:10.2.2.0/24 net:10.2.3.0/24 $C
10.2.1.3 10.2.2.3 10.2.3.1

Note that addresses added by dynamic attachment are not re-attached
if the container restarts.

### <a name="security"></a>Security

In order to connect containers across untrusted networks, weave peers
Expand Down
25 changes: 20 additions & 5 deletions test/640_proxy_restart_reattaches_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,29 @@ C1=10.2.0.78
C2=10.2.0.34
NAME=seetwo.weave.local

check_attached() {
assert_raises "proxy exec_on $HOST1 c2 $CHECK_ETHWE_UP"
assert_dns_record $HOST1 c1 $NAME $C2
}

start_suite "Proxy restart reattaches networking to containers"

weave_on $HOST1 launch
proxy_start_container $HOST1 -e WEAVE_CIDR=$C2/24 -dt --name=c2 -h $NAME
proxy_start_container_with_dns $HOST1 -e WEAVE_CIDR=$C1/24 -dt --name=c1
proxy_start_container $HOST1 -e WEAVE_CIDR=$C2/24 -di --name=c2 --restart=always -h $NAME
proxy_start_container_with_dns $HOST1 -e WEAVE_CIDR=$C1/24 -di --name=c1 --restart=always

proxy docker_on $HOST1 restart -t=1 c2
check_attached

proxy docker_on $HOST1 restart c2
assert_raises "proxy exec_on $HOST1 c2 $CHECK_ETHWE_UP"
assert_dns_record $HOST1 c1 $NAME $C2
# Kill outside of Docker so Docker will restart it
run_on $HOST1 sudo kill -KILL $(docker_on $HOST1 inspect --format='{{.State.Pid}}' c2)
sleep 1
check_attached

# Restart docker itself, using different commands for systemd- and upstart-managed.
run_on $HOST1 sh -c "command -v systemctl >/dev/null && sudo systemctl restart docker || sudo service docker restart"
sleep 1
weave_on $HOST1 launch
check_attached

end_suite