Skip to content
This repository has been archived by the owner on Feb 1, 2021. It is now read-only.

Commit

Permalink
Merge pull request #393 from vieux/mesos_poc
Browse files Browse the repository at this point in the history
Proposal: Scheduler/Cluster Driver
  • Loading branch information
aluzzardi committed Feb 27, 2015
2 parents 6afdcb9 + 8b7afe2 commit db97473
Show file tree
Hide file tree
Showing 36 changed files with 1,514 additions and 1,419 deletions.
68 changes: 30 additions & 38 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ import (

log "github.com/Sirupsen/logrus"
dockerfilters "github.com/docker/docker/pkg/parsers/filters"
"github.com/docker/docker/pkg/units"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/cluster/swarm"
"github.com/docker/swarm/scheduler/filter"
"github.com/docker/swarm/version"
"github.com/gorilla/mux"
Expand All @@ -26,8 +25,7 @@ import (
const APIVERSION = "1.16"

type context struct {
cluster *cluster.Cluster
scheduler *scheduler.Scheduler
cluster cluster.Cluster
eventsHandler *eventsHandler
debug bool
tlsConfig *tls.Config
Expand All @@ -37,23 +35,14 @@ type handler func(c *context, w http.ResponseWriter, r *http.Request)

// GET /info
func getInfo(c *context, w http.ResponseWriter, r *http.Request) {
nodes := c.cluster.Nodes()
driverStatus := [][2]string{{"\bNodes", fmt.Sprintf("%d", len(nodes))}}

for _, node := range nodes {
driverStatus = append(driverStatus, [2]string{node.Name, node.Addr})
driverStatus = append(driverStatus, [2]string{" └ Containers", fmt.Sprintf("%d", len(node.Containers()))})
driverStatus = append(driverStatus, [2]string{" └ Reserved CPUs", fmt.Sprintf("%d / %d", node.ReservedCpus(), node.Cpus)})
driverStatus = append(driverStatus, [2]string{" └ Reserved Memory", fmt.Sprintf("%s / %s", units.BytesSize(float64(node.ReservedMemory())), units.BytesSize(float64(node.Memory)))})
}
info := struct {
Containers int
DriverStatus [][2]string
NEventsListener int
Debug bool
}{
len(c.cluster.Containers()),
driverStatus,
c.cluster.Info(),
c.eventsHandler.Size(),
c.debug,
}
Expand Down Expand Up @@ -98,13 +87,13 @@ func getImagesJSON(c *context, w http.ResponseWriter, r *http.Request) {
}

accepteds, _ := filters["node"]
images := []*dockerclient.Image{}
images := []*cluster.Image{}

for _, node := range c.cluster.Nodes() {
for _, image := range c.cluster.Images() {
if len(accepteds) != 0 {
found := false
for _, accepted := range accepteds {
if accepted == node.Name || accepted == node.ID {
if accepted == image.Node.Name() || accepted == image.Node.ID() {
found = true
break
}
Expand All @@ -113,10 +102,7 @@ func getImagesJSON(c *context, w http.ResponseWriter, r *http.Request) {
continue
}
}

for _, image := range node.Images() {
images = append(images, image)
}
images = append(images, image)
}

w.Header().Set("Content-Type", "application/json")
Expand Down Expand Up @@ -150,14 +136,14 @@ func getContainersJSON(c *context, w http.ResponseWriter, r *http.Request) {
// TODO remove the Node Name in the name when we have a good solution
tmp.Names = make([]string, len(container.Names))
for i, name := range container.Names {
tmp.Names[i] = "/" + container.Node.Name + name
tmp.Names[i] = "/" + container.Node.Name() + name
}
// insert node IP
tmp.Ports = make([]dockerclient.Port, len(container.Ports))
for i, port := range container.Ports {
tmp.Ports[i] = port
if port.IP == "0.0.0.0" {
tmp.Ports[i].IP = container.Node.IP
tmp.Ports[i].IP = container.Node.IP()
}
}
out = append(out, &tmp)
Expand All @@ -179,7 +165,7 @@ func getContainerJSON(c *context, w http.ResponseWriter, r *http.Request) {
}
client, scheme := newClientAndScheme(c.tlsConfig)

resp, err := client.Get(scheme + "://" + container.Node.Addr + "/containers/" + container.Id + "/json")
resp, err := client.Get(scheme + "://" + container.Node.Addr() + "/containers/" + container.Id + "/json")
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
Expand All @@ -205,7 +191,7 @@ func getContainerJSON(c *context, w http.ResponseWriter, r *http.Request) {
data = bytes.Replace(data, []byte("\"Name\":\"/"), []byte(fmt.Sprintf("\"Node\":%s,\"Name\":\"/", n)), -1)

// insert node IP
data = bytes.Replace(data, []byte("\"HostIp\":\"0.0.0.0\""), []byte(fmt.Sprintf("\"HostIp\":%q", container.Node.IP)), -1)
data = bytes.Replace(data, []byte("\"HostIp\":\"0.0.0.0\""), []byte(fmt.Sprintf("\"HostIp\":%q", container.Node.IP())), -1)

w.Header().Set("Content-Type", "application/json")
w.Write(data)
Expand All @@ -229,7 +215,7 @@ func postContainersCreate(c *context, w http.ResponseWriter, r *http.Request) {
return
}

container, err := c.scheduler.CreateContainer(&config, name)
container, err := c.cluster.CreateContainer(&config, name)
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
Expand All @@ -255,7 +241,7 @@ func deleteContainer(c *context, w http.ResponseWriter, r *http.Request) {
httpError(w, fmt.Sprintf("Container %s not found", name), http.StatusNotFound)
return
}
if err := c.scheduler.RemoveContainer(container, force); err != nil {
if err := c.cluster.RemoveContainer(container, force); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -291,11 +277,13 @@ func proxyContainerAndForceRefresh(c *context, w http.ResponseWriter, r *http.Re
cb := func(resp *http.Response) {
if resp.StatusCode == http.StatusCreated {
log.Debugf("[REFRESH CONTAINER] --> %s", container.Id)
container.Node.RefreshContainer(container.Id, true)
if n, ok := container.Node.(*swarm.Node); ok {
n.RefreshContainer(container.Id, true)
}
}
}

if err := proxyAsync(c.tlsConfig, container.Node.Addr, w, r, cb); err != nil {
if err := proxyAsync(c.tlsConfig, container.Node.Addr(), w, r, cb); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}

Expand All @@ -309,7 +297,7 @@ func proxyContainer(c *context, w http.ResponseWriter, r *http.Request) {
return
}

if err := proxy(c.tlsConfig, container.Node.Addr, w, r); err != nil {
if err := proxy(c.tlsConfig, container.Node.Addr(), w, r); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}
Expand All @@ -318,18 +306,22 @@ func proxyContainer(c *context, w http.ResponseWriter, r *http.Request) {
func proxyImage(c *context, w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]

for _, node := range c.cluster.Nodes() {
if node.Image(name) != nil {
proxy(c.tlsConfig, node.Addr, w, r)
return
}
if image := c.cluster.Image(name); image != nil {
proxy(c.tlsConfig, image.Node.Addr(), w, r)
return
}
httpError(w, fmt.Sprintf("No such image: %s", name), http.StatusNotFound)
}

// Proxy a request to a random node
func proxyRandom(c *context, w http.ResponseWriter, r *http.Request) {
candidates := c.cluster.Nodes()
candidates := []cluster.Node{}

// FIXME: doesn't work if there are no container in the cluster
// remove proxyRandom and implemente the features locally
for _, container := range c.cluster.Containers() {
candidates = append(candidates, container.Node)
}

healthFilter := &filter.HealthFilter{}
accepted, err := healthFilter.Filter(nil, candidates)
Expand All @@ -339,7 +331,7 @@ func proxyRandom(c *context, w http.ResponseWriter, r *http.Request) {
return
}

if err := proxy(c.tlsConfig, accepted[rand.Intn(len(accepted))].Addr, w, r); err != nil {
if err := proxy(c.tlsConfig, accepted[rand.Intn(len(accepted))].Addr(), w, r); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}
Expand All @@ -352,7 +344,7 @@ func proxyHijack(c *context, w http.ResponseWriter, r *http.Request) {
return
}

if err := hijack(c.tlsConfig, container.Node.Addr, w, r); err != nil {
if err := hijack(c.tlsConfig, container.Node.Addr(), w, r); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}
Expand Down
15 changes: 6 additions & 9 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,17 @@ package api

import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"

"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/version"
"github.com/stretchr/testify/assert"
"net/http"
"net/http/httptest"
"testing"
)

func serveRequest(c *cluster.Cluster, s *scheduler.Scheduler, w http.ResponseWriter, req *http.Request) error {
func serveRequest(c cluster.Cluster, w http.ResponseWriter, req *http.Request) error {
context := &context{
cluster: c,
scheduler: s,
cluster: c,
}

r := createRouter(context, false)
Expand All @@ -28,7 +25,7 @@ func TestGetVersion(t *testing.T) {
req, err := http.NewRequest("GET", "/version", nil)
assert.NoError(t, err)

assert.NoError(t, serveRequest(nil, nil, r, req))
assert.NoError(t, serveRequest(nil, r, req))
assert.Equal(t, r.Code, http.StatusOK)

v := struct {
Expand Down
10 changes: 5 additions & 5 deletions api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ func (eh *eventsHandler) Handle(e *cluster.Event) error {
str := fmt.Sprintf("{%q:%q,%q:%q,%q:%q,%q:%d,%q:%q,%q:%q,%q:%q,%q:%q}",
"status", e.Status,
"id", e.Id,
"from", e.From+" node:"+e.Node.Name,
"from", e.From+" node:"+e.Node.Name(),
"time", e.Time,
"node_name", e.Node.Name,
"node_id", e.Node.ID,
"node_addr", e.Node.Addr,
"node_ip", e.Node.IP)
"node_name", e.Node.Name(),
"node_id", e.Node.ID(),
"node_addr", e.Node.Addr(),
"node_ip", e.Node.IP())

for key, w := range eh.ws {
if _, err := fmt.Fprintf(w, str); err != nil {
Expand Down
25 changes: 19 additions & 6 deletions api/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,23 @@ func (fw *FakeWriter) Write(p []byte) (n int, err error) {
return len(p), nil
}

type FakeNode struct{}

func (fn *FakeNode) ID() string { return "node_id" }
func (fn *FakeNode) Name() string { return "node_name" }
func (fn *FakeNode) IP() string { return "node_ip" }
func (fn *FakeNode) Addr() string { return "node_addr" }
func (fn *FakeNode) Images() []*cluster.Image { return nil }
func (fn *FakeNode) Image(_ string) *cluster.Image { return nil }
func (fn *FakeNode) Containers() []*cluster.Container { return nil }
func (fn *FakeNode) Container(_ string) *cluster.Container { return nil }
func (fn *FakeNode) TotalCpus() int64 { return 0 }
func (fn *FakeNode) UsedCpus() int64 { return 0 }
func (fn *FakeNode) TotalMemory() int64 { return 0 }
func (fn *FakeNode) UsedMemory() int64 { return 0 }
func (fn *FakeNode) Labels() map[string]string { return nil }
func (fn *FakeNode) IsHealthy() bool { return true }

func TestHandle(t *testing.T) {
eh := NewEventsHandler()
assert.Equal(t, eh.Size(), 0)
Expand All @@ -27,13 +44,9 @@ func TestHandle(t *testing.T) {
assert.Equal(t, eh.Size(), 1)

event := &cluster.Event{
Node: &cluster.Node{
Name: "node_name",
ID: "node_id",
Addr: "node_addr",
IP: "node_ip",
},
Node: &FakeNode{},
}

event.Event.Status = "status"
event.Event.Id = "id"
event.Event.From = "from"
Expand Down
7 changes: 2 additions & 5 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler"
)

const DefaultDockerPort = ":2375"
Expand All @@ -29,14 +28,12 @@ func newListener(proto, addr string, tlsConfig *tls.Config) (net.Listener, error
return l, nil
}

func ListenAndServe(c *cluster.Cluster, s *scheduler.Scheduler, hosts []string, enableCors bool, tlsConfig *tls.Config) error {
func ListenAndServe(c cluster.Cluster, hosts []string, enableCors bool, tlsConfig *tls.Config, eventsHandler *eventsHandler) error {
context := &context{
cluster: c,
scheduler: s,
eventsHandler: NewEventsHandler(),
eventsHandler: eventsHandler,
tlsConfig: tlsConfig,
}
c.Events(context.eventsHandler)
r := createRouter(context, enableCors)
chErrors := make(chan error, len(hosts))

Expand Down
Loading

0 comments on commit db97473

Please sign in to comment.