From 5f7f74bf1be62ac9a812e46d4e3bd09d898202c1 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 22 Feb 2016 11:36:07 +0000 Subject: [PATCH] Refactor app for multitenancy - Add interfaces to allow for alternative implementations for Collector, ControlRouter and PipeRouter. - Pass contexts on http handlers to these interfaces. Although not used by the current (local, in-memory) implementations, the idea is this will be used to pass headers to implementations which support multitenancy (by, for instance, putting an authenticating reverse proxy in form of the app, and then inspecting the headers of the request for a used id). --- app/api_report.go | 9 +- app/api_topologies.go | 21 +++-- app/api_topology.go | 26 +++--- app/collector.go | 17 ++-- app/collector_test.go | 18 ++-- app/control_router.go | 69 ++++++++++++++ app/controls.go | 158 ++++++++++++-------------------- app/controls_test.go | 2 +- app/mock_reporter_test.go | 10 +- app/pipe_router.go | 180 ++++++++++++++++++++++++++++++++++++ app/pipes.go | 181 ++++--------------------------------- app/pipes_internal_test.go | 10 +- app/router.go | 36 +++++--- app/router_test.go | 4 +- prog/app.go | 4 +- 15 files changed, 422 insertions(+), 323 deletions(-) create mode 100644 app/control_router.go create mode 100644 app/pipe_router.go diff --git a/app/api_report.go b/app/api_report.go index f96cdc0308..0aa046f043 100644 --- a/app/api_report.go +++ b/app/api_report.go @@ -2,12 +2,13 @@ package app import ( "net/http" + + "golang.org/x/net/context" ) // Raw report handler -func makeRawReportHandler(rep Reporter) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - // r.ParseForm() - respondWith(w, http.StatusOK, rep.Report()) +func makeRawReportHandler(rep Reporter) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { + respondWith(w, http.StatusOK, rep.Report(ctx)) } } diff --git a/app/api_topologies.go b/app/api_topologies.go index 37f4f20eb7..4d2e424e8b 100644 --- a/app/api_topologies.go +++ b/app/api_topologies.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/gorilla/mux" + "golang.org/x/net/context" "github.com/weaveworks/scope/render" "github.com/weaveworks/scope/report" @@ -192,9 +193,9 @@ func (r *registry) walk(f func(APITopologyDesc)) { } // makeTopologyList returns a handler that yields an APITopologyList. -func (r *registry) makeTopologyList(rep Reporter) func(w http.ResponseWriter, r *http.Request) { - return func(w http.ResponseWriter, req *http.Request) { - topologies := r.renderTopologies(rep.Report(), req) +func (r *registry) makeTopologyList(rep Reporter) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, req *http.Request) { + topologies := r.renderTopologies(rep.Report(ctx), req) respondWith(w, http.StatusOK, topologies) } } @@ -252,27 +253,27 @@ func renderedForRequest(r *http.Request, topology APITopologyDesc) render.Render return renderer } -type reportRenderHandler func(Reporter, render.Renderer, http.ResponseWriter, *http.Request) +type reportRenderHandler func(context.Context, Reporter, render.Renderer, http.ResponseWriter, *http.Request) -func (r *registry) captureRenderer(rep Reporter, f reportRenderHandler) http.HandlerFunc { - return func(w http.ResponseWriter, req *http.Request) { +func (r *registry) captureRenderer(rep Reporter, f reportRenderHandler) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, req *http.Request) { topology, ok := r.get(mux.Vars(req)["topology"]) if !ok { http.NotFound(w, req) return } renderer := renderedForRequest(req, topology) - f(rep, renderer, w, req) + f(ctx, rep, renderer, w, req) } } -func (r *registry) captureRendererWithoutFilters(rep Reporter, topologyID string, f reportRenderHandler) http.HandlerFunc { - return func(w http.ResponseWriter, req *http.Request) { +func (r *registry) captureRendererWithoutFilters(rep Reporter, topologyID string, f reportRenderHandler) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, req *http.Request) { topology, ok := r.get(topologyID) if !ok { http.NotFound(w, req) return } - f(rep, topology.renderer, w, req) + f(ctx, rep, topology.renderer, w, req) } } diff --git a/app/api_topology.go b/app/api_topology.go index 65aba207b4..855e671fe3 100644 --- a/app/api_topology.go +++ b/app/api_topology.go @@ -6,6 +6,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/gorilla/websocket" + "golang.org/x/net/context" "github.com/weaveworks/scope/common/xfer" "github.com/weaveworks/scope/render" @@ -28,14 +29,14 @@ type APINode struct { } // Full topology. -func handleTopology(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) { +func handleTopology(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) { respondWith(w, http.StatusOK, APITopology{ - Nodes: renderer.Render(rep.Report()).Prune(), + Nodes: renderer.Render(rep.Report(ctx)).Prune(), }) } // Websocket for the full topology. This route overlaps with the next. -func handleWs(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) { +func handleWs(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) { if err := r.ParseForm(); err != nil { respondWith(w, http.StatusInternalServerError, err.Error()) return @@ -48,15 +49,15 @@ func handleWs(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r * return } } - handleWebsocket(w, r, rep, renderer, loop) + handleWebsocket(ctx, w, r, rep, renderer, loop) } // Individual nodes. -func handleNode(nodeID string) func(Reporter, render.Renderer, http.ResponseWriter, *http.Request) { - return func(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) { +func handleNode(nodeID string) func(context.Context, Reporter, render.Renderer, http.ResponseWriter, *http.Request) { + return func(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) { var ( - rpt = rep.Report() - node, ok = renderer.Render(rep.Report())[nodeID] + rpt = rep.Report(ctx) + node, ok = renderer.Render(rep.Report(ctx))[nodeID] ) if !ok { http.NotFound(w, r) @@ -71,6 +72,7 @@ var upgrader = websocket.Upgrader{ } func handleWebsocket( + ctx context.Context, w http.ResponseWriter, r *http.Request, rep Reporter, @@ -88,6 +90,7 @@ func handleWebsocket( go func(c *websocket.Conn) { for { // just discard everything the browser sends if _, _, err := c.NextReader(); err != nil { + log.Println("err:", err) close(quit) break } @@ -99,15 +102,16 @@ func handleWebsocket( tick = time.Tick(loop) wait = make(chan struct{}, 1) ) - rep.WaitOn(wait) - defer rep.UnWait(wait) + rep.WaitOn(ctx, wait) + defer rep.UnWait(ctx, wait) for { - newTopo := renderer.Render(rep.Report()).Prune() + newTopo := renderer.Render(rep.Report(ctx)).Prune() diff := render.TopoDiff(previousTopo, newTopo) previousTopo = newTopo if err := conn.SetWriteDeadline(time.Now().Add(websocketTimeout)); err != nil { + log.Println("err:", err) return } diff --git a/app/collector.go b/app/collector.go index fff99788c0..6c2f72730b 100644 --- a/app/collector.go +++ b/app/collector.go @@ -6,6 +6,7 @@ import ( "time" "github.com/spaolacci/murmur3" + "golang.org/x/net/context" "github.com/weaveworks/scope/report" ) @@ -13,15 +14,15 @@ import ( // Reporter is something that can produce reports on demand. It's a convenient // interface for parts of the app, and several experimental components. type Reporter interface { - Report() report.Report - WaitOn(chan struct{}) - UnWait(chan struct{}) + Report(context.Context) report.Report + WaitOn(context.Context, chan struct{}) + UnWait(context.Context, chan struct{}) } // Adder is something that can accept reports. It's a convenient interface for // parts of the app, and several experimental components. type Adder interface { - Add(report.Report) + Add(context.Context, report.Report) } // A Collector is a Reporter and an Adder @@ -45,13 +46,13 @@ type waitableCondition struct { waiters map[chan struct{}]struct{} } -func (wc *waitableCondition) WaitOn(waiter chan struct{}) { +func (wc *waitableCondition) WaitOn(_ context.Context, waiter chan struct{}) { wc.Lock() wc.waiters[waiter] = struct{}{} wc.Unlock() } -func (wc *waitableCondition) UnWait(waiter chan struct{}) { +func (wc *waitableCondition) UnWait(_ context.Context, waiter chan struct{}) { wc.Lock() delete(wc.waiters, waiter) wc.Unlock() @@ -82,7 +83,7 @@ func NewCollector(window time.Duration) Collector { var now = time.Now // Add adds a report to the collector's internal state. It implements Adder. -func (c *collector) Add(rpt report.Report) { +func (c *collector) Add(_ context.Context, rpt report.Report) { c.mtx.Lock() defer c.mtx.Unlock() c.reports = append(c.reports, timestampReport{now(), rpt}) @@ -95,7 +96,7 @@ func (c *collector) Add(rpt report.Report) { // Report returns a merged report over all added reports. It implements // Reporter. -func (c *collector) Report() report.Report { +func (c *collector) Report(_ context.Context) report.Report { c.mtx.Lock() defer c.mtx.Unlock() diff --git a/app/collector_test.go b/app/collector_test.go index 672e4f4334..4e8400cdf2 100644 --- a/app/collector_test.go +++ b/app/collector_test.go @@ -4,6 +4,8 @@ import ( "testing" "time" + "golang.org/x/net/context" + "github.com/weaveworks/scope/app" "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test" @@ -11,6 +13,7 @@ import ( ) func TestCollector(t *testing.T) { + ctx := context.Background() window := time.Millisecond c := app.NewCollector(window) @@ -20,32 +23,33 @@ func TestCollector(t *testing.T) { r2 := report.MakeReport() r2.Endpoint.AddNode("bar", report.MakeNode()) - if want, have := report.MakeReport(), c.Report(); !reflect.DeepEqual(want, have) { + if want, have := report.MakeReport(), c.Report(ctx); !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } - c.Add(r1) - if want, have := r1, c.Report(); !reflect.DeepEqual(want, have) { + c.Add(ctx, r1) + if want, have := r1, c.Report(ctx); !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } - c.Add(r2) + c.Add(ctx, r2) merged := report.MakeReport() merged = merged.Merge(r1) merged = merged.Merge(r2) - if want, have := merged, c.Report(); !reflect.DeepEqual(want, have) { + if want, have := merged, c.Report(ctx); !reflect.DeepEqual(want, have) { t.Error(test.Diff(want, have)) } } func TestCollectorWait(t *testing.T) { + ctx := context.Background() window := time.Millisecond c := app.NewCollector(window) waiter := make(chan struct{}, 1) - c.WaitOn(waiter) - defer c.UnWait(waiter) + c.WaitOn(ctx, waiter) + defer c.UnWait(ctx, waiter) c.(interface { Broadcast() }).Broadcast() diff --git a/app/control_router.go b/app/control_router.go new file mode 100644 index 0000000000..dd28febec6 --- /dev/null +++ b/app/control_router.go @@ -0,0 +1,69 @@ +package app + +import ( + "fmt" + "math/rand" + "sync" + + "golang.org/x/net/context" + + "github.com/weaveworks/scope/common/xfer" +) + +// ControlRouter is a thing that can route control requests and responses +// between the UI and a probe. +type ControlRouter interface { + Handle(ctx context.Context, probeID string, req xfer.Request) (xfer.Response, error) + Register(ctx context.Context, probeID string, handler xfer.ControlHandlerFunc) (int64, error) + Deregister(ctx context.Context, probeID string, id int64) error +} + +// NewLocalControlRouter creates a new ControlRouter that does everything +// locally, in memory. +func NewLocalControlRouter() ControlRouter { + return &localControlRouter{ + probes: map[string]probe{}, + } +} + +type localControlRouter struct { + sync.Mutex + probes map[string]probe +} + +type probe struct { + id int64 + handler xfer.ControlHandlerFunc +} + +func (l *localControlRouter) Handle(_ context.Context, probeID string, req xfer.Request) (xfer.Response, error) { + l.Lock() + probe, ok := l.probes[probeID] + l.Unlock() + if !ok { + return xfer.Response{}, fmt.Errorf("Probe %s is not connected right now...", probeID) + } + return probe.handler(req), nil +} + +func (l *localControlRouter) Register(_ context.Context, probeID string, handler xfer.ControlHandlerFunc) (int64, error) { + l.Lock() + defer l.Unlock() + id := rand.Int63() + l.probes[probeID] = probe{ + id: id, + handler: handler, + } + return id, nil +} + +func (l *localControlRouter) Deregister(_ context.Context, probeID string, id int64) error { + l.Lock() + defer l.Unlock() + // NB probe might have reconnected in the mean time, need to ensure we do not + // delete new connection! Also, it might have connected then deleted itself! + if l.probes[probeID].id == id { + delete(l.probes, probeID) + } + return nil +} diff --git a/app/controls.go b/app/controls.go index 0f5c38f6b1..791d922c76 100644 --- a/app/controls.go +++ b/app/controls.go @@ -1,122 +1,84 @@ package app import ( - "math/rand" "net/http" "net/rpc" - "sync" log "github.com/Sirupsen/logrus" "github.com/gorilla/mux" + "golang.org/x/net/context" "github.com/weaveworks/scope/common/xfer" ) // RegisterControlRoutes registers the various control routes with a http mux. -func RegisterControlRoutes(router *mux.Router) { - controlRouter := &controlRouter{ - probes: map[string]controlHandler{}, - } - router.Methods("GET").Path("/api/control/ws").HandlerFunc(controlRouter.handleProbeWS) - router.Methods("POST").MatcherFunc(URLMatcher("/api/control/{probeID}/{nodeID}/{control}")).HandlerFunc(controlRouter.handleControl) -} - -type controlHandler struct { - id int64 - client *rpc.Client -} - -type controlRouter struct { - sync.Mutex - probes map[string]controlHandler -} - -func (ch *controlHandler) handle(req xfer.Request) xfer.Response { - var res xfer.Response - if err := ch.client.Call("control.Handle", req, &res); err != nil { - return xfer.ResponseError(err) - } - return res -} - -func (cr *controlRouter) get(probeID string) (controlHandler, bool) { - cr.Lock() - defer cr.Unlock() - handler, ok := cr.probes[probeID] - return handler, ok -} - -func (cr *controlRouter) set(probeID string, handler controlHandler) { - cr.Lock() - defer cr.Unlock() - cr.probes[probeID] = handler -} - -func (cr *controlRouter) rm(probeID string, handler controlHandler) { - cr.Lock() - defer cr.Unlock() - // NB probe might have reconnected in the mean time, need to ensure we do not - // delete new connection! Also, it might have connected then deleted itself! - if cr.probes[probeID].id == handler.id { - delete(cr.probes, probeID) - } +func RegisterControlRoutes(router *mux.Router, cr ControlRouter) { + router.Methods("GET").Path("/api/control/ws"). + HandlerFunc(requestContextDecorator(handleProbeWS(cr))) + router.Methods("POST").MatcherFunc(URLMatcher("/api/control/{probeID}/{nodeID}/{control}")). + HandlerFunc(requestContextDecorator(handleControl(cr))) } // handleControl routes control requests from the client to the appropriate // probe. Its is blocking. -func (cr *controlRouter) handleControl(w http.ResponseWriter, r *http.Request) { - var ( - vars = mux.Vars(r) - probeID = vars["probeID"] - nodeID = vars["nodeID"] - control = vars["control"] - ) - handler, ok := cr.get(probeID) - if !ok { - log.Errorf("Probe %s is not connected right now...", probeID) - http.NotFound(w, r) - return +func handleControl(cr ControlRouter) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { + var ( + vars = mux.Vars(r) + probeID = vars["probeID"] + nodeID = vars["nodeID"] + control = vars["control"] + ) + result, err := cr.Handle(ctx, probeID, xfer.Request{ + AppID: UniqueID, + NodeID: nodeID, + Control: control, + }) + if err != nil { + respondWith(w, http.StatusBadRequest, err.Error()) + return + } + if result.Error != "" { + respondWith(w, http.StatusBadRequest, result.Error) + return + } + respondWith(w, http.StatusOK, result) } - - result := handler.handle(xfer.Request{ - AppID: UniqueID, - NodeID: nodeID, - Control: control, - }) - if result.Error != "" { - respondWith(w, http.StatusBadRequest, result.Error) - return - } - respondWith(w, http.StatusOK, result) } // handleProbeWS accepts websocket connections from the probe and registers // them in the control router, such that HandleControl calls can find them. -func (cr *controlRouter) handleProbeWS(w http.ResponseWriter, r *http.Request) { - probeID := r.Header.Get(xfer.ScopeProbeIDHeader) - if probeID == "" { - respondWith(w, http.StatusBadRequest, xfer.ScopeProbeIDHeader) - return - } - - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Errorf("Error upgrading to websocket: %v", err) - return - } - defer conn.Close() - - codec := xfer.NewJSONWebsocketCodec(conn) - client := rpc.NewClientWithCodec(codec) - handler := controlHandler{ - id: rand.Int63(), - client: client, +func handleProbeWS(cr ControlRouter) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { + probeID := r.Header.Get(xfer.ScopeProbeIDHeader) + if probeID == "" { + respondWith(w, http.StatusBadRequest, xfer.ScopeProbeIDHeader) + return + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("Error upgrading control websocket: %v", err) + return + } + defer conn.Close() + + codec := xfer.NewJSONWebsocketCodec(conn) + client := rpc.NewClientWithCodec(codec) + defer client.Close() + + id, err := cr.Register(ctx, probeID, func(req xfer.Request) xfer.Response { + var res xfer.Response + if err := client.Call("control.Handle", req, &res); err != nil { + return xfer.ResponseError(err) + } + return res + }) + if err != nil { + respondWith(w, http.StatusBadRequest, err.Error()) + return + } + defer cr.Deregister(ctx, probeID, id) + codec.WaitForReadError() } - - cr.set(probeID, handler) - - codec.WaitForReadError() - - cr.rm(probeID, handler) - client.Close() } diff --git a/app/controls_test.go b/app/controls_test.go index ff01023e85..2e1a7c5209 100644 --- a/app/controls_test.go +++ b/app/controls_test.go @@ -18,7 +18,7 @@ import ( func TestControl(t *testing.T) { router := mux.NewRouter() - app.RegisterControlRoutes(router) + app.RegisterControlRoutes(router, app.NewLocalControlRouter()) server := httptest.NewServer(router) defer server.Close() diff --git a/app/mock_reporter_test.go b/app/mock_reporter_test.go index 625791dec7..b5efe5e5b6 100644 --- a/app/mock_reporter_test.go +++ b/app/mock_reporter_test.go @@ -3,12 +3,14 @@ package app_test import ( "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test/fixture" + + "golang.org/x/net/context" ) // StaticReport is used as a fixture in tests. It emulates an xfer.Collector. type StaticReport struct{} -func (s StaticReport) Report() report.Report { return fixture.Report } -func (s StaticReport) Add(report.Report) {} -func (s StaticReport) WaitOn(chan struct{}) {} -func (s StaticReport) UnWait(chan struct{}) {} +func (s StaticReport) Report(context.Context) report.Report { return fixture.Report } +func (s StaticReport) Add(context.Context, report.Report) {} +func (s StaticReport) WaitOn(context.Context, chan struct{}) {} +func (s StaticReport) UnWait(context.Context, chan struct{}) {} diff --git a/app/pipe_router.go b/app/pipe_router.go new file mode 100644 index 0000000000..5bdfdf9c48 --- /dev/null +++ b/app/pipe_router.go @@ -0,0 +1,180 @@ +package app + +import ( + "io" + "sync" + "time" + + log "github.com/Sirupsen/logrus" + "golang.org/x/net/context" + + "github.com/weaveworks/scope/common/mtime" + "github.com/weaveworks/scope/common/xfer" +) + +const ( + gcInterval = 30 * time.Second // we check all the pipes every 30s + pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute + gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten +) + +// End is an enum for either end of the pipe. +type End int + +// Valid values of type End +const ( + UIEnd = iota + ProbeEnd +) + +// PipeRouter stores pipes and allows you to connect to either end of them. +type PipeRouter interface { + Get(context.Context, string, End) (xfer.Pipe, io.ReadWriter, bool) + Release(context.Context, string, End) + Delete(context.Context, string) + Stop() +} + +// PipeRouter connects incoming and outgoing pipes. +type localPipeRouter struct { + sync.Mutex + wait sync.WaitGroup + quit chan struct{} + pipes map[string]*pipe +} + +// for each end of the pipe, we keep a reference count & lastUsedTIme, +// such that we can timeout pipes when either end is inactive. +type pipe struct { + xfer.Pipe + + tombstoneTime time.Time + + ui, probe end +} + +type end struct { + refCount int + lastUsedTime time.Time +} + +func (p *pipe) end(end End) (*end, io.ReadWriter) { + ui, probe := p.Ends() + if end == UIEnd { + return &p.ui, ui + } + return &p.probe, probe +} + +// NewLocalPipeRouter returns a new local (in-memory) pipe router. +func NewLocalPipeRouter() PipeRouter { + pipeRouter := &localPipeRouter{ + quit: make(chan struct{}), + pipes: map[string]*pipe{}, + } + pipeRouter.wait.Add(1) + go pipeRouter.gcLoop() + return pipeRouter +} + +func (pr *localPipeRouter) Get(_ context.Context, id string, e End) (xfer.Pipe, io.ReadWriter, bool) { + pr.Lock() + defer pr.Unlock() + p, ok := pr.pipes[id] + if !ok { + log.Infof("Creating pipe id %s", id) + p = &pipe{ + ui: end{lastUsedTime: mtime.Now()}, + probe: end{lastUsedTime: mtime.Now()}, + Pipe: xfer.NewPipe(), + } + pr.pipes[id] = p + } + if p.Closed() { + return nil, nil, false + } + end, endIO := p.end(e) + end.refCount++ + return p, endIO, true +} + +func (pr *localPipeRouter) Release(_ context.Context, id string, e End) { + pr.Lock() + defer pr.Unlock() + + p, ok := pr.pipes[id] + if !ok { + // uh oh + return + } + + end, _ := p.end(e) + end.refCount-- + if end.refCount > 0 { + return + } + + if !p.Closed() { + end.lastUsedTime = mtime.Now() + } +} + +func (pr *localPipeRouter) Delete(_ context.Context, id string) { + pr.Lock() + defer pr.Unlock() + p, ok := pr.pipes[id] + if !ok { + return + } + p.Close() + p.tombstoneTime = mtime.Now() +} + +func (pr *localPipeRouter) Stop() { + close(pr.quit) + pr.wait.Wait() +} + +func (pr *localPipeRouter) gcLoop() { + defer pr.wait.Done() + ticker := time.Tick(gcInterval) + for { + select { + case <-pr.quit: + return + case <-ticker: + } + + pr.timeout() + pr.garbageCollect() + } +} + +func (pr *localPipeRouter) timeout() { + pr.Lock() + defer pr.Unlock() + now := mtime.Now() + for id, pipe := range pr.pipes { + if pipe.Closed() || (pipe.ui.refCount > 0 && pipe.probe.refCount > 0) { + continue + } + + if (pipe.ui.refCount == 0 && now.Sub(pipe.ui.lastUsedTime) >= pipeTimeout) || + (pipe.probe.refCount == 0 && now.Sub(pipe.probe.lastUsedTime) >= pipeTimeout) { + log.Infof("Timing out pipe %s", id) + pipe.Close() + pipe.tombstoneTime = now + } + } +} + +func (pr *localPipeRouter) garbageCollect() { + pr.Lock() + defer pr.Unlock() + now := mtime.Now() + for pipeID, pipe := range pr.pipes { + if pipe.Closed() && now.Sub(pipe.tombstoneTime) >= gcTimeout { + delete(pr.pipes, pipeID) + } + } +} diff --git a/app/pipes.go b/app/pipes.go index bed35309b7..9f5cb71cc5 100644 --- a/app/pipes.go +++ b/app/pipes.go @@ -1,199 +1,54 @@ package app import ( - "io" "net/http" - "sync" - "time" log "github.com/Sirupsen/logrus" "github.com/gorilla/mux" - - "github.com/weaveworks/scope/common/mtime" - "github.com/weaveworks/scope/common/xfer" -) - -const ( - gcInterval = 30 * time.Second // we check all the pipes every 30s - pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute - gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten + "golang.org/x/net/context" ) -// PipeRouter connects incoming and outgoing pipes. -type PipeRouter struct { - sync.Mutex - wait sync.WaitGroup - quit chan struct{} - pipes map[string]*pipe -} - -// for each end of the pipe, we keep a reference count & lastUsedTIme, -// such that we can timeout pipes when either end is inactive. -type end struct { - refCount int - lastUsedTime time.Time -} - -type pipe struct { - ui, probe end - tombstoneTime time.Time - - xfer.Pipe -} - // RegisterPipeRoutes registers the pipe routes -func RegisterPipeRoutes(router *mux.Router) *PipeRouter { - pipeRouter := &PipeRouter{ - quit: make(chan struct{}), - pipes: map[string]*pipe{}, - } - pipeRouter.wait.Add(1) - go pipeRouter.gcLoop() +func RegisterPipeRoutes(router *mux.Router, pr PipeRouter) { router.Methods("GET"). Path("/api/pipe/{pipeID}"). - HandlerFunc(pipeRouter.handleWs(func(p *pipe) (*end, io.ReadWriter) { - uiEnd, _ := p.Ends() - return &p.ui, uiEnd - })) + HandlerFunc(requestContextDecorator(handlePipeWs(pr, UIEnd))) + router.Methods("GET"). Path("/api/pipe/{pipeID}/probe"). - HandlerFunc(pipeRouter.handleWs(func(p *pipe) (*end, io.ReadWriter) { - _, probeEnd := p.Ends() - return &p.probe, probeEnd - })) + HandlerFunc(requestContextDecorator(handlePipeWs(pr, ProbeEnd))) + router.Methods("DELETE", "POST"). Path("/api/pipe/{pipeID}"). - HandlerFunc(pipeRouter.delete) - return pipeRouter + HandlerFunc(requestContextDecorator(deletePipe(pr))) } -// Stop stops the pipeRouter -func (pr *PipeRouter) Stop() { - close(pr.quit) - pr.wait.Wait() -} - -func (pr *PipeRouter) gcLoop() { - defer pr.wait.Done() - ticker := time.Tick(gcInterval) - for { - select { - case <-pr.quit: - return - case <-ticker: - } - - pr.timeout() - pr.garbageCollect() - } -} - -func (pr *PipeRouter) timeout() { - pr.Lock() - defer pr.Unlock() - now := mtime.Now() - for id, pipe := range pr.pipes { - if pipe.Closed() || (pipe.ui.refCount > 0 && pipe.probe.refCount > 0) { - continue - } - - if (pipe.ui.refCount == 0 && now.Sub(pipe.ui.lastUsedTime) >= pipeTimeout) || - (pipe.probe.refCount == 0 && now.Sub(pipe.probe.lastUsedTime) >= pipeTimeout) { - log.Infof("Timing out pipe %s", id) - pipe.Close() - pipe.tombstoneTime = now - } - } -} - -func (pr *PipeRouter) garbageCollect() { - pr.Lock() - defer pr.Unlock() - now := mtime.Now() - for pipeID, pipe := range pr.pipes { - if pipe.Closed() && now.Sub(pipe.tombstoneTime) >= gcTimeout { - delete(pr.pipes, pipeID) - } - } -} - -func (pr *PipeRouter) getOrCreate(id string) (*pipe, bool) { - pr.Lock() - defer pr.Unlock() - p, ok := pr.pipes[id] - if !ok { - log.Infof("Creating pipe id %s", id) - p = &pipe{ - ui: end{lastUsedTime: mtime.Now()}, - probe: end{lastUsedTime: mtime.Now()}, - Pipe: xfer.NewPipe(), - } - pr.pipes[id] = p - } - if p.Closed() { - return nil, false - } - return p, true -} - -func (pr *PipeRouter) retain(id string, pipe *pipe, end *end) bool { - pr.Lock() - defer pr.Unlock() - if pipe.Closed() { - return false - } - end.refCount++ - return true -} - -func (pr *PipeRouter) release(id string, pipe *pipe, end *end) { - pr.Lock() - defer pr.Unlock() - - end.refCount-- - if end.refCount != 0 { - return - } - - if !pipe.Closed() { - end.lastUsedTime = mtime.Now() - } -} - -func (pr *PipeRouter) handleWs(endSelector func(*pipe) (*end, io.ReadWriter)) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - pipeID := mux.Vars(r)["pipeID"] - pipe, ok := pr.getOrCreate(pipeID) +func handlePipeWs(pr PipeRouter, end End) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["pipeID"] + pipe, endIO, ok := pr.Get(ctx, id, end) if !ok { http.NotFound(w, r) return } - - endRef, endIO := endSelector(pipe) - if !pr.retain(pipeID, pipe, endRef) { - http.NotFound(w, r) - return - } - defer pr.release(pipeID, pipe, endRef) + defer pr.Release(ctx, id, end) conn, err := upgrader.Upgrade(w, r, nil) if err != nil { - log.Errorf("Error upgrading to websocket: %v", err) + log.Errorf("Error upgrading pipe %s (%d) websocket: %v", id, end, err) return } defer conn.Close() + log.Infof("Pipe success %s (%d)", id, end) pipe.CopyToWebsocket(endIO, conn) } } -func (pr *PipeRouter) delete(w http.ResponseWriter, r *http.Request) { - pipeID := mux.Vars(r)["pipeID"] - pipe, ok := pr.getOrCreate(pipeID) - if ok && pr.retain(pipeID, pipe, &pipe.ui) { +func deletePipe(pr PipeRouter) CtxHandlerFunc { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { + pipeID := mux.Vars(r)["pipeID"] log.Infof("Closing pipe %s", pipeID) - pipe.Close() - pipe.tombstoneTime = mtime.Now() - pr.release(pipeID, pipe, &pipe.ui) + pr.Delete(ctx, pipeID) } } diff --git a/app/pipes_internal_test.go b/app/pipes_internal_test.go index 69615c5ac8..1a6643933e 100644 --- a/app/pipes_internal_test.go +++ b/app/pipes_internal_test.go @@ -12,6 +12,7 @@ import ( "github.com/gorilla/mux" "github.com/gorilla/websocket" + "golang.org/x/net/context" "github.com/weaveworks/scope/common/mtime" "github.com/weaveworks/scope/common/xfer" @@ -22,7 +23,8 @@ import ( func TestPipeTimeout(t *testing.T) { router := mux.NewRouter() - pr := RegisterPipeRoutes(router) + pr := NewLocalPipeRouter().(*localPipeRouter) + RegisterPipeRoutes(router, pr) pr.Stop() // we don't want the loop running in the background mtime.NowForce(time.Now()) @@ -30,7 +32,8 @@ func TestPipeTimeout(t *testing.T) { // create a new pipe. id := "foo" - pipe, ok := pr.getOrCreate(id) + ctx := context.Background() + pipe, _, ok := pr.Get(ctx, id, UIEnd) if !ok { t.Fatalf("not ok") } @@ -65,7 +68,8 @@ func (a adapter) PipeClose(_, pipeID string) error { func TestPipeClose(t *testing.T) { router := mux.NewRouter() - pr := RegisterPipeRoutes(router) + pr := NewLocalPipeRouter() + RegisterPipeRoutes(router, pr) defer pr.Stop() server := httptest.NewServer(router) diff --git a/app/router.go b/app/router.go index e8c25abff8..a414950fdf 100644 --- a/app/router.go +++ b/app/router.go @@ -10,6 +10,7 @@ import ( "github.com/PuerkitoBio/ghost/handlers" "github.com/gorilla/mux" "github.com/ugorji/go/codec" + "golang.org/x/net/context" "github.com/weaveworks/scope/common/hostname" "github.com/weaveworks/scope/common/xfer" @@ -24,6 +25,19 @@ var ( UniqueID = "0" ) +// RequestCtxKey is key used for request entry in context +const RequestCtxKey = "request" + +// CtxHandlerFunc is a http.HandlerFunc, with added contexts +type CtxHandlerFunc func(context.Context, http.ResponseWriter, *http.Request) + +func requestContextDecorator(f CtxHandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + ctx := context.WithValue(context.Background(), RequestCtxKey, r) + f(ctx, w, r) + } +} + // URLMatcher uses request.RequestURI (the raw, unparsed request) to attempt // to match pattern. It does this as go's URL.Parse method is broken, and // mistakenly unescapes the Path before parsing it. This breaks %2F (encoded @@ -76,13 +90,13 @@ func gzipHandler(h http.HandlerFunc) http.HandlerFunc { // routes should be added to a router and passed to postRoutes. func TopologyHandler(c Reporter, preRoutes *mux.Router, postRoutes http.Handler) http.Handler { get := preRoutes.Methods("GET").Subrouter() - get.HandleFunc("/api", gzipHandler(apiHandler)) - get.HandleFunc("/api/topology", gzipHandler(topologyRegistry.makeTopologyList(c))) + get.HandleFunc("/api", gzipHandler(requestContextDecorator(apiHandler))) + get.HandleFunc("/api/topology", gzipHandler(requestContextDecorator(topologyRegistry.makeTopologyList(c)))) get.HandleFunc("/api/topology/{topology}", - gzipHandler(topologyRegistry.captureRenderer(c, handleTopology))) + gzipHandler(requestContextDecorator(topologyRegistry.captureRenderer(c, handleTopology)))) get.HandleFunc("/api/topology/{topology}/ws", - topologyRegistry.captureRenderer(c, handleWs)) // NB not gzip! - get.HandleFunc("/api/report", gzipHandler(makeRawReportHandler(c))) + requestContextDecorator(topologyRegistry.captureRenderer(c, handleWs))) // NB not gzip! + get.HandleFunc("/api/report", gzipHandler(requestContextDecorator(makeRawReportHandler(c)))) // We pull in the http.DefaultServeMux to get the pprof routes preRoutes.PathPrefix("/debug/pprof").Handler(http.DefaultServeMux) @@ -107,9 +121,9 @@ func TopologyHandler(c Reporter, preRoutes *mux.Router, postRoutes http.Handler) return } - handler := gzipHandler(topologyRegistry.captureRendererWithoutFilters( + handler := gzipHandler(requestContextDecorator(topologyRegistry.captureRendererWithoutFilters( c, topologyID, handleNode(nodeID), - )) + ))) handler.ServeHTTP(w, r) }) } @@ -117,7 +131,7 @@ func TopologyHandler(c Reporter, preRoutes *mux.Router, postRoutes http.Handler) // RegisterReportPostHandler registers the handler for report submission func RegisterReportPostHandler(a Adder, router *mux.Router) { post := router.Methods("POST").Subrouter() - post.HandleFunc("/api/report", func(w http.ResponseWriter, r *http.Request) { + post.HandleFunc("/api/report", requestContextDecorator(func(ctx context.Context, w http.ResponseWriter, r *http.Request) { var ( rpt report.Report reader = r.Body @@ -142,15 +156,15 @@ func RegisterReportPostHandler(a Adder, router *mux.Router) { http.Error(w, err.Error(), http.StatusBadRequest) return } - a.Add(rpt) + a.Add(ctx, rpt) if len(rpt.Pod.Nodes) > 0 { topologyRegistry.enableKubernetesTopologies() } w.WriteHeader(http.StatusOK) - }) + })) } -func apiHandler(w http.ResponseWriter, r *http.Request) { +func apiHandler(_ context.Context, w http.ResponseWriter, r *http.Request) { respondWith(w, http.StatusOK, xfer.Details{ ID: UniqueID, Version: Version, diff --git a/app/router_test.go b/app/router_test.go index 6a369497a4..cc80b522cd 100644 --- a/app/router_test.go +++ b/app/router_test.go @@ -12,6 +12,7 @@ import ( "github.com/gorilla/mux" "github.com/ugorji/go/codec" + "golang.org/x/net/context" "github.com/weaveworks/scope/app" "github.com/weaveworks/scope/test" @@ -73,7 +74,8 @@ func TestReportPostHandler(t *testing.T) { t.Fatalf("Error posting report: %d", resp.StatusCode) } - if want, have := fixture.Report.Endpoint.Nodes, c.Report().Endpoint.Nodes; len(have) == 0 || len(want) != len(have) { + ctx := context.Background() + if want, have := fixture.Report.Endpoint.Nodes, c.Report(ctx).Endpoint.Nodes; len(have) == 0 || len(want) != len(have) { t.Fatalf("Content-Type %s: %v", contentType, test.Diff(have, want)) } } diff --git a/prog/app.go b/prog/app.go index a53ec62f0d..fc71d7657d 100644 --- a/prog/app.go +++ b/prog/app.go @@ -23,8 +23,8 @@ import ( func router(c app.Collector) http.Handler { router := mux.NewRouter() app.RegisterReportPostHandler(c, router) - app.RegisterControlRoutes(router) - app.RegisterPipeRoutes(router) + app.RegisterControlRoutes(router, app.NewLocalControlRouter()) + app.RegisterPipeRoutes(router, app.NewLocalPipeRouter()) return app.TopologyHandler(c, router, http.FileServer(FS(false))) }