Skip to content

Commit

Permalink
Beginnings of capability support.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Wilkie authored and tomwilkie committed Oct 27, 2015
1 parent 6a4c997 commit 4ec4b3e
Show file tree
Hide file tree
Showing 14 changed files with 642 additions and 42 deletions.
166 changes: 166 additions & 0 deletions app/capabilities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package main

import (
"encoding/json"
"log"
"net/http"
"math/rand"
"sync"
"time"

"github.com/gorilla/mux"
"github.com/gorilla/websocket"

"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)

const capabilityKey = "capability"

type CapabilityRouter interface {
HandleCapability(xfer.Reporter, APITopologyDesc, http.ResponseWriter, *http.Request)
HandleProbeWS(w http.ResponseWriter, r *http.Request)
Stop()
}

type capabilityRouter struct {
sync.Mutex
quit chan struct{}
probes map[string]xfer.CapabilityHandler
}

func NewCapabilitiyRouter() CapabilityRouter {
return &capabilityRouter{
probes: map[string]xfer.CapabilityHandler{},
}
}

func (cr *capabilityRouter) Stop() {
close(cr.quit)
}

// Capabilities
func (cr *capabilityRouter) HandleCapability(rep xfer.Reporter, t APITopologyDesc, w http.ResponseWriter, r *http.Request) {
var (
vars = mux.Vars(r)
nodeID = vars["id"]
node, nodeOK = t.renderer.Render(rep.Report())[nodeID]
arguments = r.URL.Query()
)
if !nodeOK {
http.NotFound(w, r)
return
}

originHost, ok := node.Metadata[report.HostNodeID]
if !ok {
http.NotFound(w, r)
return
}

handler, ok := cr.probes[originHost]
if !ok {
http.NotFound(w, r)
return
}

capabilities, ok := arguments[capabilityKey]
if !ok || len(capabilities) > 1 {
respondWith(w, http.StatusBadRequest, capabilityKey)
return
}
capability := capabilities[0]
delete(arguments, capabilityKey)

result := handler(xfer.Request{
ID: rand.Int63(),
NodeID: nodeID,
Capability: capability,
})
if result.Error != nil {
respondWith(w, http.StatusBadRequest, result.Error)
return
}
respondWith(w, http.StatusOK, result.Value)
}

func (cr *capabilityRouter) HandleProbeWS(w http.ResponseWriter, r *http.Request) {
probeid := r.Header.Get(xfer.ScopeProbeIDHeader)
if probeid == "" {
respondWith(w, http.StatusBadRequest, xfer.ScopeProbeIDHeader)
return
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("Error upgrading to websocket: %v", err)
return
}
defer conn.Close()

incoming, outgoing := make(chan xfer.Response), make(chan xfer.Request)
quit := make(chan struct{})

go func(c *websocket.Conn) {
defer close(quit)

for { // just discard everything the browser sends
_, reader, err := c.NextReader()
if err != nil {
log.Printf("Error reading message from %s: %v", probeid, err)
return
}

var response xfer.Response
if err := json.NewDecoder(reader).Decode(&response); err != nil {
log.Printf("Error parsing message from %s: %v", probeid, err)
return
}

incoming <- response
}
}(conn)

go func(c *websocket.Conn) {
defer close(quit)

for {
msg := <-outgoing
if err := conn.SetWriteDeadline(time.Now().Add(websocketTimeout)); err != nil {
log.Printf("Error SetWriteDeadline for %s: %v", probeid, err)
return
}
if err := conn.WriteJSON(msg); err != nil {
log.Printf("Error sending message to %s: %v", probeid, err)
return
}
}
}(conn)

outstanding := xfer.NewCapabilityMux()

cr.Lock()
cr.probes[probeid] = func(req xfer.Request) xfer.Response {
outgoing <- req
return outstanding.WaitForResponse(req.ID)
}
cr.Unlock()

defer func() {
cr.Lock()
defer cr.Unlock()
delete(cr.probes, probeid)
// TODO: cancel outstanding operations
}()

select {
case <-cr.quit:
return
case <-quit:
return
case res := <-incoming:
if err := outstanding.ResponseReceived(res); err != nil {
log.Printf("Invalid message: %v", err)
}
}
}
15 changes: 7 additions & 8 deletions app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func gzipHandler(h http.HandlerFunc) http.HandlerFunc {
// resources for the UI.
func Router(c collector) *mux.Router {
router := mux.NewRouter()
router.HandleFunc("/api/report", makeReportPostHandler(c)).Methods("POST")
capabilitiesRouter := NewCapabilitiyRouter()

get := router.Methods("GET").Subrouter()
get.HandleFunc("/api", gzipHandler(apiHandler))
Expand All @@ -75,8 +75,13 @@ func Router(c collector) *mux.Router {
get.MatcherFunc(URLMatcher("/api/origin/host/{id}")).HandlerFunc(
gzipHandler(makeOriginHostHandler(c)))
get.HandleFunc("/api/report", gzipHandler(makeRawReportHandler(c)))
get.HandleFunc("/api/capabilities", capabilitiesRouter.HandleProbeWS)
get.PathPrefix("/").Handler(http.FileServer(FS(false))) // everything else is static

post := router.Methods("POST").Subrouter()
post.HandleFunc("/api/report", makeReportPostHandler(c)).Methods("POST")
post.MatcherFunc(URLMatcher("/api/topology/{topology}/{id}")).HandlerFunc(
topologyRegistry.captureTopology(c, capabilitiesRouter.HandleCapability))
return router
}

Expand Down Expand Up @@ -107,12 +112,6 @@ func makeReportPostHandler(a xfer.Adder) http.HandlerFunc {
}
}

// APIDetails are some generic details that can be fetched from /api
type APIDetails struct {
ID string `json:"id"`
Version string `json:"version"`
}

func apiHandler(w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, APIDetails{ID: uniqueID, Version: version})
respondWith(w, http.StatusOK, xfer.Details{ID: uniqueID, Version: version})
}
35 changes: 35 additions & 0 deletions probe/capabilities/capabilities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package capabilities

import (
"fmt"
"sync"

"github.com/weaveworks/scope/xfer"
)

var (
mtx = sync.Mutex{}
handlers = map[string]xfer.CapabilityHandler{}
)

func HandleCapabilityRequest(req xfer.Request) xfer.Response {
mtx.Lock()
handler, ok := handlers[req.Capability]
mtx.Unlock()
if !ok {
return xfer.Response{
ID: req.ID,
Error: fmt.Errorf("Capability '%s' not recognised", req.Capability),
}
}

response := handler(req)
response.ID = req.ID
return response
}

func Register(capability string, f xfer.CapabilityHandler) {
mtx.Lock()
defer mtx.Unlock()
handlers[capability] = f
}
19 changes: 19 additions & 0 deletions probe/docker/capabilities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package docker

import (
"log"

"github.com/weaveworks/scope/probe/capabilities"
"github.com/weaveworks/scope/xfer"
)

const RestartContainer = "docker_restart_container"

func init() {
capabilities.Register(RestartContainer, func(req xfer.Request) xfer.Response {
log.Printf("Got %s", req.Capability)
return xfer.Response{
Value: "Hello World",
}
})
}
2 changes: 1 addition & 1 deletion probe/docker/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (c *container) GetNode(hostID string, localAddrs []net.IP) report.Node {
ContainerIPs: strings.Join(ips, " "),
ContainerIPsWithScopes: strings.Join(ipsWithScopes, " "),
ContainerHostname: c.Hostname(),
})
}).WithCapabilities(RestartContainer)
AddLabels(result, c.container.Config.Labels)

