Skip to content

Commit

Permalink
Refactor app for multitenancy
Browse files Browse the repository at this point in the history
- Add interfaces to allow for alternative implementations for Collector, ControlRouter
  and PipeRouter.
- Pass contexts on http handlers to these interfaces.  Although not used by the current
  (local, in-memory) implementations, the idea is this will be used to pass headers to
  implementations which support multitenancy (by, for instance, putting an authenticating
  reverse proxy in form of the app, and then inspecting the headers of the request for
  a used id).
  • Loading branch information
tomwilkie committed Feb 22, 2016
1 parent 0a2f629 commit 5f7f74b
Show file tree
Hide file tree
Showing 15 changed files with 422 additions and 323 deletions.
9 changes: 5 additions & 4 deletions app/api_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package app

import (
"net/http"

"golang.org/x/net/context"
)

// Raw report handler
func makeRawReportHandler(rep Reporter) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
// r.ParseForm()
respondWith(w, http.StatusOK, rep.Report())
func makeRawReportHandler(rep Reporter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, rep.Report(ctx))
}
}
21 changes: 11 additions & 10 deletions app/api_topologies.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"github.com/gorilla/mux"
"golang.org/x/net/context"

"github.com/weaveworks/scope/render"
"github.com/weaveworks/scope/report"
Expand Down Expand Up @@ -192,9 +193,9 @@ func (r *registry) walk(f func(APITopologyDesc)) {
}

// makeTopologyList returns a handler that yields an APITopologyList.
func (r *registry) makeTopologyList(rep Reporter) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
topologies := r.renderTopologies(rep.Report(), req)
func (r *registry) makeTopologyList(rep Reporter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, req *http.Request) {
topologies := r.renderTopologies(rep.Report(ctx), req)
respondWith(w, http.StatusOK, topologies)
}
}
Expand Down Expand Up @@ -252,27 +253,27 @@ func renderedForRequest(r *http.Request, topology APITopologyDesc) render.Render
return renderer
}

type reportRenderHandler func(Reporter, render.Renderer, http.ResponseWriter, *http.Request)
type reportRenderHandler func(context.Context, Reporter, render.Renderer, http.ResponseWriter, *http.Request)

func (r *registry) captureRenderer(rep Reporter, f reportRenderHandler) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
func (r *registry) captureRenderer(rep Reporter, f reportRenderHandler) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, req *http.Request) {
topology, ok := r.get(mux.Vars(req)["topology"])
if !ok {
http.NotFound(w, req)
return
}
renderer := renderedForRequest(req, topology)
f(rep, renderer, w, req)
f(ctx, rep, renderer, w, req)
}
}

func (r *registry) captureRendererWithoutFilters(rep Reporter, topologyID string, f reportRenderHandler) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
func (r *registry) captureRendererWithoutFilters(rep Reporter, topologyID string, f reportRenderHandler) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, req *http.Request) {
topology, ok := r.get(topologyID)
if !ok {
http.NotFound(w, req)
return
}
f(rep, topology.renderer, w, req)
f(ctx, rep, topology.renderer, w, req)
}
}
26 changes: 15 additions & 11 deletions app/api_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/gorilla/websocket"
"golang.org/x/net/context"

"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/render"
Expand All @@ -28,14 +29,14 @@ type APINode struct {
}

// Full topology.
func handleTopology(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
func handleTopology(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, APITopology{
Nodes: renderer.Render(rep.Report()).Prune(),
Nodes: renderer.Render(rep.Report(ctx)).Prune(),
})
}

// Websocket for the full topology. This route overlaps with the next.
func handleWs(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
func handleWs(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
Expand All @@ -48,15 +49,15 @@ func handleWs(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *
return
}
}
handleWebsocket(w, r, rep, renderer, loop)
handleWebsocket(ctx, w, r, rep, renderer, loop)
}

