Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor app for multitenancy #997

Merged
merged 1 commit into from
Feb 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions app/api_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package app

import (
"net/http"

"golang.org/x/net/context"
)

// Raw report handler
func makeRawReportHandler(rep Reporter) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
// r.ParseForm()
respondWith(w, http.StatusOK, rep.Report())
func makeRawReportHandler(rep Reporter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {

This comment was marked as abuse.

This comment was marked as abuse.

This comment was marked as abuse.

respondWith(w, http.StatusOK, rep.Report(ctx))
}
}
21 changes: 11 additions & 10 deletions app/api_topologies.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"github.com/gorilla/mux"
"golang.org/x/net/context"

"github.com/weaveworks/scope/render"
"github.com/weaveworks/scope/report"
Expand Down Expand Up @@ -192,9 +193,9 @@ func (r *registry) walk(f func(APITopologyDesc)) {
}

// makeTopologyList returns a handler that yields an APITopologyList.
func (r *registry) makeTopologyList(rep Reporter) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
topologies := r.renderTopologies(rep.Report(), req)
func (r *registry) makeTopologyList(rep Reporter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, req *http.Request) {
topologies := r.renderTopologies(rep.Report(ctx), req)
respondWith(w, http.StatusOK, topologies)
}
}
Expand Down Expand Up @@ -252,27 +253,27 @@ func renderedForRequest(r *http.Request, topology APITopologyDesc) render.Render
return renderer
}

type reportRenderHandler func(Reporter, render.Renderer, http.ResponseWriter, *http.Request)
type reportRenderHandler func(context.Context, Reporter, render.Renderer, http.ResponseWriter, *http.Request)

func (r *registry) captureRenderer(rep Reporter, f reportRenderHandler) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
func (r *registry) captureRenderer(rep Reporter, f reportRenderHandler) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, req *http.Request) {
topology, ok := r.get(mux.Vars(req)["topology"])
if !ok {
http.NotFound(w, req)
return
}
renderer := renderedForRequest(req, topology)
f(rep, renderer, w, req)
f(ctx, rep, renderer, w, req)
}
}

func (r *registry) captureRendererWithoutFilters(rep Reporter, topologyID string, f reportRenderHandler) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
func (r *registry) captureRendererWithoutFilters(rep Reporter, topologyID string, f reportRenderHandler) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, req *http.Request) {
topology, ok := r.get(topologyID)
if !ok {
http.NotFound(w, req)
return
}
f(rep, topology.renderer, w, req)
f(ctx, rep, topology.renderer, w, req)
}
}
26 changes: 15 additions & 11 deletions app/api_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/gorilla/websocket"
"golang.org/x/net/context"

"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/render"
Expand All @@ -28,14 +29,14 @@ type APINode struct {
}

// Full topology.
func handleTopology(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
func handleTopology(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, APITopology{
Nodes: renderer.Render(rep.Report()).Prune(),
Nodes: renderer.Render(rep.Report(ctx)).Prune(),
})
}

// Websocket for the full topology. This route overlaps with the next.
func handleWs(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
func handleWs(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
Expand All @@ -48,15 +49,15 @@ func handleWs(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *
return
}
}
handleWebsocket(w, r, rep, renderer, loop)
handleWebsocket(ctx, w, r, rep, renderer, loop)
}