if c.latestStats == nil {
Expand Down
4 changes: 4 additions & 0 deletions probe/docker/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (r *Reporter) Report() (report.Report, error) {

func (r *Reporter) containerTopology(localAddrs []net.IP) report.Topology {
result := report.MakeTopology()
result.Capabilities[RestartContainer] = report.Capability{
ID: RestartContainer,
Human: "Restart Container",
}

r.registry.WalkContainers(func(c Container) {
nodeID := report.MakeContainerNodeID(r.hostID, c.ID())
Expand Down
9 changes: 8 additions & 1 deletion probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"syscall"
"time"

"github.com/weaveworks/scope/probe/capabilities"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/probe/host"
Expand Down Expand Up @@ -101,7 +102,13 @@ func main() {
publishers := xfer.NewMultiPublisher(factory)
defer publishers.Stop()

resolver := newStaticResolver(targets, publishers.Set)
clients := xfer.NewMultiAppClient(xfer.ProbeConfig{
Token: *token,
ProbeID: probeID,
Insecure: *insecure,
}, capabilities.HandleCapabilityRequest)

resolver := newStaticResolver(targets, publishers.Set, clients.Set)
defer resolver.Stop()

endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack)
Expand Down
12 changes: 8 additions & 4 deletions probe/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ var (
lookupIP = net.LookupIP
)

type setter func(string, []string)

type staticResolver struct {
set func(string, []string)
setters []setter
targets []target
quit chan struct{}
}
Expand All @@ -32,10 +34,10 @@ func (t target) String() string { return net.JoinHostPort(t.host, t.port) }
// newStaticResolver periodically resolves the targets, and calls the set
// function with all the resolved IPs. It explictiy supports targets which
// resolve to multiple IPs.
func newStaticResolver(targets []string, set func(target string, endpoints []string)) staticResolver {
func newStaticResolver(targets []string, setters ...setter) staticResolver {
r := staticResolver{
targets: prepare(targets),
set: set,
setters: setters,
quit: make(chan struct{}),
}
go r.loop()
Expand Down Expand Up @@ -80,7 +82,9 @@ func prepare(strs []string) []target {

func (r staticResolver) resolve() {
for t, endpoints := range resolveMany(r.targets) {
r.set(t.String(), endpoints)
for _, setter := range r.setters {
setter(t.String(), endpoints)
}
}
}

Expand Down
43 changes: 43 additions & 0 deletions report/capabilities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package report

// Capabilities describe the capability tags within the NodeMetadata
type Capabilities map[string]Capability

// A Capability basically describes an RPC
type Capability struct {
ID string
Human string
Args []Arg
}

type Arg struct {
Name string
Human string
Type ArgType
}

type ArgType int

const (
Duration ArgType = iota
)

func (cs Capabilities) Merge(other Capabilities) Capabilities {
result := cs.Copy()
for k, v := range other {
result[k] = v
}
return result
}

func (cs Capabilities) Copy() Capabilities {
result := Capabilities{}
for k, v := range cs {
result[k] = v
}
return result
}

func (cs Capabilities) AddCapability(c Capability) {
cs[c.ID] = c
}
Loading

0 comments on commit 4ec4b3e

Please sign in to comment.