diff --git a/app/capabilities.go b/app/capabilities.go new file mode 100644 index 0000000000..93068cd22b --- /dev/null +++ b/app/capabilities.go @@ -0,0 +1,166 @@ +package main + +import ( + "encoding/json" + "log" + "net/http" + "math/rand" + "sync" + "time" + + "github.com/gorilla/mux" + "github.com/gorilla/websocket" + + "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/xfer" +) + +const capabilityKey = "capability" + +type CapabilityRouter interface { + HandleCapability(xfer.Reporter, APITopologyDesc, http.ResponseWriter, *http.Request) + HandleProbeWS(w http.ResponseWriter, r *http.Request) + Stop() +} + +type capabilityRouter struct { + sync.Mutex + quit chan struct{} + probes map[string]xfer.CapabilityHandler +} + +func NewCapabilitiyRouter() CapabilityRouter { + return &capabilityRouter{ + probes: map[string]xfer.CapabilityHandler{}, + } +} + +func (cr *capabilityRouter) Stop() { + close(cr.quit) +} + +// Capabilities +func (cr *capabilityRouter) HandleCapability(rep xfer.Reporter, t APITopologyDesc, w http.ResponseWriter, r *http.Request) { + var ( + vars = mux.Vars(r) + nodeID = vars["id"] + node, nodeOK = t.renderer.Render(rep.Report())[nodeID] + arguments = r.URL.Query() + ) + if !nodeOK { + http.NotFound(w, r) + return + } + + originHost, ok := node.Metadata[report.HostNodeID] + if !ok { + http.NotFound(w, r) + return + } + + handler, ok := cr.probes[originHost] + if !ok { + http.NotFound(w, r) + return + } + + capabilities, ok := arguments[capabilityKey] + if !ok || len(capabilities) > 1 { + respondWith(w, http.StatusBadRequest, capabilityKey) + return + } + capability := capabilities[0] + delete(arguments, capabilityKey) + + result := handler(xfer.Request{ + ID: rand.Int63(), + NodeID: nodeID, + Capability: capability, + }) + if result.Error != nil { + respondWith(w, http.StatusBadRequest, result.Error) + return + } + respondWith(w, http.StatusOK, result.Value) +} + +func (cr *capabilityRouter) HandleProbeWS(w http.ResponseWriter, r *http.Request) { + probeid := r.Header.Get(xfer.ScopeProbeIDHeader) + if probeid == "" { + respondWith(w, http.StatusBadRequest, xfer.ScopeProbeIDHeader) + return + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("Error upgrading to websocket: %v", err) + return + } + defer conn.Close() + + incoming, outgoing := make(chan xfer.Response), make(chan xfer.Request) + quit := make(chan struct{}) + + go func(c *websocket.Conn) { + defer close(quit) + + for { // just discard everything the browser sends + _, reader, err := c.NextReader() + if err != nil { + log.Printf("Error reading message from %s: %v", probeid, err) + return + } + + var response xfer.Response + if err := json.NewDecoder(reader).Decode(&response); err != nil { + log.Printf("Error parsing message from %s: %v", probeid, err) + return + } + + incoming <- response + } + }(conn) + + go func(c *websocket.Conn) { + defer close(quit) + + for { + msg := <-outgoing + if err := conn.SetWriteDeadline(time.Now().Add(websocketTimeout)); err != nil { + log.Printf("Error SetWriteDeadline for %s: %v", probeid, err) + return + } + if err := conn.WriteJSON(msg); err != nil { + log.Printf("Error sending message to %s: %v", probeid, err) + return + } + } + }(conn) + + outstanding := xfer.NewCapabilityMux() + + cr.Lock() + cr.probes[probeid] = func(req xfer.Request) xfer.Response { + outgoing <- req + return outstanding.WaitForResponse(req.ID) + } + cr.Unlock() + + defer func() { + cr.Lock() + defer cr.Unlock() + delete(cr.probes, probeid) + // TODO: cancel outstanding operations + }() + + select { + case <-cr.quit: + return + case <-quit: + return + case res := <-incoming: + if err := outstanding.ResponseReceived(res); err != nil { + log.Printf("Invalid message: %v", err) + } + } +} diff --git a/app/router.go b/app/router.go index 340c4b8375..d5fb5eb1d0 100644 --- a/app/router.go +++ b/app/router.go @@ -59,7 +59,7 @@ func gzipHandler(h http.HandlerFunc) http.HandlerFunc { // resources for the UI. func Router(c collector) *mux.Router { router := mux.NewRouter() - router.HandleFunc("/api/report", makeReportPostHandler(c)).Methods("POST") + capabilitiesRouter := NewCapabilitiyRouter() get := router.Methods("GET").Subrouter() get.HandleFunc("/api", gzipHandler(apiHandler)) @@ -75,8 +75,13 @@ func Router(c collector) *mux.Router { get.MatcherFunc(URLMatcher("/api/origin/host/{id}")).HandlerFunc( gzipHandler(makeOriginHostHandler(c))) get.HandleFunc("/api/report", gzipHandler(makeRawReportHandler(c))) + get.HandleFunc("/api/capabilities", capabilitiesRouter.HandleProbeWS) get.PathPrefix("/").Handler(http.FileServer(FS(false))) // everything else is static + post := router.Methods("POST").Subrouter() + post.HandleFunc("/api/report", makeReportPostHandler(c)).Methods("POST") + post.MatcherFunc(URLMatcher("/api/topology/{topology}/{id}")).HandlerFunc( + topologyRegistry.captureTopology(c, capabilitiesRouter.HandleCapability)) return router } @@ -107,12 +112,6 @@ func makeReportPostHandler(a xfer.Adder) http.HandlerFunc { } } -// APIDetails are some generic details that can be fetched from /api -type APIDetails struct { - ID string `json:"id"` - Version string `json:"version"` -} - func apiHandler(w http.ResponseWriter, r *http.Request) { - respondWith(w, http.StatusOK, APIDetails{ID: uniqueID, Version: version}) + respondWith(w, http.StatusOK, xfer.Details{ID: uniqueID, Version: version}) } diff --git a/probe/capabilities/capabilities.go b/probe/capabilities/capabilities.go new file mode 100644 index 0000000000..e28bcc4140 --- /dev/null +++ b/probe/capabilities/capabilities.go @@ -0,0 +1,35 @@ +package capabilities + +import ( + "fmt" + "sync" + + "github.com/weaveworks/scope/xfer" +) + +var ( + mtx = sync.Mutex{} + handlers = map[string]xfer.CapabilityHandler{} +) + +func HandleCapabilityRequest(req xfer.Request) xfer.Response { + mtx.Lock() + handler, ok := handlers[req.Capability] + mtx.Unlock() + if !ok { + return xfer.Response{ + ID: req.ID, + Error: fmt.Errorf("Capability '%s' not recognised", req.Capability), + } + } + + response := handler(req) + response.ID = req.ID + return response +} + +func Register(capability string, f xfer.CapabilityHandler) { + mtx.Lock() + defer mtx.Unlock() + handlers[capability] = f +} diff --git a/probe/docker/capabilities.go b/probe/docker/capabilities.go new file mode 100644 index 0000000000..ab494a1e99 --- /dev/null +++ b/probe/docker/capabilities.go @@ -0,0 +1,19 @@ +package docker + +import ( + "log" + + "github.com/weaveworks/scope/probe/capabilities" + "github.com/weaveworks/scope/xfer" +) + +const RestartContainer = "docker_restart_container" + +func init() { + capabilities.Register(RestartContainer, func(req xfer.Request) xfer.Response { + log.Printf("Got %s", req.Capability) + return xfer.Response{ + Value: "Hello World", + } + }) +} diff --git a/probe/docker/container.go b/probe/docker/container.go index 8f78d34bbb..b1fed78aba 100644 --- a/probe/docker/container.go +++ b/probe/docker/container.go @@ -241,7 +241,7 @@ func (c *container) GetNode(hostID string, localAddrs []net.IP) report.Node { ContainerIPs: strings.Join(ips, " "), ContainerIPsWithScopes: strings.Join(ipsWithScopes, " "), ContainerHostname: c.Hostname(), - }) + }).WithCapabilities(RestartContainer) AddLabels(result, c.container.Config.Labels) if c.latestStats == nil { diff --git a/probe/docker/reporter.go b/probe/docker/reporter.go index 01b6fbd7e7..9654aaa814 100644 --- a/probe/docker/reporter.go +++ b/probe/docker/reporter.go @@ -43,6 +43,10 @@ func (r *Reporter) Report() (report.Report, error) { func (r *Reporter) containerTopology(localAddrs []net.IP) report.Topology { result := report.MakeTopology() + result.Capabilities[RestartContainer] = report.Capability{ + ID: RestartContainer, + Human: "Restart Container", + } r.registry.WalkContainers(func(c Container) { nodeID := report.MakeContainerNodeID(r.hostID, c.ID()) diff --git a/probe/main.go b/probe/main.go index 2d92f2cd57..f65e475911 100644 --- a/probe/main.go +++ b/probe/main.go @@ -14,6 +14,7 @@ import ( "syscall" "time" + "github.com/weaveworks/scope/probe/capabilities" "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/probe/endpoint" "github.com/weaveworks/scope/probe/host" @@ -101,7 +102,13 @@ func main() { publishers := xfer.NewMultiPublisher(factory) defer publishers.Stop() - resolver := newStaticResolver(targets, publishers.Set) + clients := xfer.NewMultiAppClient(xfer.ProbeConfig{ + Token: *token, + ProbeID: probeID, + Insecure: *insecure, + }, capabilities.HandleCapabilityRequest) + + resolver := newStaticResolver(targets, publishers.Set, clients.Set) defer resolver.Stop() endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack) diff --git a/probe/resolver.go b/probe/resolver.go index 8e8b90b79d..1814b68b1c 100644 --- a/probe/resolver.go +++ b/probe/resolver.go @@ -19,8 +19,10 @@ var ( lookupIP = net.LookupIP ) +type setter func(string, []string) + type staticResolver struct { - set func(string, []string) + setters []setter targets []target quit chan struct{} } @@ -32,10 +34,10 @@ func (t target) String() string { return net.JoinHostPort(t.host, t.port) } // newStaticResolver periodically resolves the targets, and calls the set // function with all the resolved IPs. It explictiy supports targets which // resolve to multiple IPs. -func newStaticResolver(targets []string, set func(target string, endpoints []string)) staticResolver { +func newStaticResolver(targets []string, setters ...setter) staticResolver { r := staticResolver{ targets: prepare(targets), - set: set, + setters: setters, quit: make(chan struct{}), } go r.loop() @@ -80,7 +82,9 @@ func prepare(strs []string) []target { func (r staticResolver) resolve() { for t, endpoints := range resolveMany(r.targets) { - r.set(t.String(), endpoints) + for _, setter := range r.setters { + setter(t.String(), endpoints) + } } } diff --git a/report/capabilities.go b/report/capabilities.go new file mode 100644 index 0000000000..28a0816b51 --- /dev/null +++ b/report/capabilities.go @@ -0,0 +1,43 @@ +package report + +// Capabilities describe the capability tags within the NodeMetadata +type Capabilities map[string]Capability + +// A Capability basically describes an RPC +type Capability struct { + ID string + Human string + Args []Arg +} + +type Arg struct { + Name string + Human string + Type ArgType +} + +type ArgType int + +const ( + Duration ArgType = iota +) + +func (cs Capabilities) Merge(other Capabilities) Capabilities { + result := cs.Copy() + for k, v := range other { + result[k] = v + } + return result +} + +func (cs Capabilities) Copy() Capabilities { + result := Capabilities{} + for k, v := range cs { + result[k] = v + } + return result +} + +func (cs Capabilities) AddCapability(c Capability) { + cs[c.ID] = c +} diff --git a/report/topology.go b/report/topology.go index 0fb7b97650..00108886af 100644 --- a/report/topology.go +++ b/report/topology.go @@ -11,12 +11,14 @@ import ( // in the Node struct. type Topology struct { Nodes // TODO(pb): remove Nodes intermediate type + Capabilities } // MakeTopology gives you a Topology. func MakeTopology() Topology { return Topology{ - Nodes: map[string]Node{}, + Nodes: map[string]Node{}, + Capabilities: Capabilities{}, } } @@ -36,7 +38,8 @@ func (t Topology) AddNode(nodeID string, nmd Node) Topology { // Copy returns a value copy of the Topology. func (t Topology) Copy() Topology { return Topology{ - Nodes: t.Nodes.Copy(), + Nodes: t.Nodes.Copy(), + Capabilities: t.Capabilities.Copy(), } } @@ -44,7 +47,8 @@ func (t Topology) Copy() Topology { // The original is not modified. func (t Topology) Merge(other Topology) Topology { return Topology{ - Nodes: t.Nodes.Merge(other.Nodes), + Nodes: t.Nodes.Merge(other.Nodes), + Capabilities: t.Capabilities.Merge(other.Capabilities), } } @@ -78,19 +82,21 @@ func (n Nodes) Merge(other Nodes) Nodes { // given node in a given topology, along with the edges emanating from the // node and metadata about those edges. type Node struct { - Metadata `json:"metadata,omitempty"` - Counters `json:"counters,omitempty"` - Adjacency IDList `json:"adjacency"` - Edges EdgeMetadatas `json:"edges,omitempty"` + Metadata `json:"metadata,omitempty"` + Counters `json:"counters,omitempty"` + Adjacency IDList `json:"adjacency"` + Edges EdgeMetadatas `json:"edges,omitempty"` + Capabilities IDList `json:"capabilities,omitempty"` } // MakeNode creates a new Node with no initial metadata. func MakeNode() Node { return Node{ - Metadata: Metadata{}, - Counters: Counters{}, - Adjacency: MakeIDList(), - Edges: EdgeMetadatas{}, + Metadata: Metadata{}, + Counters: Counters{}, + Adjacency: MakeIDList(), + Edges: EdgeMetadatas{}, + Capabilities: MakeIDList(), } } @@ -129,6 +135,13 @@ func (n Node) WithEdge(dst string, md EdgeMetadata) Node { return result } +// WithCapabilities returns a fresh copy of n, with cs added to Capabilites. +func (n Node) WithCapabilities(cs ...string) Node { + result := n.Copy() + result.Capabilities = result.Capabilities.Add(cs...) + return result +} + // Copy returns a value copy of the Node. func (n Node) Copy() Node { cp := MakeNode() @@ -136,6 +149,7 @@ func (n Node) Copy() Node { cp.Counters = n.Counters.Copy() cp.Adjacency = n.Adjacency.Copy() cp.Edges = n.Edges.Copy() + cp.Capabilities = n.Capabilities.Copy() return cp } @@ -147,6 +161,7 @@ func (n Node) Merge(other Node) Node { cp.Counters = cp.Counters.Merge(other.Counters) cp.Adjacency = cp.Adjacency.Merge(other.Adjacency) cp.Edges = cp.Edges.Merge(other.Edges) + cp.Capabilities = cp.Capabilities.Merge(other.Capabilities) return cp } diff --git a/xfer/app_client.go b/xfer/app_client.go new file mode 100644 index 0000000000..0b5b1ab801 --- /dev/null +++ b/xfer/app_client.go @@ -0,0 +1,154 @@ +package xfer + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + + "github.com/gorilla/websocket" + + "github.com/weaveworks/scope/common/sanitize" +) + +// ScopeProbeIDHeader is the header we use to carry the probe's unique ID. The +// ID is currently set to the probe's hostname. It's designed to deduplicate +// reports from the same probe to the same receiver, in case the probe is +// configured to publish to multiple receivers that resolve to the same app. +const ScopeProbeIDHeader = "X-Scope-Probe-ID" + +// Details are some generic details that can be fetched from /api +type Details struct { + ID string `json:"id"` + Version string `json:"version"` +} + +type AppClient interface { + Details() (Details, error) + Capabilities(handler CapabilityHandler) error + Stop() +} + +type appClient struct { + ProbeConfig + + quit chan struct{} + target string + insecure bool + client http.Client +} + +func NewAppClient(pc ProbeConfig, hostname, target string) (AppClient, error) { + httpTransport, err := getHTTPTransport(hostname, pc.Insecure) + if err != nil { + return nil, err + } + + return &appClient{ + ProbeConfig: pc, + quit: make(chan struct{}), + target: target, + client: http.Client{ + Transport: httpTransport, + }, + }, nil +} + +func (c *appClient) Stop() { + close(c.quit) +} + +// AuthorizationHeader returns a value suitable for an HTTP Authorization +// header, based on the passed token string. +func AuthorizationHeader(token string) string { + return fmt.Sprintf("Scope-Probe token=%s", token) +} + +func (c *appClient) authoriseHeaders(headers http.Header) { + headers.Set("Authorization", AuthorizationHeader(c.Token)) + headers.Set(ScopeProbeIDHeader, c.ProbeID) +} + +func (c *appClient) authorizedRequest(method string, urlStr string, body io.Reader) (*http.Request, error) { + req, err := http.NewRequest(method, urlStr, body) + if err == nil { + c.authoriseHeaders(req.Header) + } + return req, err +} + +func (c *appClient) Details() (Details, error) { + result := Details{} + req, err := c.authorizedRequest("GET", sanitize.URL("", 0, "/api")(c.target), nil) + if err != nil { + return result, err + } + resp, err := c.client.Do(req) + if err != nil { + return result, err + } + defer resp.Body.Close() + return result, json.NewDecoder(resp.Body).Decode(&result) +} + +func (c *appClient) Capabilities(handler CapabilityHandler) error { + dialer := websocket.Dialer{} + headers := http.Header{} + c.authoriseHeaders(headers) + // TODO(twilkie) need to update sanitize to work with wss + url := sanitize.URL("ws://", 0, "/api/capabilities")(c.target) + conn, _, err := dialer.Dial(url, headers) + if err != nil { + return err + } + defer conn.Close() + + incoming, outgoing := make(chan Request), make(chan Response) + quit := make(chan struct{}) + + go func(c *websocket.Conn) { + defer close(quit) + + for { // just discard everything the browser sends + _, reader, err := c.NextReader() + if err != nil { + log.Printf("Error reading message from %s: %v", err) + return + } + + var request Request + if err := json.NewDecoder(reader).Decode(&request); err != nil { + log.Printf("Error parsing message from %s: %v", err) + return + } + + incoming <- request + } + }(conn) + + go func(c *websocket.Conn) { + defer close(quit) + + for response := range outgoing { + //if err := conn.SetWriteDeadline(time.Now().Add(websocketTimeout)); err != nil { + // log.Printf("Error SetWriteDeadline for %s: %v", err) + // return + //} + if err := conn.WriteJSON(response); err != nil { + log.Printf("Error sending response to %s: %v", err) + return + } + } + }(conn) + + for { + select { + case req := <-incoming: + // TODO process more than one request at once. + outgoing <- handler(req) + case <-c.quit: + case <-quit: + } + } +} diff --git a/xfer/capabilities.go b/xfer/capabilities.go new file mode 100644 index 0000000000..a760eb5198 --- /dev/null +++ b/xfer/capabilities.go @@ -0,0 +1,59 @@ +package xfer + +import ( + "fmt" + "sync" +) + +type Request struct { + ID int64 + NodeID string + Capability string + Args map[string]interface{} +} + +type Response struct { + ID int64 + Value interface{} + Error error +} + +type CapabilityHandler func(Request) Response + +type CapabilityMux interface { + ResponseReceived(Response) error + WaitForResponse(int64) Response +} + +type capabilityMux struct { + sync.Mutex + outstanding map[int64]chan Response +} + +func NewCapabilityMux() CapabilityMux { + return &capabilityMux{ + outstanding: map[int64]chan Response{}, + } +} + +func (c *capabilityMux) ResponseReceived(r Response) error { + c.Lock() + defer c.Unlock() + + callee, ok := c.outstanding[r.ID] + if !ok { + return fmt.Errorf("No pending request for %d", r.ID) + } + + delete(c.outstanding, r.ID) + callee <- r + return nil +} + +func (c *capabilityMux) WaitForResponse(id int64) Response { + wait := make(chan Response) + c.Lock() + c.outstanding[id] = wait + c.Unlock() + return <-wait +} diff --git a/xfer/http_publisher.go b/xfer/http_publisher.go index 6accfd7aeb..5b524eada4 100644 --- a/xfer/http_publisher.go +++ b/xfer/http_publisher.go @@ -84,19 +84,23 @@ func NewHTTPPublisher(hostname, target, token, probeID string, insecure bool) (s return apiResponse.ID, p, nil } +func (p HTTPPublisher) String() string { + return p.url +} + +func (p HTTPPublisher) authoriseHeaders(headers http.Header) { + headers.Set("Authorization", AuthorizationHeader(p.token)) + headers.Set(ScopeProbeIDHeader, p.probeID) +} + func (p HTTPPublisher) authorizedRequest(method string, urlStr string, body io.Reader) (*http.Request, error) { req, err := http.NewRequest(method, urlStr, body) if err == nil { - req.Header.Set("Authorization", AuthorizationHeader(p.token)) - req.Header.Set(ScopeProbeIDHeader, p.probeID) + p.authoriseHeaders(req.Header) } return req, err } -func (p HTTPPublisher) String() string { - return p.url -} - // Publish publishes the report to the URL. func (p HTTPPublisher) Publish(r io.Reader) error { req, err := p.authorizedRequest("POST", p.url, r) @@ -121,14 +125,3 @@ func (p HTTPPublisher) Publish(r io.Reader) error { // Stop implements Publisher func (p HTTPPublisher) Stop() {} -// AuthorizationHeader returns a value suitable for an HTTP Authorization -// header, based on the passed token string. -func AuthorizationHeader(token string) string { - return fmt.Sprintf("Scope-Probe token=%s", token) -} - -// ScopeProbeIDHeader is the header we use to carry the probe's unique ID. The -// ID is currently set to the probe's hostname. It's designed to deduplicate -// reports from the same probe to the same receiver, in case the probe is -// configured to publish to multiple receivers that resolve to the same app. -const ScopeProbeIDHeader = "X-Scope-Probe-ID" diff --git a/xfer/multi_client.go b/xfer/multi_client.go new file mode 100644 index 0000000000..65f6476b90 --- /dev/null +++ b/xfer/multi_client.go @@ -0,0 +1,102 @@ +package xfer + +import ( + "log" + "sync" + "time" +) + +type ProbeConfig struct { + Token string + ProbeID string + Insecure bool +} + +type multiClient struct { + handler CapabilityHandler + ProbeConfig + + mtx sync.Mutex + sema semaphore + clients map[string]AppClient + quit chan struct{} +} + +type MultiAppClient interface { + Set(hostname string, endpoints []string) +} + +// NewMultiClient ... +func NewMultiAppClient(pc ProbeConfig, handler CapabilityHandler) MultiAppClient { + return &multiClient{ + ProbeConfig: pc, + handler: handler, + + sema: newSemaphore(maxConcurrentGET), + clients: map[string]AppClient{}, + quit: make(chan struct{}), + } +} + +// Set ... +func (c *multiClient) Set(hostname string, endpoints []string) { + for _, endpoint := range endpoints { + go func(endpoint string) { + c.sema.p() + defer c.sema.v() + + client, err := NewAppClient(c.ProbeConfig, hostname, endpoint) + if err != nil { + log.Printf("Error creating new app client: %v", err) + return + } + + details, err := client.Details() + if err != nil { + log.Printf("Error fetching app details: %v", err) + } + + c.mtx.Lock() + defer c.mtx.Unlock() + _, ok := c.clients[details.ID] + if !ok { + c.clients[details.ID] = client + go c.capabilityLoop(endpoint, client) + } + }(endpoint) + } +} + +func (c *multiClient) capabilityLoop(endpoint string, client AppClient) { + backoff := initialBackoff + + for { + err := client.Capabilities(c.handler) + if err == nil { + backoff = initialBackoff + continue + } + + log.Printf("Error doing capabilities for %s, backing off %s: %v", endpoint, backoff, err) + select { + case <-time.After(backoff): + case <-c.quit: + return + } + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } +} + +// Stop ... +func (c *multiClient) Stop() { + c.mtx.Lock() + defer c.mtx.Unlock() + for _, c := range c.clients { + c.Stop() + } + c.clients = map[string]AppClient{} + close(c.quit) +}