// Individual nodes.
func handleNode(nodeID string) func(Reporter, render.Renderer, http.ResponseWriter, *http.Request) {
return func(rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
func handleNode(nodeID string) func(context.Context, Reporter, render.Renderer, http.ResponseWriter, *http.Request) {
return func(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
var (
rpt = rep.Report()
node, ok = renderer.Render(rep.Report())[nodeID]
rpt = rep.Report(ctx)
node, ok = renderer.Render(rep.Report(ctx))[nodeID]
)
if !ok {
http.NotFound(w, r)
Expand All @@ -71,6 +72,7 @@ var upgrader = websocket.Upgrader{
}

func handleWebsocket(
ctx context.Context,
w http.ResponseWriter,
r *http.Request,
rep Reporter,
Expand All @@ -88,6 +90,7 @@ func handleWebsocket(
go func(c *websocket.Conn) {
for { // just discard everything the browser sends
if _, _, err := c.NextReader(); err != nil {
log.Println("err:", err)
close(quit)
break
}
Expand All @@ -99,15 +102,16 @@ func handleWebsocket(
tick = time.Tick(loop)
wait = make(chan struct{}, 1)
)
rep.WaitOn(wait)
defer rep.UnWait(wait)
rep.WaitOn(ctx, wait)
defer rep.UnWait(ctx, wait)

for {
newTopo := renderer.Render(rep.Report()).Prune()
newTopo := renderer.Render(rep.Report(ctx)).Prune()
diff := render.TopoDiff(previousTopo, newTopo)
previousTopo = newTopo

if err := conn.SetWriteDeadline(time.Now().Add(websocketTimeout)); err != nil {
log.Println("err:", err)
return
}

Expand Down
17 changes: 9 additions & 8 deletions app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@ import (
"time"

"github.com/spaolacci/murmur3"
"golang.org/x/net/context"

"github.com/weaveworks/scope/report"
)

// Reporter is something that can produce reports on demand. It's a convenient
// interface for parts of the app, and several experimental components.
type Reporter interface {
Report() report.Report
WaitOn(chan struct{})
UnWait(chan struct{})
Report(context.Context) report.Report
WaitOn(context.Context, chan struct{})
UnWait(context.Context, chan struct{})
}

// Adder is something that can accept reports. It's a convenient interface for
// parts of the app, and several experimental components.
type Adder interface {
Add(report.Report)
Add(context.Context, report.Report)
}

// A Collector is a Reporter and an Adder
Expand All @@ -45,13 +46,13 @@ type waitableCondition struct {
waiters map[chan struct{}]struct{}
}

func (wc *waitableCondition) WaitOn(waiter chan struct{}) {
func (wc *waitableCondition) WaitOn(_ context.Context, waiter chan struct{}) {
wc.Lock()
wc.waiters[waiter] = struct{}{}
wc.Unlock()
}

func (wc *waitableCondition) UnWait(waiter chan struct{}) {
func (wc *waitableCondition) UnWait(_ context.Context, waiter chan struct{}) {
wc.Lock()
delete(wc.waiters, waiter)
wc.Unlock()
Expand Down Expand Up @@ -82,7 +83,7 @@ func NewCollector(window time.Duration) Collector {
var now = time.Now

// Add adds a report to the collector's internal state. It implements Adder.
func (c *collector) Add(rpt report.Report) {
func (c *collector) Add(_ context.Context, rpt report.Report) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.reports = append(c.reports, timestampReport{now(), rpt})
Expand All @@ -95,7 +96,7 @@ func (c *collector) Add(rpt report.Report) {

// Report returns a merged report over all added reports. It implements
// Reporter.
func (c *collector) Report() report.Report {
func (c *collector) Report(_ context.Context) report.Report {
c.mtx.Lock()
defer c.mtx.Unlock()

Expand Down
18 changes: 11 additions & 7 deletions app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"testing"
"time"

"golang.org/x/net/context"

"github.com/weaveworks/scope/app"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/reflect"
)

func TestCollector(t *testing.T) {
ctx := context.Background()
window := time.Millisecond
c := app.NewCollector(window)

Expand All @@ -20,32 +23,33 @@ func TestCollector(t *testing.T) {
r2 := report.MakeReport()
r2.Endpoint.AddNode("bar", report.MakeNode())

if want, have := report.MakeReport(), c.Report(); !reflect.DeepEqual(want, have) {
if want, have := report.MakeReport(), c.Report(ctx); !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}

c.Add(r1)
if want, have := r1, c.Report(); !reflect.DeepEqual(want, have) {
c.Add(ctx, r1)
if want, have := r1, c.Report(ctx); !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}

c.Add(r2)
c.Add(ctx, r2)

merged := report.MakeReport()
merged = merged.Merge(r1)
merged = merged.Merge(r2)
if want, have := merged, c.Report(); !reflect.DeepEqual(want, have) {
if want, have := merged, c.Report(ctx); !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
}

func TestCollectorWait(t *testing.T) {
ctx := context.Background()
window := time.Millisecond
c := app.NewCollector(window)

waiter := make(chan struct{}, 1)
c.WaitOn(waiter)
defer c.UnWait(waiter)
c.WaitOn(ctx, waiter)
defer c.UnWait(ctx, waiter)
c.(interface {
Broadcast()
}).Broadcast()
Expand Down
69 changes: 69 additions & 0 deletions app/control_router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package app

import (
"fmt"
"math/rand"
"sync"

"golang.org/x/net/context"

"github.com/weaveworks/scope/common/xfer"
)

// ControlRouter is a thing that can route control requests and responses
// between the UI and a probe.
type ControlRouter interface {
Handle(ctx context.Context, probeID string, req xfer.Request) (xfer.Response, error)
Register(ctx context.Context, probeID string, handler xfer.ControlHandlerFunc) (int64, error)
Deregister(ctx context.Context, probeID string, id int64) error
}

// NewLocalControlRouter creates a new ControlRouter that does everything
// locally, in memory.
func NewLocalControlRouter() ControlRouter {
return &localControlRouter{
probes: map[string]probe{},
}
}

type localControlRouter struct {
sync.Mutex
probes map[string]probe
}

type probe struct {
id int64
handler xfer.ControlHandlerFunc
}

func (l *localControlRouter) Handle(_ context.Context, probeID string, req xfer.Request) (xfer.Response, error) {
l.Lock()
probe, ok := l.probes[probeID]
l.Unlock()
if !ok {
return xfer.Response{}, fmt.Errorf("Probe %s is not connected right now...", probeID)
}
return probe.handler(req), nil
}

func (l *localControlRouter) Register(_ context.Context, probeID string, handler xfer.ControlHandlerFunc) (int64, error) {
l.Lock()
defer l.Unlock()
id := rand.Int63()
l.probes[probeID] = probe{
id: id,
handler: handler,
}
return id, nil
}

func (l *localControlRouter) Deregister(_ context.Context, probeID string, id int64) error {
l.Lock()
defer l.Unlock()
// NB probe might have reconnected in the mean time, need to ensure we do not
// delete new connection! Also, it might have connected then deleted itself!
if l.probes[probeID].id == id {
delete(l.probes, probeID)
}
return nil
}
Loading