diff --git a/app/api_topologies.go b/app/api_topologies.go index 7361fa91c3..3d9d68343c 100644 --- a/app/api_topologies.go +++ b/app/api_topologies.go @@ -23,7 +23,7 @@ var ( renderer: render.PodRenderer, Name: "Pods", Options: map[string][]APITopologyOption{"system": { - {"show", "System containers shown", false, nop}, + {"show", "System containers shown", false, render.FilterNoop}, {"hide", "System containers hidden", true, render.FilterSystem}, }}, }, @@ -33,7 +33,7 @@ var ( renderer: render.PodServiceRenderer, Name: "by service", Options: map[string][]APITopologyOption{"system": { - {"show", "System containers shown", false, nop}, + {"show", "System containers shown", false, render.FilterNoop}, {"hide", "System containers hidden", true, render.FilterSystem}, }}, }, @@ -41,6 +41,17 @@ var ( ) func init() { + containerFilters := map[string][]APITopologyOption{ + "system": { + {"show", "System containers shown", false, render.FilterNoop}, + {"hide", "System containers hidden", true, render.FilterSystem}, + }, + "stopped": { + {"show", "Stopped containers shown", false, render.FilterNoop}, + {"hide", "Stopped containers hidden", true, render.FilterStopped}, + }, + } + // Topology option labels should tell the current state. The first item must // be the verb to get to that state topologyRegistry.add( @@ -51,7 +62,7 @@ func init() { Options: map[string][]APITopologyOption{"unconnected": { // Show the user why there are filtered nodes in this view. // Don't give them the option to show those nodes. - {"hide", "Unconnected nodes hidden", true, nop}, + {"hide", "Unconnected nodes hidden", true, render.FilterNoop}, }}, }, APITopologyDesc{ @@ -61,37 +72,28 @@ func init() { Name: "by name", Options: map[string][]APITopologyOption{"unconnected": { // Ditto above. 
- {"hide", "Unconnected nodes hidden", true, nop}, + {"hide", "Unconnected nodes hidden", true, render.FilterNoop}, }}, }, APITopologyDesc{ id: "containers", renderer: render.ContainerWithImageNameRenderer, Name: "Containers", - Options: map[string][]APITopologyOption{"system": { - {"show", "System containers shown", false, nop}, - {"hide", "System containers hidden", true, render.FilterSystem}, - }}, + Options: containerFilters, }, APITopologyDesc{ id: "containers-by-image", parent: "containers", renderer: render.ContainerImageRenderer, Name: "by image", - Options: map[string][]APITopologyOption{"system": { - {"show", "System containers shown", false, nop}, - {"hide", "System containers hidden", true, render.FilterSystem}, - }}, + Options: containerFilters, }, APITopologyDesc{ id: "containers-by-hostname", parent: "containers", renderer: render.ContainerHostnameRenderer, Name: "by hostname", - Options: map[string][]APITopologyOption{"system": { - {"show", "System containers shown", false, nop}, - {"hide", "System containers hidden", true, render.FilterSystem}, - }}, + Options: containerFilters, }, APITopologyDesc{ id: "hosts", @@ -226,8 +228,6 @@ func decorateWithStats(rpt report.Report, renderer render.Renderer) topologyStat } } -func nop(r render.Renderer) render.Renderer { return r } - func (r *registry) enableKubernetesTopologies() { r.add(kubernetesTopologies...) 
} diff --git a/app/api_topology_test.go b/app/api_topology_test.go index ef5d806b3c..c4db880d84 100644 --- a/app/api_topology_test.go +++ b/app/api_topology_test.go @@ -60,8 +60,13 @@ func TestAPITopologyContainers(t *testing.T) { if err := json.Unmarshal(body, &topo); err != nil { t.Fatal(err) } + want := expected.RenderedContainers.Copy() + for id, node := range want { + node.ControlNode = "" + want[id] = node + } - if want, have := expected.RenderedContainers, topo.Nodes.Prune(); !reflect.DeepEqual(want, have) { + if have := topo.Nodes.Prune(); !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } } diff --git a/app/controls.go b/app/controls.go new file mode 100644 index 0000000000..4e59d9c4b4 --- /dev/null +++ b/app/controls.go @@ -0,0 +1,121 @@ +package main + +import ( + "log" + "math/rand" + "net/http" + "net/rpc" + "sync" + + "github.com/gorilla/mux" + + "github.com/weaveworks/scope/xfer" +) + +func registerControlRoutes(router *mux.Router) { + controlRouter := &controlRouter{ + probes: map[string]controlHandler{}, + } + router.Methods("GET").Path("/api/control/ws").HandlerFunc(controlRouter.handleProbeWS) + router.Methods("POST").MatcherFunc(URLMatcher("/api/control/{probeID}/{nodeID}/{control}")).HandlerFunc(controlRouter.handleControl) +} + +type controlHandler struct { + id int64 + client *rpc.Client +} + +func (ch *controlHandler) handle(req xfer.Request) xfer.Response { + var res xfer.Response + if err := ch.client.Call("control.Handle", req, &res); err != nil { + return xfer.ResponseError(err) + } + return res +} + +type controlRouter struct { + sync.Mutex + probes map[string]controlHandler +} + +func (cr *controlRouter) get(probeID string) (controlHandler, bool) { + cr.Lock() + defer cr.Unlock() + handler, ok := cr.probes[probeID] + return handler, ok +} + +func (cr *controlRouter) set(probeID string, handler controlHandler) { + cr.Lock() + defer cr.Unlock() + cr.probes[probeID] = handler +} + +func (cr *controlRouter) rm(probeID 
string, handler controlHandler) { + cr.Lock() + defer cr.Unlock() + // NB probe might have reconnected in the mean time, need to ensure we do not + // delete new connection! Also, it might have connected then deleted itself! + if cr.probes[probeID].id == handler.id { + delete(cr.probes, probeID) + } +} + +// handleControl routes control requests from the client to the appropriate +// probe. Its is blocking. +func (cr *controlRouter) handleControl(w http.ResponseWriter, r *http.Request) { + var ( + vars = mux.Vars(r) + probeID = vars["probeID"] + nodeID = vars["nodeID"] + control = vars["control"] + ) + handler, ok := cr.get(probeID) + if !ok { + log.Printf("Probe %s is not connected right now...", probeID) + http.NotFound(w, r) + return + } + + result := handler.handle(xfer.Request{ + ID: rand.Int63(), + NodeID: nodeID, + Control: control, + }) + if result.Error != "" { + respondWith(w, http.StatusBadRequest, result.Error) + return + } + respondWith(w, http.StatusOK, result.Value) +} + +// handleProbeWS accepts websocket connections from the probe and registers +// them in the control router, such that HandleControl calls can find them. 
+func (cr *controlRouter) handleProbeWS(w http.ResponseWriter, r *http.Request) { + probeID := r.Header.Get(xfer.ScopeProbeIDHeader) + if probeID == "" { + respondWith(w, http.StatusBadRequest, xfer.ScopeProbeIDHeader) + return + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("Error upgrading to websocket: %v", err) + return + } + defer conn.Close() + + codec := xfer.NewJSONWebsocketCodec(conn) + client := rpc.NewClientWithCodec(codec) + handler := controlHandler{ + id: rand.Int63(), + client: client, + } + + cr.set(probeID, handler) + + codec.WaitForReadError() + + cr.rm(probeID, handler) + client.Close() +} diff --git a/app/controls_test.go b/app/controls_test.go new file mode 100644 index 0000000000..d5f3a173dc --- /dev/null +++ b/app/controls_test.go @@ -0,0 +1,69 @@ +package main + +import ( + "encoding/json" + "net" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/weaveworks/scope/xfer" + + "github.com/gorilla/mux" +) + +func TestControl(t *testing.T) { + router := mux.NewRouter() + registerControlRoutes(router) + server := httptest.NewServer(router) + defer server.Close() + + ip, port, err := net.SplitHostPort(strings.TrimPrefix(server.URL, "http://")) + if err != nil { + t.Fatal(err) + } + + probeConfig := xfer.ProbeConfig{ + ProbeID: "foo", + } + client, err := xfer.NewAppClient(probeConfig, ip+":"+port, ip+":"+port) + if err != nil { + t.Fatal(err) + } + defer client.Stop() + + client.ControlConnection(xfer.ControlHandlerFunc(func(req xfer.Request) xfer.Response { + if req.NodeID != "nodeid" { + t.Fatalf("'%s' != 'nodeid'", req.NodeID) + } + + if req.Control != "control" { + t.Fatalf("'%s' != 'control'", req.Control) + } + + return xfer.Response{ + Value: "foo", + } + })) + + time.Sleep(100 * time.Millisecond) + + httpClient := http.Client{ + Timeout: 1 * time.Second, + } + resp, err := httpClient.Post(server.URL+"/api/control/foo/nodeid/control", "", nil) + if err != nil { + t.Fatal(err) + 
} + + var response string + if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { + t.Fatal(err) + } + + if response != "foo" { + t.Fatalf("'%s' != 'foo'", response) + } +} diff --git a/app/main.go b/app/main.go index 58b7ff0bd1..64dd102d89 100644 --- a/app/main.go +++ b/app/main.go @@ -14,6 +14,8 @@ import ( "syscall" "time" + "github.com/gorilla/mux" + "github.com/weaveworks/scope/xfer" ) @@ -25,6 +27,19 @@ var ( uniqueID = "0" ) +func registerStatic(router *mux.Router) { + router.Methods("GET").PathPrefix("/").Handler(http.FileServer(FS(false))) +} + +// Router creates the mux for all the various app components. +func Router(c collector) *mux.Router { + router := mux.NewRouter() + registerTopologyRoutes(c, router) + registerControlRoutes(router) + registerStatic(router) + return router +} + func main() { var ( window = flag.Duration("window", 15*time.Second, "window") diff --git a/app/router.go b/app/router.go index d05afa2111..88a4a33bf4 100644 --- a/app/router.go +++ b/app/router.go @@ -11,6 +11,7 @@ import ( "github.com/gorilla/mux" "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/xfer" ) // URLMatcher uses request.RequestURI (the raw, unparsed request) to attempt @@ -53,13 +54,7 @@ func gzipHandler(h http.HandlerFunc) http.HandlerFunc { return handlers.GZIPHandlerFunc(h, nil) } -// Router returns the HTTP dispatcher, managing API and UI requests, and -// accepting reports from probes.. It will always use the embedded HTML -// resources for the UI. 
-func Router(c collector) *mux.Router { - router := mux.NewRouter() - router.HandleFunc("/api/report", makeReportPostHandler(c)).Methods("POST") - +func registerTopologyRoutes(c collector, router *mux.Router) { get := router.Methods("GET").Subrouter() get.HandleFunc("/api", gzipHandler(apiHandler)) get.HandleFunc("/api/topology", gzipHandler(topologyRegistry.makeTopologyList(c))) @@ -74,9 +69,9 @@ func Router(c collector) *mux.Router { get.MatcherFunc(URLMatcher("/api/origin/host/{id}")).HandlerFunc( gzipHandler(makeOriginHostHandler(c))) get.HandleFunc("/api/report", gzipHandler(makeRawReportHandler(c))) - get.PathPrefix("/").Handler(http.FileServer(FS(false))) // everything else is static - return router + post := router.Methods("POST").Subrouter() + post.HandleFunc("/api/report", makeReportPostHandler(c)).Methods("POST") } func makeReportPostHandler(a Adder) http.HandlerFunc { @@ -106,12 +101,6 @@ func makeReportPostHandler(a Adder) http.HandlerFunc { } } -// APIDetails are some generic details that can be fetched from /api -type APIDetails struct { - ID string `json:"id"` - Version string `json:"version"` -} - func apiHandler(w http.ResponseWriter, r *http.Request) { - respondWith(w, http.StatusOK, APIDetails{ID: uniqueID, Version: version}) + respondWith(w, http.StatusOK, xfer.Details{ID: uniqueID, Version: version}) } diff --git a/app/site_test.go b/app/site_test.go index 1bbc67012f..09b3a96bb5 100644 --- a/app/site_test.go +++ b/app/site_test.go @@ -4,11 +4,15 @@ package main import ( "net/http/httptest" "testing" + + "github.com/gorilla/mux" ) // Test site func TestSite(t *testing.T) { - ts := httptest.NewServer(Router(StaticReport{})) + router := mux.NewRouter() + registerStatic(router) + ts := httptest.NewServer(router) defer ts.Close() is200(t, ts, "/") diff --git a/client/.eslintrc b/client/.eslintrc index 4dce480aff..d5928ee0c6 100644 --- a/client/.eslintrc +++ b/client/.eslintrc @@ -26,6 +26,9 @@ "templateStrings": true, "jsx": true }, + "globals": { 
+ __WS_URL__: false + }, "rules": { /** * Strict mode diff --git a/client/app/scripts/actions/app-actions.js b/client/app/scripts/actions/app-actions.js index 46931a6d3c..3950ec5253 100644 --- a/client/app/scripts/actions/app-actions.js +++ b/client/app/scripts/actions/app-actions.js @@ -66,12 +66,29 @@ module.exports = { }); }, + clearControlError: function() { + AppDispatcher.dispatch({ + type: ActionTypes.CLEAR_CONTROL_ERROR + }); + }, + closeWebsocket: function() { AppDispatcher.dispatch({ type: ActionTypes.CLOSE_WEBSOCKET }); }, + doControl: function(probeId, nodeId, control) { + AppDispatcher.dispatch({ + type: ActionTypes.DO_CONTROL + }); + WebapiUtils.doControl( + probeId, + nodeId, + control, + ); + }, + enterEdge: function(edgeId) { AppDispatcher.dispatch({ type: ActionTypes.ENTER_EDGE, @@ -107,6 +124,19 @@ module.exports = { }); }, + receiveControlError: function(err) { + AppDispatcher.dispatch({ + type: ActionTypes.DO_CONTROL_ERROR, + error: err + }); + }, + + receiveControlSuccess: function() { + AppDispatcher.dispatch({ + type: ActionTypes.DO_CONTROL_SUCCESS + }); + }, + receiveNodeDetails: function(details) { AppDispatcher.dispatch({ type: ActionTypes.RECEIVE_NODE_DETAILS, @@ -139,15 +169,15 @@ module.exports = { receiveApiDetails: function(apiDetails) { AppDispatcher.dispatch({ - type: ActionTypes.RECEIVE_API_DETAILS, - version: apiDetails.version + type: ActionTypes.RECEIVE_API_DETAILS, + version: apiDetails.version }); }, receiveError: function(errorUrl) { AppDispatcher.dispatch({ - errorUrl: errorUrl, - type: ActionTypes.RECEIVE_ERROR + errorUrl: errorUrl, + type: ActionTypes.RECEIVE_ERROR }); }, diff --git a/client/app/scripts/charts/node.js b/client/app/scripts/charts/node.js index 6fd29dfca3..a0dc57af97 100644 --- a/client/app/scripts/charts/node.js +++ b/client/app/scripts/charts/node.js @@ -22,9 +22,9 @@ const Node = React.createClass({ const subLabelOffsetY = labelOffsetY + 17; const isPseudo = !!this.props.pseudo; const color = isPseudo ? 
'' : this.getNodeColor(this.props.rank); - const onClick = this.props.onClick; const onMouseEnter = this.handleMouseEnter; const onMouseLeave = this.handleMouseLeave; + const onMouseClick = this.handleMouseClick; const classNames = ['node']; const animConfig = [80, 20]; // stiffness, bounce const label = this.ellipsis(props.label, 14, scale(4 * scaleFactor)); @@ -51,7 +51,7 @@ const Node = React.createClass({ const transform = `translate(${interpolated.x.val},${interpolated.y.val})`; return ( + onClick={onMouseClick} onMouseEnter={onMouseEnter} onMouseLeave={onMouseLeave}> {props.highlighted && } @@ -79,6 +79,11 @@ const Node = React.createClass({ return truncatedText; }, + handleMouseClick: function(ev) { + ev.stopPropagation(); + AppActions.clickNode(ev.currentTarget.id); + }, + handleMouseEnter: function(ev) { AppActions.enterNode(ev.currentTarget.id); }, diff --git a/client/app/scripts/charts/nodes-chart.js b/client/app/scripts/charts/nodes-chart.js index 1d0245e28e..f2d3a2dd94 100644 --- a/client/app/scripts/charts/nodes-chart.js +++ b/client/app/scripts/charts/nodes-chart.js @@ -218,7 +218,7 @@ const NodesChart = React.createClass({
{errorEmpty} {errorMaxNodesExceeded} - + {function(interpolated) { let interpolatedTranslate = wasShifted ? interpolated.val : panTranslate; @@ -402,7 +402,7 @@ const NodesChart = React.createClass({ isZooming: false, // distinguish pan/zoom from click - handleMouseUp: function() { + handleMouseClick: function() { if (!this.isZooming) { AppActions.clickCloseDetails(); // allow shifts again diff --git a/client/app/scripts/components/__tests__/node-details-test.js b/client/app/scripts/components/__tests__/node-details-test.js index 655751aa4b..d4b17f41d9 100644 --- a/client/app/scripts/components/__tests__/node-details-test.js +++ b/client/app/scripts/components/__tests__/node-details-test.js @@ -2,6 +2,8 @@ jest.dontMock('../node-details.js'); jest.dontMock('../../mixins/node-color-mixin'); jest.dontMock('../../utils/title-utils'); +__WS_URL__ = false + describe('NodeDetails', () => { let NodeDetails; let nodes; diff --git a/client/app/scripts/components/app.js b/client/app/scripts/components/app.js index 1b30c91d0c..aaeabc08e1 100644 --- a/client/app/scripts/components/app.js +++ b/client/app/scripts/components/app.js @@ -20,6 +20,8 @@ const ESC_KEY_CODE = 27; function getStateFromStores() { return { activeTopologyOptions: AppStore.getActiveTopologyOptions(), + controlError: AppStore.getControlError(), + controlPending: AppStore.isControlPending(), currentTopology: AppStore.getCurrentTopology(), currentTopologyId: AppStore.getCurrentTopologyId(), currentTopologyOptions: AppStore.getCurrentTopologyOptions(), @@ -81,6 +83,8 @@ const App = React.createClass({ return (
{showingDetails &&
} diff --git a/client/app/scripts/components/details.js b/client/app/scripts/components/details.js index 811b7b6426..459e6a66fb 100644 --- a/client/app/scripts/components/details.js +++ b/client/app/scripts/components/details.js @@ -2,7 +2,6 @@ const React = require('react'); const mui = require('material-ui'); const Paper = mui.Paper; -const AppActions = require('../actions/app-actions'); const NodeDetails = require('./node-details'); const Details = React.createClass({ @@ -11,21 +10,10 @@ const Details = React.createClass({ return (
-
-
- -
-
- +
); - }, - - handleClickClose: function(ev) { - ev.preventDefault(); - AppActions.clickCloseDetails(); } }); diff --git a/client/app/scripts/components/node-control-button.js b/client/app/scripts/components/node-control-button.js new file mode 100644 index 0000000000..70f8f10a50 --- /dev/null +++ b/client/app/scripts/components/node-control-button.js @@ -0,0 +1,23 @@ +const React = require('react'); +const AppActions = require('../actions/app-actions'); + +const NodeControlButton = React.createClass({ + + render: function() { + let className = `node-control-button fa ${this.props.control.icon}`; + if (this.props.pending) { + className += ' node-control-button-pending'; + } + return ( + + ); + }, + + handleClick: function(ev) { + ev.preventDefault(); + AppActions.doControl(this.props.control.probeId, this.props.control.nodeId, this.props.control.id); + } + +}); + +module.exports = NodeControlButton; diff --git a/client/app/scripts/components/node-details-controls.js b/client/app/scripts/components/node-details-controls.js new file mode 100644 index 0000000000..7123a7b5a2 --- /dev/null +++ b/client/app/scripts/components/node-details-controls.js @@ -0,0 +1,25 @@ +const React = require('react'); + +const NodeControlButton = require('./node-control-button'); + +const NodeDetailsControls = React.createClass({ + + render: function() { + return ( +
+ {this.props.error &&
+ + {this.props.error} +
} + {this.props.controls && this.props.controls.map(control => { + return ( + + ); + })} +
+ ); + } + +}); + +module.exports = NodeDetailsControls; diff --git a/client/app/scripts/components/node-details.js b/client/app/scripts/components/node-details.js index 235a4f34d8..2ee7ec1678 100644 --- a/client/app/scripts/components/node-details.js +++ b/client/app/scripts/components/node-details.js @@ -1,6 +1,7 @@ const _ = require('lodash'); const React = require('react'); +const NodeDetailsControls = require('./node-details-controls'); const NodeDetailsTable = require('./node-details-table'); const NodeColorMixin = require('../mixins/node-color-mixin'); const TitleUtils = require('../utils/title-utils'); @@ -65,6 +66,8 @@ const NodeDetails = React.createClass({ return (
+

{details.label_major}

diff --git a/client/app/scripts/components/nodes.js b/client/app/scripts/components/nodes.js index c6737d4e54..3e913aae74 100644 --- a/client/app/scripts/components/nodes.js +++ b/client/app/scripts/components/nodes.js @@ -1,7 +1,6 @@ const React = require('react'); const NodesChart = require('../charts/nodes-chart'); -const AppActions = require('../actions/app-actions'); const navbarHeight = 160; const marginTop = 0; @@ -23,10 +22,6 @@ const Nodes = React.createClass({ window.removeEventListener('resize', this.handleResize); }, - onNodeClick: function(ev) { - AppActions.clickNode(ev.currentTarget.id); - }, - render: function() { return ( &1 | grep "probe starting" | sed -n 's/^.*ID \([0-9a-f]*\)$/\1/p') +HOSTID=$(echo $HOST1 | cut -d"." -f1) +assert_raises "curl -f -X POST 'http://$HOST1:4040/api/control/$PROBEID/$HOSTID;$CID/docker_stop_container'" + +sleep 5 +assert "docker_on $HOST1 inspect --format='{{.State.Running}}' alpine" "false" + +scope_end_suite diff --git a/probe/controls/controls.go b/probe/controls/controls.go new file mode 100644 index 0000000000..3dcce5d490 --- /dev/null +++ b/probe/controls/controls.go @@ -0,0 +1,44 @@ +package controls + +import ( + "fmt" + "sync" + + "github.com/weaveworks/scope/xfer" +) + +var ( + mtx = sync.Mutex{} + handlers = map[string]xfer.ControlHandlerFunc{} +) + +// HandleControlRequest performs a control request. +func HandleControlRequest(req xfer.Request) xfer.Response { + mtx.Lock() + handler, ok := handlers[req.Control] + mtx.Unlock() + if !ok { + return xfer.Response{ + ID: req.ID, + Error: fmt.Sprintf("Control '%s' not recognised", req.Control), + } + } + + response := handler(req) + response.ID = req.ID + return response +} + +// Register a new control handler under a given id. 
+func Register(control string, f xfer.ControlHandlerFunc) { + mtx.Lock() + defer mtx.Unlock() + handlers[control] = f +} + +// Rm deletes the handler for a given name +func Rm(control string) { + mtx.Lock() + defer mtx.Unlock() + delete(handlers, control) +} diff --git a/probe/controls/controls_test.go b/probe/controls/controls_test.go new file mode 100644 index 0000000000..b47a2f3df9 --- /dev/null +++ b/probe/controls/controls_test.go @@ -0,0 +1,45 @@ +package controls_test + +import ( + "reflect" + "testing" + + "github.com/weaveworks/scope/probe/controls" + "github.com/weaveworks/scope/test" + "github.com/weaveworks/scope/xfer" +) + +func TestControls(t *testing.T) { + controls.Register("foo", func(req xfer.Request) xfer.Response { + return xfer.Response{ + Value: "bar", + } + }) + defer controls.Rm("foo") + + want := xfer.Response{ + ID: 1234, + Value: "bar", + } + have := controls.HandleControlRequest(xfer.Request{ + ID: 1234, + Control: "foo", + }) + if !reflect.DeepEqual(want, have) { + t.Fatal(test.Diff(want, have)) + } +} + +func TestControlsNotFound(t *testing.T) { + want := xfer.Response{ + ID: 3456, + Error: "Control 'baz' not recognised", + } + have := controls.HandleControlRequest(xfer.Request{ + ID: 3456, + Control: "baz", + }) + if !reflect.DeepEqual(want, have) { + t.Fatal(test.Diff(want, have)) + } +} diff --git a/probe/docker/container.go b/probe/docker/container.go index af721b8cd3..8806310ffe 100644 --- a/probe/docker/container.go +++ b/probe/docker/container.go @@ -29,6 +29,7 @@ const ( ContainerIPs = "docker_container_ips" ContainerHostname = "docker_container_hostname" ContainerIPsWithScopes = "docker_container_ips_with_scopes" + ContainerState = "docker_container_state" NetworkRxDropped = "network_rx_dropped" NetworkRxBytes = "network_rx_bytes" @@ -49,6 +50,12 @@ const ( CPUTotalUsage = "cpu_total_usage" CPUUsageInKernelmode = "cpu_usage_in_kernelmode" CPUSystemCPUUsage = "cpu_system_cpu_usage" + + StateRunning = "running" + StateStopped = 
"stopped" + StatePaused = "paused" + + stopTimeout = 10 ) // Exported for testing @@ -69,6 +76,8 @@ type ClientConn interface { // Container represents a Docker container type Container interface { + UpdateState(*docker.Container) + ID() string Image() string PID() int @@ -88,7 +97,15 @@ type container struct { // NewContainer creates a new Container func NewContainer(c *docker.Container) Container { - return &container{container: c} + return &container{ + container: c, + } +} + +func (c *container) UpdateState(container *docker.Container) { + c.Lock() + defer c.Unlock() + c.container = container } func (c *container) ID() string { @@ -231,6 +248,15 @@ func (c *container) GetNode(hostID string, localAddrs []net.IP) report.Node { ipsWithScopes = append(ipsWithScopes, report.MakeScopedAddressNodeID(hostID, ip)) } + var state string + if c.container.State.Paused { + state = StatePaused + } else if c.container.State.Running { + state = StateRunning + } else { + state = StateStopped + } + result := report.MakeNodeWith(map[string]string{ ContainerID: c.ID(), ContainerName: strings.TrimPrefix(c.container.Name, "/"), @@ -238,11 +264,21 @@ func (c *container) GetNode(hostID string, localAddrs []net.IP) report.Node { ContainerCommand: c.container.Path + " " + strings.Join(c.container.Args, " "), ImageID: c.container.Image, ContainerHostname: c.Hostname(), + ContainerState: state, }).WithSets(report.Sets{ ContainerPorts: c.ports(localAddrs), ContainerIPs: report.MakeStringSet(ips...), ContainerIPsWithScopes: report.MakeStringSet(ipsWithScopes...), }) + + if c.container.State.Paused { + result = result.WithControls(UnpauseContainer) + } else if c.container.State.Running { + result = result.WithControls(RestartContainer, StopContainer, PauseContainer) + } else { + result = result.WithControls(StartContainer) + } + AddLabels(result, c.container.Config.Labels) if c.latestStats == nil { diff --git a/probe/docker/container_linux_test.go b/probe/docker/container_test.go similarity 
index 94% rename from probe/docker/container_linux_test.go rename to probe/docker/container_test.go index 3a73cceaad..fa353543fe 100644 --- a/probe/docker/container_linux_test.go +++ b/probe/docker/container_test.go @@ -74,11 +74,12 @@ func TestContainer(t *testing.T) { "docker_label_foo1": "bar1", "docker_label_foo2": "bar2", "memory_usage": "12345", + "docker_container_state": "running", }).WithSets(report.Sets{ "docker_container_ports": report.MakeStringSet("1.2.3.4:80->80/tcp", "81/tcp"), "docker_container_ips": report.MakeStringSet("1.2.3.4"), "docker_container_ips_with_scopes": report.MakeStringSet("scope;1.2.3.4"), - }) + }).WithControls(docker.RestartContainer, docker.StopContainer, docker.PauseContainer) test.Poll(t, 100*time.Millisecond, want, func() interface{} { node := c.GetNode("scope", []net.IP{}) for k, v := range node.Metadata { @@ -93,7 +94,7 @@ func TestContainer(t *testing.T) { t.Errorf("%s != baz", c.Image()) } if c.PID() != 1 { - t.Errorf("%s != 1", c.PID()) + t.Errorf("%d != 1", c.PID()) } if have := docker.ExtractContainerIPs(c.GetNode("", []net.IP{})); !reflect.DeepEqual(have, []string{"1.2.3.4"}) { t.Errorf("%v != %v", have, []string{"1.2.3.4"}) diff --git a/probe/docker/controls.go b/probe/docker/controls.go new file mode 100644 index 0000000000..0db14c3180 --- /dev/null +++ b/probe/docker/controls.go @@ -0,0 +1,83 @@ +package docker + +import ( + "log" + + "github.com/weaveworks/scope/probe/controls" + "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/xfer" +) + +// Control IDs used by the docker intergation. 
+const ( + StopContainer = "docker_stop_container" + StartContainer = "docker_start_container" + RestartContainer = "docker_restart_container" + PauseContainer = "docker_pause_container" + UnpauseContainer = "docker_unpause_container" + + waitTime = 10 +) + +func (r *registry) stopContainer(req xfer.Request) xfer.Response { + log.Printf("Stopping container %s", req.NodeID) + + _, containerID, ok := report.ParseContainerNodeID(req.NodeID) + if !ok { + return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID) + } + + return xfer.ResponseError(r.client.StopContainer(containerID, waitTime)) +} + +func (r *registry) startContainer(req xfer.Request) xfer.Response { + log.Printf("Starting container %s", req.NodeID) + + _, containerID, ok := report.ParseContainerNodeID(req.NodeID) + if !ok { + return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID) + } + + return xfer.ResponseError(r.client.StartContainer(containerID, nil)) +} + +func (r *registry) restartContainer(req xfer.Request) xfer.Response { + log.Printf("Restarting container %s", req.NodeID) + + _, containerID, ok := report.ParseContainerNodeID(req.NodeID) + if !ok { + return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID) + } + + return xfer.ResponseError(r.client.RestartContainer(containerID, waitTime)) +} + +func (r *registry) pauseContainer(req xfer.Request) xfer.Response { + log.Printf("Pausing container %s", req.NodeID) + + _, containerID, ok := report.ParseContainerNodeID(req.NodeID) + if !ok { + return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID) + } + + return xfer.ResponseError(r.client.PauseContainer(containerID)) +} + +func (r *registry) unpauseContainer(req xfer.Request) xfer.Response { + log.Printf("Unpausing container %s", req.NodeID) + + _, containerID, ok := report.ParseContainerNodeID(req.NodeID) + if !ok { + return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID) + } + + return xfer.ResponseError(r.client.UnpauseContainer(containerID)) +} + +func (r *registry) registerControls() { + 
controls.Register(StopContainer, r.stopContainer) + controls.Register(StartContainer, r.startContainer) + controls.Register(RestartContainer, r.restartContainer) + controls.Register(PauseContainer, r.pauseContainer) + controls.Register(UnpauseContainer, r.unpauseContainer) +} diff --git a/probe/docker/controls_test.go b/probe/docker/controls_test.go new file mode 100644 index 0000000000..0a6da47676 --- /dev/null +++ b/probe/docker/controls_test.go @@ -0,0 +1,38 @@ +package docker_test + +import ( + "reflect" + "testing" + "time" + + "github.com/weaveworks/scope/probe/controls" + "github.com/weaveworks/scope/probe/docker" + "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/xfer" +) + +func TestControls(t *testing.T) { + mdc := newMockClient() + setupStubs(mdc, func() { + registry, _ := docker.NewRegistry(10 * time.Second) + defer registry.Stop() + + for _, tc := range []struct{ command, result string }{ + {docker.StopContainer, "stopped"}, + {docker.StartContainer, "started"}, + {docker.RestartContainer, "restarted"}, + {docker.PauseContainer, "paused"}, + {docker.UnpauseContainer, "unpaused"}, + } { + result := controls.HandleControlRequest(xfer.Request{ + Control: tc.command, + NodeID: report.MakeContainerNodeID("", "a1b2c3d4e5"), + }) + if !reflect.DeepEqual(result, xfer.Response{ + Error: tc.result, + }) { + t.Error(result) + } + } + }) +} diff --git a/probe/docker/registry.go b/probe/docker/registry.go index 0fad353a0b..3c21bd4e76 100644 --- a/probe/docker/registry.go +++ b/probe/docker/registry.go @@ -10,9 +10,13 @@ import ( // Consts exported for testing. const ( - StartEvent = "start" - DieEvent = "die" - endpoint = "unix:///var/run/docker.sock" + CreateEvent = "create" + DestroyEvent = "destroy" + StartEvent = "start" + DieEvent = "die" + PauseEvent = "pause" + UnpauseEvent = "unpause" + endpoint = "unix:///var/run/docker.sock" ) // Vars exported for testing. 
@@ -47,6 +51,11 @@ type Client interface { ListImages(docker_client.ListImagesOptions) ([]docker_client.APIImages, error) AddEventListener(chan<- *docker_client.APIEvents) error RemoveEventListener(chan *docker_client.APIEvents) error + StopContainer(string, uint) error + StartContainer(string, *docker_client.HostConfig) error + RestartContainer(string, uint) error + PauseContainer(string) error + UnpauseContainer(string) error } func newDockerClient(endpoint string) (Client, error) { @@ -70,6 +79,7 @@ func NewRegistry(interval time.Duration) (Registry, error) { quit: make(chan chan struct{}), } + r.registerControls() go r.loop() return r, nil } @@ -170,9 +180,7 @@ func (r *registry) updateContainers() error { } for _, apiContainer := range apiContainers { - if err := r.addContainer(apiContainer.ID); err != nil { - return err - } + r.updateContainerState(apiContainer.ID) } return nil @@ -197,56 +205,54 @@ func (r *registry) updateImages() error { func (r *registry) handleEvent(event *docker_client.APIEvents) { switch event.Status { - case DieEvent: - containerID := event.ID - r.removeContainer(containerID) - - case StartEvent: - containerID := event.ID - if err := r.addContainer(containerID); err != nil { - log.Printf("docker registry: %s", err) - } + case CreateEvent, StartEvent, DieEvent, DestroyEvent, PauseEvent, UnpauseEvent: + r.updateContainerState(event.ID) } } -func (r *registry) addContainer(containerID string) error { +func (r *registry) updateContainerState(containerID string) { + r.Lock() + defer r.Unlock() + dockerContainer, err := r.client.InspectContainer(containerID) if err != nil { // Don't spam the logs if the container was short lived - if _, ok := err.(*docker_client.NoSuchContainer); ok { - return nil + if _, ok := err.(*docker_client.NoSuchContainer); !ok { + log.Printf("Error processing event for container %s: %v", containerID, err) + return } - return err - } - - if !dockerContainer.State.Running { - // We get events late, and the containers 
sometimes have already - // stopped. Not an error, so don't return it. - return nil - } - - r.Lock() - defer r.Unlock() - - c := NewContainerStub(dockerContainer) - r.containers[containerID] = c - r.containersByPID[dockerContainer.State.Pid] = c - return c.StartGatheringStats() -} + // Container doesn't exist anymore, so lets stop and remove it + container, ok := r.containers[containerID] + if !ok { + return + } -func (r *registry) removeContainer(containerID string) { - r.Lock() - defer r.Unlock() + delete(r.containers, containerID) + delete(r.containersByPID, container.PID()) + container.StopGatheringStats() + return + } - container, ok := r.containers[containerID] + // Container exists, ensure we have it + c, ok := r.containers[containerID] if !ok { - return + c = NewContainerStub(dockerContainer) + r.containers[containerID] = c + r.containersByPID[dockerContainer.State.Pid] = c + } else { + c.UpdateState(dockerContainer) } - delete(r.containers, containerID) - delete(r.containersByPID, container.PID()) - container.StopGatheringStats() + // And finally, ensure we gather stats for it + if dockerContainer.State.Running { + if err := c.StartGatheringStats(); err != nil { + log.Printf("Error gather stats for container: %s", containerID) + return + } + } else { + c.StopGatheringStats() + } } // LockedPIDLookup runs f under a read lock, and gives f a function for @@ -272,6 +278,13 @@ func (r *registry) WalkContainers(f func(Container)) { } } +func (r *registry) getContainer(id string) (Container, bool) { + r.RLock() + defer r.RUnlock() + c, ok := r.containers[id] + return c, ok +} + // WalkImages runs f on every image of running containers the registry // knows of. f may be run on the same image more than once. 
func (r *registry) WalkImages(f func(*docker_client.APIImages)) { diff --git a/probe/docker/registry_test.go b/probe/docker/registry_test.go index b5ac445082..6361298fd7 100644 --- a/probe/docker/registry_test.go +++ b/probe/docker/registry_test.go @@ -1,6 +1,7 @@ package docker_test import ( + "fmt" "net" "runtime" "sort" @@ -19,6 +20,8 @@ type mockContainer struct { c *client.Container } +func (c *mockContainer) UpdateState(_ *client.Container) {} + func (c *mockContainer) ID() string { return c.c.ID } @@ -66,7 +69,11 @@ func (m *mockDockerClient) ListContainers(client.ListContainersOptions) ([]clien func (m *mockDockerClient) InspectContainer(id string) (*client.Container, error) { m.RLock() defer m.RUnlock() - return m.containers[id], nil + c, ok := m.containers[id] + if !ok { + return nil, &client.NoSuchContainer{} + } + return c, nil } func (m *mockDockerClient) ListImages(client.ListImagesOptions) ([]client.APIImages, error) { @@ -93,6 +100,26 @@ func (m *mockDockerClient) RemoveEventListener(events chan *client.APIEvents) er return nil } +func (m *mockDockerClient) StartContainer(_ string, _ *client.HostConfig) error { + return fmt.Errorf("started") +} + +func (m *mockDockerClient) StopContainer(_ string, _ uint) error { + return fmt.Errorf("stopped") +} + +func (m *mockDockerClient) RestartContainer(_ string, _ uint) error { + return fmt.Errorf("restarted") +} + +func (m *mockDockerClient) PauseContainer(_ string) error { + return fmt.Errorf("paused") +} + +func (m *mockDockerClient) UnpauseContainer(_ string) error { + return fmt.Errorf("unpaused") +} + func (m *mockDockerClient) send(event *client.APIEvents) { m.RLock() defer m.RUnlock() @@ -259,7 +286,7 @@ func TestRegistryEvents(t *testing.T) { mdc.apiContainers = []client.APIContainers{apiContainer1} delete(mdc.containers, "wiff") mdc.Unlock() - mdc.send(&client.APIEvents{Status: docker.DieEvent, ID: "wiff"}) + mdc.send(&client.APIEvents{Status: docker.DestroyEvent, ID: "wiff"}) runtime.Gosched() want 
:= []docker.Container{&mockContainer{container1}} diff --git a/probe/docker/reporter.go b/probe/docker/reporter.go index 01b6fbd7e7..8f7561efc0 100644 --- a/probe/docker/reporter.go +++ b/probe/docker/reporter.go @@ -43,6 +43,31 @@ func (r *Reporter) Report() (report.Report, error) { func (r *Reporter) containerTopology(localAddrs []net.IP) report.Topology { result := report.MakeTopology() + result.Controls.AddControl(report.Control{ + ID: StopContainer, + Human: "Stop", + Icon: "fa-stop", + }) + result.Controls.AddControl(report.Control{ + ID: StartContainer, + Human: "Start", + Icon: "fa-play", + }) + result.Controls.AddControl(report.Control{ + ID: RestartContainer, + Human: "Restart", + Icon: "fa-repeat", + }) + result.Controls.AddControl(report.Control{ + ID: PauseContainer, + Human: "Pause", + Icon: "fa-pause", + }) + result.Controls.AddControl(report.Control{ + ID: UnpauseContainer, + Human: "Unpause", + Icon: "fa-play", + }) r.registry.WalkContainers(func(c Container) { nodeID := report.MakeContainerNodeID(r.hostID, c.ID()) diff --git a/probe/docker/reporter_test.go b/probe/docker/reporter_test.go index 5e765c5563..cbabba81bb 100644 --- a/probe/docker/reporter_test.go +++ b/probe/docker/reporter_test.go @@ -57,6 +57,33 @@ func TestReporter(t *testing.T) { docker.ImageID: "baz", }), }, + Controls: report.Controls{ + docker.RestartContainer: report.Control{ + ID: docker.RestartContainer, + Human: "Restart", + Icon: "fa-repeat", + }, + docker.StartContainer: report.Control{ + ID: docker.StartContainer, + Human: "Start", + Icon: "fa-play", + }, + docker.StopContainer: report.Control{ + ID: docker.StopContainer, + Human: "Stop", + Icon: "fa-stop", + }, + docker.PauseContainer: report.Control{ + ID: docker.PauseContainer, + Human: "Pause", + Icon: "fa-pause", + }, + docker.UnpauseContainer: report.Control{ + ID: docker.UnpauseContainer, + Human: "Unpause", + Icon: "fa-play", + }, + }, } want.ContainerImage = report.Topology{ Nodes: report.Nodes{ @@ -65,6 +92,7 @@ 
func TestReporter(t *testing.T) { docker.ImageName: "bang", }), }, + Controls: report.Controls{}, } reporter := docker.NewReporter(mockRegistryInstance, "") diff --git a/probe/host/tagger.go b/probe/host/tagger.go index a7992bad16..271556d646 100644 --- a/probe/host/tagger.go +++ b/probe/host/tagger.go @@ -7,17 +7,26 @@ import ( // Tagger tags each node in each topology of a report with the origin host // node ID of this (probe) host. Effectively, a foreign key linking every node // in every topology to an origin host node in the host topology. -type Tagger struct{ hostNodeID string } +type Tagger struct { + hostNodeID string + probeID string +} // NewTagger tags each node with a foreign key linking it to its origin host // in the host topology. -func NewTagger(hostID string) Tagger { - return Tagger{hostNodeID: report.MakeHostNodeID(hostID)} +func NewTagger(hostID, probeID string) Tagger { + return Tagger{ + hostNodeID: report.MakeHostNodeID(hostID), + probeID: probeID, + } } // Tag implements Tagger. 
func (t Tagger) Tag(r report.Report) (report.Report, error) { - other := report.MakeNodeWith(map[string]string{report.HostNodeID: t.hostNodeID}) + other := report.MakeNodeWith(map[string]string{ + report.HostNodeID: t.hostNodeID, + report.ProbeID: t.probeID, + }) // Explicity don't tag Endpoints and Addresses - These topologies include pseudo nodes, // and as such do their own host tagging diff --git a/probe/host/tagger_test.go b/probe/host/tagger_test.go index 99f4b78771..0a736f4ce6 100644 --- a/probe/host/tagger_test.go +++ b/probe/host/tagger_test.go @@ -12,6 +12,7 @@ import ( func TestTagger(t *testing.T) { var ( hostID = "foo" + probeID = "a1b2c3d4" endpointNodeID = report.MakeEndpointNodeID(hostID, "1.2.3.4", "56789") // hostID ignored nodeMetadata = report.MakeNodeWith(map[string]string{"foo": "bar"}) ) @@ -20,8 +21,9 @@ func TestTagger(t *testing.T) { r.Process.AddNode(endpointNodeID, nodeMetadata) want := nodeMetadata.Merge(report.MakeNodeWith(map[string]string{ report.HostNodeID: report.MakeHostNodeID(hostID), + report.ProbeID: probeID, })) - rpt, _ := host.NewTagger(hostID).Tag(r) + rpt, _ := host.NewTagger(hostID, probeID).Tag(r) have := rpt.Process.Nodes[endpointNodeID].Copy() if !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) diff --git a/probe/kubernetes/reporter_test.go b/probe/kubernetes/reporter_test.go index 5f319bbb93..36b61f59f3 100644 --- a/probe/kubernetes/reporter_test.go +++ b/probe/kubernetes/reporter_test.go @@ -111,36 +111,27 @@ func TestReporter(t *testing.T) { want := report.MakeReport() pod1ID := report.MakePodNodeID("ping", "pong-a") pod2ID := report.MakePodNodeID("ping", "pong-b") - want.Pod = report.Topology{ - Nodes: report.Nodes{ - pod1ID: report.MakeNodeWith(map[string]string{ - kubernetes.PodID: "ping/pong-a", - kubernetes.PodName: "pong-a", - kubernetes.Namespace: "ping", - kubernetes.PodCreated: pod1.Created(), - kubernetes.PodContainerIDs: "container1 container2", - kubernetes.ServiceIDs: "ping/pongservice", 
- }), - pod2ID: report.MakeNodeWith(map[string]string{ - kubernetes.PodID: "ping/pong-b", - kubernetes.PodName: "pong-b", - kubernetes.Namespace: "ping", - kubernetes.PodCreated: pod1.Created(), - kubernetes.PodContainerIDs: "container3 container4", - kubernetes.ServiceIDs: "ping/pongservice", - }), - }, - } - want.Service = report.Topology{ - Nodes: report.Nodes{ - report.MakeServiceNodeID("ping", "pongservice"): report.MakeNodeWith(map[string]string{ - kubernetes.ServiceID: "ping/pongservice", - kubernetes.ServiceName: "pongservice", - kubernetes.Namespace: "ping", - kubernetes.ServiceCreated: pod1.Created(), - }), - }, - } + want.Pod = report.MakeTopology().AddNode(pod1ID, report.MakeNodeWith(map[string]string{ + kubernetes.PodID: "ping/pong-a", + kubernetes.PodName: "pong-a", + kubernetes.Namespace: "ping", + kubernetes.PodCreated: pod1.Created(), + kubernetes.PodContainerIDs: "container1 container2", + kubernetes.ServiceIDs: "ping/pongservice", + })).AddNode(pod2ID, report.MakeNodeWith(map[string]string{ + kubernetes.PodID: "ping/pong-b", + kubernetes.PodName: "pong-b", + kubernetes.Namespace: "ping", + kubernetes.PodCreated: pod1.Created(), + kubernetes.PodContainerIDs: "container3 container4", + kubernetes.ServiceIDs: "ping/pongservice", + })) + want.Service = report.MakeTopology().AddNode(report.MakeServiceNodeID("ping", "pongservice"), report.MakeNodeWith(map[string]string{ + kubernetes.ServiceID: "ping/pongservice", + kubernetes.ServiceName: "pongservice", + kubernetes.Namespace: "ping", + kubernetes.ServiceCreated: pod1.Created(), + })) reporter := kubernetes.NewReporter(mockClientInstance) have, _ := reporter.Report() diff --git a/probe/main.go b/probe/main.go index 2d92f2cd57..f39b968ed2 100644 --- a/probe/main.go +++ b/probe/main.go @@ -4,16 +4,19 @@ import ( "flag" "fmt" "log" + "math/rand" "net" "net/http" _ "net/http/pprof" "os" "os/signal" + "strconv" "strings" "sync" "syscall" "time" + "github.com/weaveworks/scope/probe/controls" 
"github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/probe/endpoint" "github.com/weaveworks/scope/probe/host" @@ -66,10 +69,11 @@ func main() { log.Printf("warning: -process=true, but that requires root to find everything") } + rand.Seed(time.Now().UnixNano()) + probeID := strconv.FormatInt(rand.Int63(), 16) var ( hostName = hostname() hostID = hostName // TODO(pb): we should sanitize the hostname - probeID = hostName // TODO(pb): does this need to be a random string instead? ) log.Printf("probe starting, version %s, ID %s", version, probeID) @@ -101,7 +105,14 @@ func main() { publishers := xfer.NewMultiPublisher(factory) defer publishers.Stop() - resolver := newStaticResolver(targets, publishers.Set) + clients := xfer.NewMultiAppClient(xfer.ProbeConfig{ + Token: *token, + ProbeID: probeID, + Insecure: *insecure, + }, xfer.ControlHandlerFunc(controls.HandleControlRequest), xfer.NewAppClient) + defer clients.Stop() + + resolver := newStaticResolver(targets, publishers.Set, clients.Set) defer resolver.Stop() endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack) @@ -112,7 +123,7 @@ func main() { var ( tickers = []Ticker{processCache} reporters = []Reporter{endpointReporter, host.NewReporter(hostID, hostName, localNets), process.NewReporter(processCache, hostID)} - taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)} + taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID, probeID)} ) dockerTagger, dockerReporter, dockerRegistry := func() (*docker.Tagger, *docker.Reporter, docker.Registry) { diff --git a/probe/overlay/weave_test.go b/probe/overlay/weave_test.go index ac9a489d76..cd73f4252e 100644 --- a/probe/overlay/weave_test.go +++ b/probe/overlay/weave_test.go @@ -33,14 +33,13 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) { if err != nil { t.Fatal(err) } - if want, have := (report.Topology{ - Nodes: report.Nodes{ - report.MakeOverlayNodeID(mockWeavePeerName): 
report.MakeNodeWith(map[string]string{ - overlay.WeavePeerName: mockWeavePeerName, - overlay.WeavePeerNickName: mockWeavePeerNickName, - }), - }, - }), have.Overlay; !reflect.DeepEqual(want, have) { + if want, have := report.MakeTopology().AddNode( + report.MakeOverlayNodeID(mockWeavePeerName), + report.MakeNodeWith(map[string]string{ + overlay.WeavePeerName: mockWeavePeerName, + overlay.WeavePeerNickName: mockWeavePeerNickName, + }), + ), have.Overlay; !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } } @@ -48,27 +47,19 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) { { nodeID := report.MakeContainerNodeID(mockHostID, mockContainerID) want := report.Report{ - Container: report.Topology{ - Nodes: report.Nodes{ - nodeID: report.MakeNodeWith(map[string]string{ - docker.ContainerID: mockContainerID, - overlay.WeaveDNSHostname: mockHostname, - overlay.WeaveMACAddress: mockContainerMAC, - }).WithSets(report.Sets{ - docker.ContainerIPs: report.MakeStringSet(mockContainerIP), - docker.ContainerIPsWithScopes: report.MakeStringSet(mockContainerIPWithScope), - }), - }, - }, + Container: report.MakeTopology().AddNode(nodeID, report.MakeNodeWith(map[string]string{ + docker.ContainerID: mockContainerID, + overlay.WeaveDNSHostname: mockHostname, + overlay.WeaveMACAddress: mockContainerMAC, + }).WithSets(report.Sets{ + docker.ContainerIPs: report.MakeStringSet(mockContainerIP), + docker.ContainerIPsWithScopes: report.MakeStringSet(mockContainerIPWithScope), + })), } have, err := w.Tag(report.Report{ - Container: report.Topology{ - Nodes: report.Nodes{ - nodeID: report.MakeNodeWith(map[string]string{ - docker.ContainerID: mockContainerID, - }), - }, - }, + Container: report.MakeTopology().AddNode(nodeID, report.MakeNodeWith(map[string]string{ + docker.ContainerID: mockContainerID, + })), }) if err != nil { t.Fatal(err) diff --git a/probe/process/reporter_test.go b/probe/process/reporter_test.go index d13b2611ff..6eb3a25f6e 100644 --- 
a/probe/process/reporter_test.go +++ b/probe/process/reporter_test.go @@ -33,40 +33,42 @@ func TestReporter(t *testing.T) { reporter := process.NewReporter(walker, "") want := report.MakeReport() - want.Process = report.Topology{ - Nodes: report.Nodes{ - report.MakeProcessNodeID("", "1"): report.MakeNodeWith(map[string]string{ - process.PID: "1", - process.Comm: "init", - process.Threads: "0", - }), - report.MakeProcessNodeID("", "2"): report.MakeNodeWith(map[string]string{ - process.PID: "2", - process.Comm: "bash", - process.PPID: "1", - process.Threads: "0", - }), - report.MakeProcessNodeID("", "3"): report.MakeNodeWith(map[string]string{ - process.PID: "3", - process.Comm: "apache", - process.PPID: "1", - process.Threads: "2", - }), - report.MakeProcessNodeID("", "4"): report.MakeNodeWith(map[string]string{ - process.PID: "4", - process.Comm: "ping", - process.PPID: "2", - process.Cmdline: "ping foo.bar.local", - process.Threads: "0", - }), - report.MakeProcessNodeID("", "5"): report.MakeNodeWith(map[string]string{ - process.PID: "5", - process.PPID: "1", - process.Cmdline: "tail -f /var/log/syslog", - process.Threads: "0", - }), - }, - } + want.Process = report.MakeTopology().AddNode( + report.MakeProcessNodeID("", "1"), report.MakeNodeWith(map[string]string{ + process.PID: "1", + process.Comm: "init", + process.Threads: "0", + }), + ).AddNode( + report.MakeProcessNodeID("", "2"), report.MakeNodeWith(map[string]string{ + process.PID: "2", + process.Comm: "bash", + process.PPID: "1", + process.Threads: "0", + }), + ).AddNode( + report.MakeProcessNodeID("", "3"), report.MakeNodeWith(map[string]string{ + process.PID: "3", + process.Comm: "apache", + process.PPID: "1", + process.Threads: "2", + }), + ).AddNode( + report.MakeProcessNodeID("", "4"), report.MakeNodeWith(map[string]string{ + process.PID: "4", + process.Comm: "ping", + process.PPID: "2", + process.Cmdline: "ping foo.bar.local", + process.Threads: "0", + }), + ).AddNode( + report.MakeProcessNodeID("", 
"5"), report.MakeNodeWith(map[string]string{ + process.PID: "5", + process.PPID: "1", + process.Cmdline: "tail -f /var/log/syslog", + process.Threads: "0", + }), + ) have, err := reporter.Report() if err != nil || !reflect.DeepEqual(want, have) { diff --git a/probe/resolver.go b/probe/resolver.go index 8e8b90b79d..1814b68b1c 100644 --- a/probe/resolver.go +++ b/probe/resolver.go @@ -19,8 +19,10 @@ var ( lookupIP = net.LookupIP ) +type setter func(string, []string) + type staticResolver struct { - set func(string, []string) + setters []setter targets []target quit chan struct{} } @@ -32,10 +34,10 @@ func (t target) String() string { return net.JoinHostPort(t.host, t.port) } // newStaticResolver periodically resolves the targets, and calls the set // function with all the resolved IPs. It explictiy supports targets which // resolve to multiple IPs. -func newStaticResolver(targets []string, set func(target string, endpoints []string)) staticResolver { +func newStaticResolver(targets []string, setters ...setter) staticResolver { r := staticResolver{ targets: prepare(targets), - set: set, + setters: setters, quit: make(chan struct{}), } go r.loop() @@ -80,7 +82,9 @@ func prepare(strs []string) []target { func (r staticResolver) resolve() { for t, endpoints := range resolveMany(r.targets) { - r.set(t.String(), endpoints) + for _, setter := range r.setters { + setter(t.String(), endpoints) + } } } diff --git a/probe/tag_report_test.go b/probe/tag_report_test.go index 0f65661fcb..812dc3176d 100644 --- a/probe/tag_report_test.go +++ b/probe/tag_report_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/test" ) func TestApply(t *testing.T) { @@ -41,6 +42,7 @@ func TestTagMissingID(t *testing.T) { rpt, _ := newTopologyTagger().Tag(r) have := rpt.Endpoint.Nodes[nodeID].Copy() if !reflect.DeepEqual(want, have) { + t.Error(test.Diff(want, have)) t.Error("TopologyTagger erroneously tagged a missing node ID") } } diff 
--git a/render/detailed_node.go b/render/detailed_node.go index 82dd28267d..f9e8c1ba37 100644 --- a/render/detailed_node.go +++ b/render/detailed_node.go @@ -24,11 +24,12 @@ const ( // DetailedNode is the data type that's yielded to the JavaScript layer when // we want deep information about an individual node. type DetailedNode struct { - ID string `json:"id"` - LabelMajor string `json:"label_major"` - LabelMinor string `json:"label_minor,omitempty"` - Pseudo bool `json:"pseudo,omitempty"` - Tables []Table `json:"tables"` + ID string `json:"id"` + LabelMajor string `json:"label_major"` + LabelMinor string `json:"label_minor,omitempty"` + Pseudo bool `json:"pseudo,omitempty"` + Tables []Table `json:"tables"` + Controls []ControlInstance `json:"controls"` } // Table is a dataset associated with a node. It will be displayed in the @@ -48,6 +49,14 @@ type Row struct { Expandable bool `json:"expandable,omitempty"` // Whether it can be expanded (hidden by default) } +// ControlInstance contains a control description, and all the info +// needed to execute it. 
+type ControlInstance struct { + ProbeID string `json:"probeId"` + NodeID string `json:"nodeId"` + report.Control +} + type sortableRows []Row func (r sortableRows) Len() int { return len(r) } @@ -107,6 +116,7 @@ func MakeDetailedNode(r report.Report, n RenderableNode) DetailedNode { LabelMinor: n.LabelMinor, Pseudo: n.Pseudo, Tables: tables, + Controls: controls(r, n), } } @@ -190,22 +200,55 @@ func connectionsTable(connections []Row, r report.Report, n RenderableNode) (Tab return Table{}, false } +func controlsFor(topology report.Topology, nodeID string) []ControlInstance { + result := []ControlInstance{} + node, ok := topology.Nodes[nodeID] + if !ok { + return result + } + + for _, id := range node.Controls { + if control, ok := topology.Controls[id]; ok { + result = append(result, ControlInstance{ + ProbeID: node.Metadata[report.ProbeID], + NodeID: nodeID, + Control: control, + }) + } + } + return result +} + +func controls(r report.Report, n RenderableNode) []ControlInstance { + if _, ok := r.Process.Nodes[n.ControlNode]; ok { + return controlsFor(r.Process, n.ControlNode) + } else if _, ok := r.Container.Nodes[n.ControlNode]; ok { + return controlsFor(r.Container, n.ControlNode) + } else if _, ok := r.ContainerImage.Nodes[n.ControlNode]; ok { + return controlsFor(r.ContainerImage, n.ControlNode) + } else if _, ok := r.Host.Nodes[n.ControlNode]; ok { + return controlsFor(r.Host, n.ControlNode) + } + return []ControlInstance{} +} + // OriginTable produces a table (to be consumed directly by the UI) based on // an origin ID, which is (optimistically) a node ID in one of our topologies. 
func OriginTable(r report.Report, originID string, addHostTags bool, addContainerTags bool) (Table, bool) { + result, show := Table{}, false if nmd, ok := r.Process.Nodes[originID]; ok { - return processOriginTable(nmd, addHostTags, addContainerTags) + result, show = processOriginTable(nmd, addHostTags, addContainerTags) } if nmd, ok := r.Container.Nodes[originID]; ok { - return containerOriginTable(nmd, addHostTags) + result, show = containerOriginTable(nmd, addHostTags) } if nmd, ok := r.ContainerImage.Nodes[originID]; ok { - return containerImageOriginTable(nmd) + result, show = containerImageOriginTable(nmd) } if nmd, ok := r.Host.Nodes[originID]; ok { - return hostOriginTable(nmd) + result, show = hostOriginTable(nmd) } - return Table{}, false + return result, show } func connectionDetailsRows(topology report.Topology, originID string) []Row { @@ -303,6 +346,7 @@ func processOriginTable(nmd report.Node, addHostTag bool, addContainerTag bool) func containerOriginTable(nmd report.Node, addHostTag bool) (Table, bool) { rows := []Row{} for _, tuple := range []struct{ key, human string }{ + {docker.ContainerState, "State"}, {docker.ContainerID, "ID"}, {docker.ImageID, "Image ID"}, {docker.ContainerPorts, "Ports"}, diff --git a/render/detailed_node_test.go b/render/detailed_node_test.go index d3be039c51..15173918b9 100644 --- a/render/detailed_node_test.go +++ b/render/detailed_node_test.go @@ -14,12 +14,13 @@ func TestOriginTable(t *testing.T) { if _, ok := render.OriginTable(fixture.Report, "not-found", false, false); ok { t.Errorf("unknown origin ID gave unexpected success") } - for originID, want := range map[string]render.Table{fixture.ServerProcessNodeID: { - Title: fmt.Sprintf(`Process "apache" (%s)`, fixture.ServerPID), - Numeric: false, - Rank: 2, - Rows: []render.Row{}, - }, + for originID, want := range map[string]render.Table{ + fixture.ServerProcessNodeID: { + Title: fmt.Sprintf(`Process "apache" (%s)`, fixture.ServerPID), + Numeric: false, + Rank: 2, + 
Rows: []render.Row{}
+ },
fixture.ServerHostNodeID: { Title: fmt.Sprintf("Host %q", fixture.ServerHostName), Numeric: false, @@ -75,7 +76,6 @@ func TestOriginTable(t *testing.T) { t.Errorf("%q: %s", originID, test.Diff(want, have)) } } - } func TestMakeDetailedHostNode(t *testing.T) { @@ -86,6 +86,7 @@ func TestMakeDetailedHostNode(t *testing.T) { LabelMajor: "client", LabelMinor: "hostname.com", Pseudo: false, + Controls: []render.ControlInstance{}, Tables: []render.Table{ { Title: fmt.Sprintf("Host %q", fixture.ClientHostName), @@ -143,6 +144,7 @@ func TestMakeDetailedContainerNode(t *testing.T) { LabelMajor: "server", LabelMinor: fixture.ServerHostName, Pseudo: false, + Controls: []render.ControlInstance{}, Tables: []render.Table{ { Title: `Container Image "image/server"`, diff --git a/render/expected/expected.go b/render/expected/expected.go index f977fcf702..fd2df18708 100644 --- a/render/expected/expected.go +++ b/render/expected/expected.go @@ -213,6 +213,7 @@ var ( EgressPacketCount: newu64(30), EgressByteCount: newu64(300), }, + ControlNode: fixture.ClientContainerNodeID, }, fixture.ServerContainerID: { ID: fixture.ServerContainerID, @@ -232,6 +233,7 @@ var ( IngressPacketCount: newu64(210), IngressByteCount: newu64(2100), }, + ControlNode: fixture.ServerContainerNodeID, }, uncontainedServerID: { ID: uncontainedServerID, diff --git a/render/filters.go b/render/filters.go new file mode 100644 index 0000000000..22eb00d2a0 --- /dev/null +++ b/render/filters.go @@ -0,0 +1,190 @@ +package render + +import ( + "strings" + + "github.com/weaveworks/scope/probe/docker" + "github.com/weaveworks/scope/probe/kubernetes" + "github.com/weaveworks/scope/report" +) + +// CustomRenderer allows for mapping functions that received the entire topology +// in one call - useful for functions that need to consider the entire graph. +// We should minimise the use of this renderer type, as it is very inflexible.
+type CustomRenderer struct { + RenderFunc func(RenderableNodes) RenderableNodes + Renderer +} + +// Render implements Renderer +func (c CustomRenderer) Render(rpt report.Report) RenderableNodes { + return c.RenderFunc(c.Renderer.Render(rpt)) +} + +// ColorConnected colors nodes with the IsConnected key if +// they have edges to or from them. +func ColorConnected(r Renderer) Renderer { + return CustomRenderer{ + Renderer: r, + RenderFunc: func(input RenderableNodes) RenderableNodes { + connected := map[string]struct{}{} + void := struct{}{} + + for id, node := range input { + if len(node.Adjacency) == 0 { + continue + } + + connected[id] = void + for _, id := range node.Adjacency { + connected[id] = void + } + } + + for id := range connected { + node := input[id] + node.Metadata[IsConnected] = "true" + input[id] = node + } + return input + }, + } +} + +// Filter removes nodes from a view based on a predicate. +type Filter struct { + Renderer + FilterFunc func(RenderableNode) bool +} + +// Render implements Renderer +func (f Filter) Render(rpt report.Report) RenderableNodes { + nodes, _ := f.render(rpt) + return nodes +} + +func (f Filter) render(rpt report.Report) (RenderableNodes, int) { + output := RenderableNodes{} + inDegrees := map[string]int{} + filtered := 0 + for id, node := range f.Renderer.Render(rpt) { + if f.FilterFunc(node) { + output[id] = node + inDegrees[id] = 0 + } else { + filtered++ + } + } + + // Deleted nodes also need to be cut as destinations in adjacency lists. + for id, node := range output { + newAdjacency := make(report.IDList, 0, len(node.Adjacency)) + for _, dstID := range node.Adjacency { + if _, ok := output[dstID]; ok { + newAdjacency = newAdjacency.Add(dstID) + inDegrees[dstID]++ + } + } + node.Adjacency = newAdjacency + output[id] = node + } + + // Remove unconnected pseudo nodes, see #483. 
+ for id, inDegree := range inDegrees { + if inDegree > 0 { + continue + } + node := output[id] + if !node.Pseudo || len(node.Adjacency) > 0 { + continue + } + delete(output, id) + filtered++ + } + return output, filtered +} + +// Stats implements Renderer +func (f Filter) Stats(rpt report.Report) Stats { + _, filtered := f.render(rpt) + var upstream = f.Renderer.Stats(rpt) + upstream.FilteredNodes += filtered + return upstream +} + +// IsConnected is the key added to Node.Metadata by ColorConnected +// to indicate a node has an edge pointing to it or from it +const IsConnected = "is_connected" + +// FilterUnconnected produces a renderer that filters unconnected nodes +// from the given renderer +func FilterUnconnected(r Renderer) Renderer { + return Filter{ + Renderer: ColorConnected(r), + FilterFunc: func(node RenderableNode) bool { + _, ok := node.Metadata[IsConnected] + return ok + }, + } +} + +// FilterNoop does nothing. +func FilterNoop(in Renderer) Renderer { + return in +} + +// FilterStopped filters out stopped containers. +func FilterStopped(r Renderer) Renderer { + return Filter{ + Renderer: r, + FilterFunc: func(node RenderableNode) bool { + containerState := node.Metadata[docker.ContainerState] + return containerState != docker.StateStopped + }, + } +} + +// FilterSystem is a Renderer which filters out system nodes. 
+func FilterSystem(r Renderer) Renderer { + return Filter{ + Renderer: r, + FilterFunc: func(node RenderableNode) bool { + containerName := node.Metadata[docker.ContainerName] + if _, ok := systemContainerNames[containerName]; ok { + return false + } + imagePrefix := strings.SplitN(node.Metadata[docker.ImageName], ":", 2)[0] // :( + if _, ok := systemImagePrefixes[imagePrefix]; ok { + return false + } + if node.Metadata[docker.LabelPrefix+"works.weave.role"] == "system" { + return false + } + if node.Metadata[kubernetes.Namespace] == "kube-system" { + return false + } + if strings.HasPrefix(node.Metadata[docker.LabelPrefix+"io.kubernetes.pod.name"], "kube-system/") { + return false + } + return true + }, + } +} + +var systemContainerNames = map[string]struct{}{ + "weavescope": {}, + "weavedns": {}, + "weave": {}, + "weaveproxy": {}, + "weaveexec": {}, + "ecs-agent": {}, +} + +var systemImagePrefixes = map[string]struct{}{ + "weaveworks/scope": {}, + "weaveworks/weavedns": {}, + "weaveworks/weave": {}, + "weaveworks/weaveproxy": {}, + "weaveworks/weaveexec": {}, + "amazon/amazon-ecs-agent": {}, +} diff --git a/render/mapping.go b/render/mapping.go index b381a3ca97..53a07d2c93 100644 --- a/render/mapping.go +++ b/render/mapping.go @@ -134,6 +134,7 @@ func MapContainerIdentity(m RenderableNode, _ report.Networks) RenderableNodes { ) node := NewRenderableNodeWith(id, major, minor, rank, m) + node.ControlNode = m.ID if imageID, ok := m.Metadata[docker.ImageID]; ok { hostID, _, _ := report.ParseContainerNodeID(m.ID) node.Origins = node.Origins.Add(report.MakeContainerNodeID(hostID, imageID)) diff --git a/render/render.go b/render/render.go index 79db7a73a3..8f1de1f02e 100644 --- a/render/render.go +++ b/render/render.go @@ -1,10 +1,6 @@ package render import ( - "strings" - - "github.com/weaveworks/scope/probe/docker" - "github.com/weaveworks/scope/probe/kubernetes" "github.com/weaveworks/scope/report" ) @@ -155,168 +151,3 @@ func (m Map) EdgeMetadata(rpt report.Report, 
srcRenderableID, dstRenderableID st } return output } - -// CustomRenderer allow for mapping functions that recived the entire topology -// in one call - useful for functions that need to consider the entire graph. -// We should minimise the use of this renderer type, as it is very inflexible. -type CustomRenderer struct { - RenderFunc func(RenderableNodes) RenderableNodes - Renderer -} - -// Render implements Renderer -func (c CustomRenderer) Render(rpt report.Report) RenderableNodes { - return c.RenderFunc(c.Renderer.Render(rpt)) -} - -// ColorConnected colors nodes with the IsConnected key if -// they have edges to or from them. -func ColorConnected(r Renderer) Renderer { - return CustomRenderer{ - Renderer: r, - RenderFunc: func(input RenderableNodes) RenderableNodes { - connected := map[string]struct{}{} - void := struct{}{} - - for id, node := range input { - if len(node.Adjacency) == 0 { - continue - } - - connected[id] = void - for _, id := range node.Adjacency { - connected[id] = void - } - } - - for id := range connected { - node := input[id] - node.Metadata[IsConnected] = "true" - input[id] = node - } - return input - }, - } -} - -// Filter removes nodes from a view based on a predicate. -type Filter struct { - Renderer - FilterFunc func(RenderableNode) bool -} - -// Render implements Renderer -func (f Filter) Render(rpt report.Report) RenderableNodes { - nodes, _ := f.render(rpt) - return nodes -} - -func (f Filter) render(rpt report.Report) (RenderableNodes, int) { - output := RenderableNodes{} - inDegrees := map[string]int{} - filtered := 0 - for id, node := range f.Renderer.Render(rpt) { - if f.FilterFunc(node) { - output[id] = node - inDegrees[id] = 0 - } else { - filtered++ - } - } - - // Deleted nodes also need to be cut as destinations in adjacency lists. 
- for id, node := range output { - newAdjacency := make(report.IDList, 0, len(node.Adjacency)) - for _, dstID := range node.Adjacency { - if _, ok := output[dstID]; ok { - newAdjacency = newAdjacency.Add(dstID) - inDegrees[dstID]++ - } - } - node.Adjacency = newAdjacency - output[id] = node - } - - // Remove unconnected pseudo nodes, see #483. - for id, inDegree := range inDegrees { - if inDegree > 0 { - continue - } - node := output[id] - if !node.Pseudo || len(node.Adjacency) > 0 { - continue - } - delete(output, id) - filtered++ - } - return output, filtered -} - -// Stats implements Renderer -func (f Filter) Stats(rpt report.Report) Stats { - _, filtered := f.render(rpt) - var upstream = f.Renderer.Stats(rpt) - upstream.FilteredNodes += filtered - return upstream -} - -// IsConnected is the key added to Node.Metadata by ColorConnected -// to indicate a node has an edge pointing to it or from it -const IsConnected = "is_connected" - -// FilterUnconnected produces a renderer that filters unconnected nodes -// from the given renderer -func FilterUnconnected(r Renderer) Renderer { - return Filter{ - Renderer: ColorConnected(r), - FilterFunc: func(node RenderableNode) bool { - _, ok := node.Metadata[IsConnected] - return ok - }, - } -} - -// FilterSystem is a Renderer which filters out system nodes. 
-func FilterSystem(r Renderer) Renderer { - return Filter{ - Renderer: r, - FilterFunc: func(node RenderableNode) bool { - containerName := node.Metadata[docker.ContainerName] - if _, ok := systemContainerNames[containerName]; ok { - return false - } - imagePrefix := strings.SplitN(node.Metadata[docker.ImageName], ":", 2)[0] // :( - if _, ok := systemImagePrefixes[imagePrefix]; ok { - return false - } - if node.Metadata[docker.LabelPrefix+"works.weave.role"] == "system" { - return false - } - if node.Metadata[kubernetes.Namespace] == "kube-system" { - return false - } - if strings.HasPrefix(node.Metadata[docker.LabelPrefix+"io.kubernetes.pod.name"], "kube-system/") { - return false - } - return true - }, - } -} - -var systemContainerNames = map[string]struct{}{ - "weavescope": {}, - "weavedns": {}, - "weave": {}, - "weaveproxy": {}, - "weaveexec": {}, - "ecs-agent": {}, -} - -var systemImagePrefixes = map[string]struct{}{ - "weaveworks/scope": {}, - "weaveworks/weavedns": {}, - "weaveworks/weave": {}, - "weaveworks/weaveproxy": {}, - "weaveworks/weaveexec": {}, - "amazon/amazon-ecs-agent": {}, -} diff --git a/render/renderable_node.go b/render/renderable_node.go index 7af08e81b6..adead7c047 100644 --- a/render/renderable_node.go +++ b/render/renderable_node.go @@ -8,12 +8,13 @@ import ( // an element of a topology. It should contain information that's relevant // to rendering a node when there are many nodes visible at once. type RenderableNode struct { - ID string `json:"id"` // - LabelMajor string `json:"label_major"` // e.g. "process", human-readable - LabelMinor string `json:"label_minor,omitempty"` // e.g. "hostname", human-readable, optional - Rank string `json:"rank"` // to help the layout engine - Pseudo bool `json:"pseudo,omitempty"` // sort-of a placeholder node, for rendering purposes - Origins report.IDList `json:"origins,omitempty"` // Core node IDs that contributed information + ID string `json:"id"` // + LabelMajor string `json:"label_major"` // e.g. 
"process", human-readable + LabelMinor string `json:"label_minor,omitempty"` // e.g. "hostname", human-readable, optional + Rank string `json:"rank"` // to help the layout engine + Pseudo bool `json:"pseudo,omitempty"` // sort-of a placeholder node, for rendering purposes + Origins report.IDList `json:"origins,omitempty"` // Core node IDs that contributed information + ControlNode string `json:"-"` // ID of node from which to show the controls in the UI report.EdgeMetadata `json:"metadata"` // Numeric sums report.Node @@ -58,6 +59,7 @@ func NewDerivedNode(id string, node RenderableNode) RenderableNode { Origins: node.Origins.Copy(), EdgeMetadata: node.EdgeMetadata.Copy(), Node: node.Node.Copy(), + ControlNode: "", // Do not propagate ControlNode when making a derived node! } } @@ -97,6 +99,10 @@ func (rn RenderableNode) Merge(other RenderableNode) RenderableNode { result.Rank = other.Rank } + if result.ControlNode == "" { + result.ControlNode = other.ControlNode + } + if result.Pseudo != other.Pseudo { panic(result.ID) } @@ -119,6 +125,7 @@ func (rn RenderableNode) Copy() RenderableNode { Origins: rn.Origins.Copy(), EdgeMetadata: rn.EdgeMetadata.Copy(), Node: rn.Node.Copy(), + ControlNode: rn.ControlNode, } } diff --git a/render/short_lived_connections_test.go b/render/short_lived_connections_test.go index 8f36c9507a..0b98b4f5fa 100644 --- a/render/short_lived_connections_test.go +++ b/render/short_lived_connections_test.go @@ -78,13 +78,14 @@ var ( Origins: report.MakeIDList(randomEndpointNodeID), }, containerID: { - ID: containerID, - LabelMajor: containerName, - LabelMinor: serverHostID, - Rank: "", - Pseudo: false, - Origins: report.MakeIDList(containerNodeID, serverEndpointNodeID, serverHostNodeID), - Node: report.MakeNode(), + ID: containerID, + LabelMajor: containerName, + LabelMinor: serverHostID, + Rank: "", + Pseudo: false, + Origins: report.MakeIDList(containerNodeID, serverEndpointNodeID, serverHostNodeID), + Node: report.MakeNode(), + ControlNode: 
containerNodeID, }, }).Prune() ) diff --git a/report/controls.go b/report/controls.go new file mode 100644 index 0000000000..679ec08a6d --- /dev/null +++ b/report/controls.go @@ -0,0 +1,34 @@ +package report + +// Controls describe the control tags within the Nodes +type Controls map[string]Control + +// A Control basically describes an RPC +type Control struct { + ID string `json:"id"` + Human string `json:"human"` + Icon string `json:"icon"` // from https://fortawesome.github.io/Font-Awesome/cheatsheet/ please +} + +// Merge merges other with cs, returning a fresh Controls. +func (cs Controls) Merge(other Controls) Controls { + result := cs.Copy() + for k, v := range other { + result[k] = v + } + return result +} + +// Copy produces a copy of cs. +func (cs Controls) Copy() Controls { + result := Controls{} + for k, v := range cs { + result[k] = v + } + return result +} + +// AddControl returns a fresh Controls, c added to cs. +func (cs Controls) AddControl(c Control) { + cs[c.ID] = c +} diff --git a/report/report.go b/report/report.go index 3d9b9a0e34..aca0ad2fde 100644 --- a/report/report.go +++ b/report/report.go @@ -181,4 +181,6 @@ const ( // a node in the host topology. That host node is the origin host, where // the node was originally detected. HostNodeID = "host_node_id" + // ProbeID is the random ID of the probe which generated the specific node. + ProbeID = "probe_id" ) diff --git a/report/topology.go b/report/topology.go index 617536f817..9cc6ab6025 100644 --- a/report/topology.go +++ b/report/topology.go @@ -12,12 +12,14 @@ import ( // in the Node struct. type Topology struct { Nodes // TODO(pb): remove Nodes intermediate type + Controls } // MakeTopology gives you a Topology. func MakeTopology() Topology { return Topology{ - Nodes: map[string]Node{}, + Nodes: map[string]Node{}, + Controls: Controls{}, } } @@ -37,7 +39,8 @@ func (t Topology) AddNode(nodeID string, nmd Node) Topology { // Copy returns a value copy of the Topology. 
func (t Topology) Copy() Topology { return Topology{ - Nodes: t.Nodes.Copy(), + Nodes: t.Nodes.Copy(), + Controls: t.Controls.Copy(), } } @@ -45,7 +48,8 @@ func (t Topology) Copy() Topology { // The original is not modified. func (t Topology) Merge(other Topology) Topology { return Topology{ - Nodes: t.Nodes.Merge(other.Nodes), + Nodes: t.Nodes.Merge(other.Nodes), + Controls: t.Controls.Merge(other.Controls), } } @@ -84,6 +88,7 @@ type Node struct { Sets Sets `json:"sets,omitempty"` Adjacency IDList `json:"adjacency"` Edges EdgeMetadatas `json:"edges,omitempty"` + Controls IDList `json:"controls,omitempty"` } // MakeNode creates a new Node with no initial metadata. @@ -94,6 +99,7 @@ func MakeNode() Node { Sets: Sets{}, Adjacency: MakeIDList(), Edges: EdgeMetadatas{}, + Controls: MakeIDList(), } } @@ -147,6 +153,13 @@ func (n Node) WithEdge(dst string, md EdgeMetadata) Node { return result } +// WithControls returns a fresh copy of n, with cs added to Controls. +func (n Node) WithControls(cs ...string) Node { + result := n.Copy() + result.Controls = result.Controls.Add(cs...) + return result +} + // Copy returns a value copy of the Node. 
func (n Node) Copy() Node { cp := MakeNode() @@ -155,6 +168,7 @@ func (n Node) Copy() Node { cp.Sets = n.Sets.Copy() cp.Adjacency = n.Adjacency.Copy() cp.Edges = n.Edges.Copy() + cp.Controls = n.Controls.Copy() return cp } @@ -167,6 +181,7 @@ func (n Node) Merge(other Node) Node { cp.Sets = cp.Sets.Merge(other.Sets) cp.Adjacency = cp.Adjacency.Merge(other.Adjacency) cp.Edges = cp.Edges.Merge(other.Edges) + cp.Controls = cp.Controls.Merge(other.Controls) return cp } diff --git a/xfer/app_client.go b/xfer/app_client.go new file mode 100644 index 0000000000..1b1fd579d8 --- /dev/null +++ b/xfer/app_client.go @@ -0,0 +1,151 @@ +package xfer + +import ( + "encoding/json" + "log" + "net/http" + "net/rpc" + "sync" + "time" + + "github.com/gorilla/websocket" + + "github.com/weaveworks/scope/common/sanitize" +) + +// Details are some generic details that can be fetched from /api +type Details struct { + ID string `json:"id"` + Version string `json:"version"` +} + +// AppClient is a client to an app for dealing with controls. +type AppClient interface { + Details() (Details, error) + ControlConnection(handler ControlHandler) + Stop() +} + +type appClient struct { + ProbeConfig + + quit chan struct{} + target string + insecure bool + client http.Client + + controlServerCodecMtx sync.Mutex + controlServerCodec rpc.ServerCodec +} + +// NewAppClient makes a new AppClient. +func NewAppClient(pc ProbeConfig, hostname, target string) (AppClient, error) { + httpTransport, err := pc.getHTTPTransport(hostname) + if err != nil { + return nil, err + } + + return &appClient{ + ProbeConfig: pc, + quit: make(chan struct{}), + target: target, + client: http.Client{ + Transport: httpTransport, + }, + }, nil +} + +// Stop stops the appClient. 
+func (c *appClient) Stop() { + c.controlServerCodecMtx.Lock() + defer c.controlServerCodecMtx.Unlock() + close(c.quit) + if c.controlServerCodec != nil { + c.controlServerCodec.Close() + } + c.client.Transport.(*http.Transport).CloseIdleConnections() +} + +// Details fetches the details (version, id) of the app. +func (c *appClient) Details() (Details, error) { + result := Details{} + req, err := c.ProbeConfig.authorizedRequest("GET", sanitize.URL("", 0, "/api")(c.target), nil) + if err != nil { + return result, err + } + resp, err := c.client.Do(req) + if err != nil { + return result, err + } + defer resp.Body.Close() + return result, json.NewDecoder(resp.Body).Decode(&result) +} + +func (c *appClient) controlConnection(handler ControlHandler) error { + dialer := websocket.Dialer{} + headers := http.Header{} + c.ProbeConfig.authorizeHeaders(headers) + // TODO(twilkie) need to update sanitize to work with wss + url := sanitize.URL("ws://", 0, "/api/control/ws")(c.target) + conn, _, err := dialer.Dial(url, headers) + if err != nil { + return err + } + defer func() { + log.Printf("Closing control connection to %s", c.target) + conn.Close() + }() + + codec := NewJSONWebsocketCodec(conn) + server := rpc.NewServer() + if err := server.RegisterName("control", handler); err != nil { + return err + } + + c.controlServerCodecMtx.Lock() + c.controlServerCodec = codec + // At this point we may have tried to quit earlier, so check to see if the + // quit channel has been closed, non-blocking. 
+ select { + default: + case <-c.quit: + codec.Close() + return nil + } + c.controlServerCodecMtx.Unlock() + + server.ServeCodec(codec) + + c.controlServerCodecMtx.Lock() + c.controlServerCodec = nil + c.controlServerCodecMtx.Unlock() + return nil +} + +func (c *appClient) controlConnectionLoop(handler ControlHandler) { + defer log.Printf("Control connection to %s exiting", c.target) + backoff := initialBackoff + + for { + err := c.controlConnection(handler) + if err == nil { + backoff = initialBackoff + continue + } + + log.Printf("Error doing controls for %s, backing off %s: %v", c.target, backoff, err) + select { + case <-time.After(backoff): + case <-c.quit: + return + } + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } +} + +func (c *appClient) ControlConnection(handler ControlHandler) { + go c.controlConnectionLoop(handler) +} diff --git a/xfer/controls.go b/xfer/controls.go new file mode 100644 index 0000000000..d13a4453f5 --- /dev/null +++ b/xfer/controls.go @@ -0,0 +1,141 @@ +package xfer + +import ( + "fmt" + "net/rpc" + "sync" + + "github.com/gorilla/websocket" +) + +// Request is the UI -> App -> Probe message type for control RPCs +type Request struct { + ID int64 + NodeID string + Control string +} + +// Response is the Probe -> App -> UI message type for the control RPCs. +type Response struct { + ID int64 + Value interface{} + Error string +} + +// ControlHandler is interface used in the app and the probe to represent +// a control RPC. +type ControlHandler interface { + Handle(req Request, res *Response) error +} + +// ControlHandlerFunc is a adapter (ala golang's http RequestHandlerFunc) +// for ControlHandler +type ControlHandlerFunc func(Request) Response + +// Handle is an adapter method to make ControlHandlers exposable via golang rpc +func (c ControlHandlerFunc) Handle(req Request, res *Response) error { + *res = c(req) + return nil +} + +// ResponseErrorf creates a new Response with the given formatted error string. 
+func ResponseErrorf(format string, a ...interface{}) Response { + return Response{ + Error: fmt.Sprintf(format, a...), + } +} + +// ResponseError creates a new Response with the given error. +func ResponseError(err error) Response { + if err != nil { + return Response{ + Error: err.Error(), + } + } + return Response{} +} + +// JSONWebsocketCodec is golang rpc compatible Server and Client Codec +// that transmits and receives RPC messages over a websocker, as JSON. +type JSONWebsocketCodec struct { + sync.Mutex + conn *websocket.Conn + err chan struct{} +} + +// NewJSONWebsocketCodec makes a new JSONWebsocketCodec +func NewJSONWebsocketCodec(conn *websocket.Conn) *JSONWebsocketCodec { + return &JSONWebsocketCodec{ + conn: conn, + err: make(chan struct{}), + } +} + +// WaitForReadError blocks until any read on this codec returns an error. +// This is useful to know when the server has disconnected from the client. +func (j *JSONWebsocketCodec) WaitForReadError() { + <-j.err +} + +// WriteRequest implements rpc.ClientCodec +func (j *JSONWebsocketCodec) WriteRequest(r *rpc.Request, v interface{}) error { + j.Lock() + defer j.Unlock() + + if err := j.conn.WriteJSON(r); err != nil { + return err + } + return j.conn.WriteJSON(v) +} + +// ReadResponseHeader implements rpc.ClientCodec +func (j *JSONWebsocketCodec) ReadResponseHeader(r *rpc.Response) error { + err := j.conn.ReadJSON(r) + if err != nil { + close(j.err) + } + return err +} + +// ReadResponseBody implements rpc.ClientCodec +func (j *JSONWebsocketCodec) ReadResponseBody(v interface{}) error { + err := j.conn.ReadJSON(v) + if err != nil { + close(j.err) + } + return err +} + +// Close implements rpc.ClientCodec and rpc.ServerCodec +func (j *JSONWebsocketCodec) Close() error { + return j.conn.Close() +} + +// ReadRequestHeader implements rpc.ServerCodec +func (j *JSONWebsocketCodec) ReadRequestHeader(r *rpc.Request) error { + err := j.conn.ReadJSON(r) + if err != nil { + close(j.err) + } + return err +} + +// 
ReadRequestBody implements rpc.ServerCodec +func (j *JSONWebsocketCodec) ReadRequestBody(v interface{}) error { + err := j.conn.ReadJSON(v) + if err != nil { + close(j.err) + } + return err +} + +// WriteResponse implements rpc.ServerCodec +func (j *JSONWebsocketCodec) WriteResponse(r *rpc.Response, v interface{}) error { + j.Lock() + defer j.Unlock() + + if err := j.conn.WriteJSON(r); err != nil { + return err + } + return j.conn.WriteJSON(v) +} diff --git a/xfer/http_publisher.go b/xfer/http_publisher.go index c3da1a18b2..1a80ffe2e3 100644 --- a/xfer/http_publisher.go +++ b/xfer/http_publisher.go @@ -1,62 +1,39 @@ package xfer import ( - "crypto/tls" "encoding/json" "fmt" "io" - "net" "net/http" "time" - "github.com/certifi/gocertifi" - "github.com/weaveworks/scope/common/sanitize" ) // HTTPPublisher publishes buffers by POST to a fixed endpoint. type HTTPPublisher struct { - url string - token string - probeID string - client *http.Client -} + ProbeConfig -func getHTTPTransport(hostname string, insecure bool) (*http.Transport, error) { - if insecure { - return &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - }, nil - } - - host, _, err := net.SplitHostPort(hostname) - if err != nil { - return nil, err - } - - certPool, err := gocertifi.CACerts() - if err != nil { - return nil, err - } - return &http.Transport{ - TLSClientConfig: &tls.Config{ - RootCAs: certPool, - ServerName: host, - }, - }, nil + url string + client *http.Client } // NewHTTPPublisher returns an HTTPPublisher ready for use. 
func NewHTTPPublisher(hostname, target, token, probeID string, insecure bool) (string, *HTTPPublisher, error) { - httpTransport, err := getHTTPTransport(hostname, insecure) + pc := ProbeConfig{ + Token: token, + ProbeID: probeID, + Insecure: insecure, + } + + httpTransport, err := pc.getHTTPTransport(hostname) if err != nil { return "", nil, err } p := &HTTPPublisher{ - url: sanitize.URL("", 0, "/api/report")(target), - token: token, - probeID: probeID, + ProbeConfig: pc, + url: sanitize.URL("", 0, "/api/report")(target), client: &http.Client{ Transport: httpTransport, }, @@ -66,7 +43,7 @@ func NewHTTPPublisher(hostname, target, token, probeID string, insecure bool) (s Timeout: 5 * time.Second, Transport: httpTransport, } - req, err := p.authorizedRequest("GET", sanitize.URL("", 0, "/api")(target), nil) + req, err := pc.authorizedRequest("GET", sanitize.URL("", 0, "/api")(target), nil) if err != nil { return "", nil, err } @@ -84,22 +61,13 @@ func NewHTTPPublisher(hostname, target, token, probeID string, insecure bool) (s return apiResponse.ID, p, nil } -func (p HTTPPublisher) authorizedRequest(method string, urlStr string, body io.Reader) (*http.Request, error) { - req, err := http.NewRequest(method, urlStr, body) - if err == nil { - req.Header.Set("Authorization", AuthorizationHeader(p.token)) - req.Header.Set(ScopeProbeIDHeader, p.probeID) - } - return req, err -} - func (p HTTPPublisher) String() string { return p.url } // Publish publishes the report to the URL. func (p HTTPPublisher) Publish(r io.Reader) error { - req, err := p.authorizedRequest("POST", p.url, r) + req, err := p.ProbeConfig.authorizedRequest("POST", p.url, r) if err != nil { return err } @@ -125,15 +93,3 @@ func (p *HTTPPublisher) Stop() { // goroutines on the server (see #604) p.client.Transport.(*http.Transport).CloseIdleConnections() } - -// AuthorizationHeader returns a value suitable for an HTTP Authorization -// header, based on the passed token string. 
-func AuthorizationHeader(token string) string { - return fmt.Sprintf("Scope-Probe token=%s", token) -} - -// ScopeProbeIDHeader is the header we use to carry the probe's unique ID. The -// ID is currently set to the probe's hostname. It's designed to deduplicate -// reports from the same probe to the same receiver, in case the probe is -// configured to publish to multiple receivers that resolve to the same app. -const ScopeProbeIDHeader = "X-Scope-Probe-ID" diff --git a/xfer/http_publisher_test.go b/xfer/http_publisher_test.go index 5f3d123d71..9a1bc320f7 100644 --- a/xfer/http_publisher_test.go +++ b/xfer/http_publisher_test.go @@ -4,6 +4,7 @@ import ( "compress/gzip" "encoding/gob" "encoding/json" + "fmt" "net/http" "net/http/httptest" "net/url" @@ -27,7 +28,7 @@ func TestHTTPPublisher(t *testing.T) { ) handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if want, have := xfer.AuthorizationHeader(token), r.Header.Get("Authorization"); want != have { + if want, have := fmt.Sprintf("Scope-Probe token=%s", token), r.Header.Get("Authorization"); want != have { t.Errorf("want %q, have %q", want, have) } diff --git a/xfer/multi_client.go b/xfer/multi_client.go new file mode 100644 index 0000000000..f3eb13195b --- /dev/null +++ b/xfer/multi_client.go @@ -0,0 +1,118 @@ +package xfer + +import ( + "log" + "sync" + + "github.com/weaveworks/scope/report" +) + +// ClientFactory is a thing thats makes AppClients +type ClientFactory func(ProbeConfig, string, string) (AppClient, error) + +type multiClient struct { + ProbeConfig + + clientFactory ClientFactory + handler ControlHandler + + mtx sync.Mutex + sema semaphore + clients map[string]AppClient // holds map from app id -> client + ids map[string]report.IDList // holds map from hostname -> app ids + quit chan struct{} +} + +type clientTuple struct { + Details + AppClient +} + +// MultiAppClient maintains a set of upstream apps, and ensures we have an +// AppClient for each one. 
+type MultiAppClient interface { + Set(hostname string, endpoints []string) + Stop() +} + +// NewMultiAppClient creates a new MultiAppClient. +func NewMultiAppClient(pc ProbeConfig, handler ControlHandler, clientFactory ClientFactory) MultiAppClient { + return &multiClient{ + ProbeConfig: pc, + clientFactory: clientFactory, + handler: handler, + + sema: newSemaphore(maxConcurrentGET), + clients: map[string]AppClient{}, + ids: map[string]report.IDList{}, + quit: make(chan struct{}), + } +} + +// Set the list of endpoints for the given hostname. +func (c *multiClient) Set(hostname string, endpoints []string) { + wg := sync.WaitGroup{} + wg.Add(len(endpoints)) + clients := make(chan clientTuple, len(endpoints)) + for _, endpoint := range endpoints { + go func(endpoint string) { + c.sema.acquire() + defer c.sema.release() + + client, err := c.clientFactory(c.ProbeConfig, hostname, endpoint) + if err != nil { + log.Printf("Error creating new app client: %v", err) + return + } + + details, err := client.Details() + if err != nil { + log.Printf("Error fetching app details: %v", err) + } + + clients <- clientTuple{details, client} + wg.Done() + }(endpoint) + } + + wg.Wait() + close(clients) + c.mtx.Lock() + defer c.mtx.Unlock() + + // Start any new apps, and replace the list of app ids for this hostname + hostIDs := report.MakeIDList() + for tuple := range clients { + hostIDs = hostIDs.Add(tuple.ID) + + _, ok := c.clients[tuple.ID] + if !ok { + c.clients[tuple.ID] = tuple.AppClient + tuple.AppClient.ControlConnection(c.handler) + } + } + c.ids[hostname] = hostIDs + + // Remove apps that are no longer referenced (by id) from any hostname + allReferencedIDs := report.MakeIDList() + for _, ids := range c.ids { + allReferencedIDs = allReferencedIDs.Add(ids...) + } + for id, client := range c.clients { + if !allReferencedIDs.Contains(id) { + client.Stop() + delete(c.clients, id) + } + } +} + +// Stop the MultiAppClient. 
+func (c *multiClient) Stop() { + c.mtx.Lock() + defer c.mtx.Unlock() + for _, c := range c.clients { + c.Stop() + } + c.clients = map[string]AppClient{} + close(c.quit) +} diff --git a/xfer/multi_client_test.go b/xfer/multi_client_test.go new file mode 100644 index 0000000000..936143595b --- /dev/null +++ b/xfer/multi_client_test.go @@ -0,0 +1,78 @@ +package xfer_test + +import ( + "runtime" + "testing" + + "github.com/weaveworks/scope/xfer" +) + +type mockClient struct { + id string + count int + stopped int +} + +func (c *mockClient) Details() (xfer.Details, error) { + return xfer.Details{ID: c.id}, nil +} + +func (c *mockClient) ControlConnection(handler xfer.ControlHandler) { + c.count++ +} + +func (c *mockClient) Stop() { + c.stopped++ +} + +func TestMultiClient(t *testing.T) { + var ( + a1 = &mockClient{id: "1"} // hostname a, app id 1 + a2 = &mockClient{id: "2"} // hostname a, app id 2 + b2 = &mockClient{id: "2"} // hostname b, app id 2 (duplicate) + b3 = &mockClient{id: "3"} // hostname b, app id 3 + factory = func(_ xfer.ProbeConfig, hostname, target string) (xfer.AppClient, error) { + switch target { + case "a1": + return a1, nil + case "a2": + return a2, nil + case "b2": + return b2, nil + case "b3": + return b3, nil + } + t.Fatal(target) + return a1, nil + } + controlHandler = xfer.ControlHandlerFunc(func(_ xfer.Request) xfer.Response { + return xfer.Response{} + }) + expect = func(i, j int) { + if i != j { + _, file, line, _ := runtime.Caller(1) + t.Fatalf("%s:%d: %d != %d", file, line, i, j) + } + } + ) + + mp := xfer.NewMultiAppClient(xfer.ProbeConfig{}, controlHandler, factory) + defer mp.Stop() + + // Add two hostnames with overlapping apps, check we don't add the same app twice + mp.Set("a", []string{"a1", "a2"}) + mp.Set("b", []string{"b2", "b3"}) + expect(a1.count, 1) + expect(a2.count+b2.count, 1) + expect(b3.count, 1) + + // Now drop the overlap, check we don't remove the app + mp.Set("b", []string{"b3"}) + expect(a1.count, 1) + 
expect(a2.count+b2.count, 1) + expect(b3.count, 1) + + // Now check we remove apps + mp.Set("b", []string{}) + expect(b3.stopped, 1) +} diff --git a/xfer/probe_config.go b/xfer/probe_config.go new file mode 100644 index 0000000000..071e832c25 --- /dev/null +++ b/xfer/probe_config.go @@ -0,0 +1,59 @@ +package xfer + +import ( + "crypto/tls" + "fmt" + "io" + "net" + "net/http" + + "github.com/certifi/gocertifi" +) + +// ScopeProbeIDHeader is the header we use to carry the probe's unique ID. The +// ID is currently set to the a random string on probe startup. +const ScopeProbeIDHeader = "X-Scope-Probe-ID" + +// ProbeConfig contains all the info needed for a probe to do HTTP requests +type ProbeConfig struct { + Token string + ProbeID string + Insecure bool +} + +func (pc ProbeConfig) authorizeHeaders(headers http.Header) { + headers.Set("Authorization", fmt.Sprintf("Scope-Probe token=%s", pc.Token)) + headers.Set(ScopeProbeIDHeader, pc.ProbeID) +} + +func (pc ProbeConfig) authorizedRequest(method string, urlStr string, body io.Reader) (*http.Request, error) { + req, err := http.NewRequest(method, urlStr, body) + if err == nil { + pc.authorizeHeaders(req.Header) + } + return req, err +} + +func (pc ProbeConfig) getHTTPTransport(hostname string) (*http.Transport, error) { + if pc.Insecure { + return &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, nil + } + + host, _, err := net.SplitHostPort(hostname) + if err != nil { + return nil, err + } + + certPool, err := gocertifi.CACerts() + if err != nil { + return nil, err + } + return &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: certPool, + ServerName: host, + }, + }, nil +}