// Individual nodes.
func handleNode(nodeID string) func(Reporter, render.Renderer, http.ResponseWriter, *http.Request) {
return func(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
func handleNode(nodeID string) func(context.Context, Reporter, render.Renderer, http.ResponseWriter, *http.Request) {
return func(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
var (
rpt = rep.Report()
node, ok = renderer.Render(rep.Report())[nodeID]
rpt = rep.Report(ctx)
node, ok = renderer.Render(rep.Report(ctx))[nodeID]
)
if !ok {
http.NotFound(w, r)
Expand All @@ -71,6 +72,7 @@ var upgrader = websocket.Upgrader{
}

func handleWebsocket(
ctx context.Context,
w http.ResponseWriter,
r *http.Request,
rep Reporter,
Expand All @@ -88,6 +90,7 @@ func handleWebsocket(
go func(c *websocket.Conn) {
for { // just discard everything the browser sends
if _, _, err := c.NextReader(); err != nil {
log.Println("err:", err)
close(quit)
break
}
Expand All @@ -99,15 +102,16 @@ func handleWebsocket(
tick = time.Tick(loop)
wait = make(chan struct{}, 1)
)
rep.WaitOn(wait)
defer rep.UnWait(wait)
rep.WaitOn(ctx, wait)
defer rep.UnWait(ctx, wait)

for {
newTopo := renderer.Render(rep.Report()).Prune()
newTopo := renderer.Render(rep.Report(ctx)).Prune()
diff := render.TopoDiff(previousTopo, newTopo)
previousTopo = newTopo

if err := conn.SetWriteDeadline(time.Now().Add(websocketTimeout)); err != nil {
log.Println("err:", err)
return
}

Expand Down
17 changes: 9 additions & 8 deletions app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@ import (
"time"

"github.com/spaolacci/murmur3"
"golang.org/x/net/context"

"github.com/weaveworks/scope/report"
)

// Reporter is something that can produce reports on demand. It's a convenient
// interface for parts of the app, and several experimental components.
type Reporter interface {
Report() report.Report
WaitOn(chan struct{})
UnWait(chan struct{})
Report(context.Context) report.Report
WaitOn(context.Context, chan struct{})
UnWait(context.Context, chan struct{})
}

// Adder is something that can accept reports. It's a convenient interface for
// parts of the app, and several experimental components.
type Adder interface {
Add(report.Report)
Add(context.Context, report.Report)
}

// A Collector is a Reporter and an Adder
Expand All @@ -45,13 +46,13 @@ type waitableCondition struct {
waiters map[chan struct{}]struct{}
}

func (wc *waitableCondition) WaitOn(waiter chan struct{}) {
func (wc *waitableCondition) WaitOn(_ context.Context, waiter chan struct{}) {
wc.Lock()
wc.waiters[waiter] = struct{}{}
wc.Unlock()
}

func (wc *waitableCondition) UnWait(waiter chan struct{}) {
func (wc *waitableCondition) UnWait(_ context.Context, waiter chan struct{}) {
wc.Lock()
delete(wc.waiters, waiter)
wc.Unlock()
Expand Down Expand Up @@ -82,7 +83,7 @@ func NewCollector(window time.Duration) Collector {
var now = time.Now

// Add adds a report to the collector's internal state. It implements Adder.
func (c *collector) Add(rpt report.Report) {
func (c *collector) Add(_ context.Context, rpt report.Report) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.reports = append(c.reports, timestampReport{now(), rpt})
Expand All @@ -95,7 +96,7 @@ func (c *collector) Add(rpt report.Report) {

// Report returns a merged report over all added reports. It implements
// Reporter.
func (c *collector) Report() report.Report {
func (c *collector) Report(_ context.Context) report.Report {
c.mtx.Lock()
defer c.mtx.Unlock()

Expand Down
18 changes: 11 additions & 7 deletions app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"testing"
"time"

"golang.org/x/net/context"

"github.com/weaveworks/scope/app"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/reflect"
)

func TestCollector(t *testing.T) {
ctx := context.Background()
window := time.Millisecond
c := app.NewCollector(window)

Expand All @@ -20,32 +23,33 @@ func TestCollector(t *testing.T) {
r2 := report.MakeReport()
r2.Endpoint.AddNode("bar", report.MakeNode())

if want, have := report.MakeReport(), c.Report(); !reflect.DeepEqual(want, have) {
if want, have := report.MakeReport(), c.Report(ctx); !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}

c.Add(r1)
if want, have := r1, c.Report(); !reflect.DeepEqual(want, have) {
c.Add(ctx, r1)
if want, have := r1, c.Report(ctx); !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}

c.Add(r2)
c.Add(ctx, r2)

merged := report.MakeReport()
merged = merged.Merge(r1)
merged = merged.Merge(r2)
if want, have := merged, c.Report(); !reflect.DeepEqual(want, have) {
if want, have := merged, c.Report(ctx); !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
}

func TestCollectorWait(t *testing.T) {
ctx := context.Background()
window := time.Millisecond
c := app.NewCollector(window)

waiter := make(chan struct{}, 1)
c.WaitOn(waiter)
defer c.UnWait(waiter)
c.WaitOn(ctx, waiter)
defer c.UnWait(ctx, waiter)
c.(interface {
Broadcast()
}).Broadcast()
Expand Down
69 changes: 69 additions & 0 deletions app/control_router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package app

import (
"fmt"
"math/rand"
"sync"

"golang.org/x/net/context"

"github.com/weaveworks/scope/common/xfer"
)

// ControlRouter is a thing that can route control requests and responses
// between the UI and a probe.
type ControlRouter interface {
Handle(ctx context.Context, probeID string, req xfer.Request) (xfer.Response, error)
Register(ctx context.Context, probeID string, handler xfer.ControlHandlerFunc) (int64, error)
Deregister(ctx context.Context, probeID string, id int64) error
}

// NewLocalControlRouter creates a new ControlRouter that does everything
// locally, in memory.
func NewLocalControlRouter() ControlRouter {
return &localControlRouter{
probes: map[string]probe{},
}
}

type localControlRouter struct {
sync.Mutex
probes map[string]probe
}

type probe struct {
id int64
handler xfer.ControlHandlerFunc
}

func (l *localControlRouter) Handle(_ context.Context, probeID string, req xfer.Request) (xfer.Response, error) {
l.Lock()
probe, ok := l.probes[probeID]
l.Unlock()
if !ok {
return xfer.Response{}, fmt.Errorf("Probe %s is not connected right now...", probeID)
}
return probe.handler(req), nil
}

func (l *localControlRouter) Register(_ context.Context, probeID string, handler xfer.ControlHandlerFunc) (int64, error) {
l.Lock()
defer l.Unlock()
id := rand.Int63()
l.probes[probeID] = probe{
id: id,
handler: handler,
}
return id, nil
}

func (l *localControlRouter) Deregister(_ context.Context, probeID string, id int64) error {
l.Lock()
defer l.Unlock()
// NB probe might have reconnected in the mean time, need to ensure we do not
// delete new connection! Also, it might have connected then deleted itself!
if l.probes[probeID].id == id {
delete(l.probes, probeID)
}
return nil
}
Loading

0 comments on commit 5f7f74b

Please sign in to comment.