From f26682087b0f3b0cc524087982d82beb037db5ee Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 22 Dec 2014 11:38:31 -0800 Subject: [PATCH] Export Consumer.Frozen and add http introspection handler --- httputil/httputil.go | 37 +++++++++++++++++++++++++ httputil/httputil_test.go | 58 +++++++++++++++++++++++++++++++++++++++ metafora.go | 16 +++++++---- metafora_test.go | 2 +- 4 files changed, 107 insertions(+), 6 deletions(-) create mode 100644 httputil/httputil.go create mode 100644 httputil/httputil_test.go diff --git a/httputil/httputil.go b/httputil/httputil.go new file mode 100644 index 0000000..212ac3d --- /dev/null +++ b/httputil/httputil.go @@ -0,0 +1,37 @@ +package httputil + +import ( + "encoding/json" + "net/http" + "time" +) + +// Consumer contains just the Metafora methods exposed by the HTTP +// introspection endpoints. +type Consumer interface { + Frozen() bool + Tasks() []string +} + +// InfoResponse is the JSON response marshalled by the MakeInfoHandler. +type InfoResponse struct { + Frozen bool `json:"frozen"` + Node string `json:"node"` + Started time.Time `json:"started"` + Tasks []string `json:"tasks"` +} + +// MakeInfoHandler returns an HTTP handler which can be added to an exposed +// HTTP server mux by Metafora applications to provide operators with basic +// node introspection. +func MakeInfoHandler(c Consumer, node string, started time.Time) http.HandlerFunc { + return func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(&InfoResponse{ + Frozen: c.Frozen(), + Node: node, + Started: started, + Tasks: c.Tasks(), + }) + } +} diff --git a/httputil/httputil_test.go b/httputil/httputil_test.go new file mode 100644 index 0000000..d27098e --- /dev/null +++ b/httputil/httputil_test.go @@ -0,0 +1,58 @@ +package httputil_test + +import ( + "encoding/json" + "net/http/httptest" + "testing" + "time" + + "github.com/lytics/metafora" + . "github.com/lytics/metafora/httputil" +) + +type tc struct { + stop chan bool +} + +func (*tc) Init(metafora.CoordinatorContext) error { return nil } +func (c *tc) Watch() (string, error) { + <-c.stop + return "", nil +} +func (c *tc) Claim(string) bool { return false } +func (c *tc) Release(string) {} +func (c *tc) Done(string) {} +func (c *tc) Command() (metafora.Command, error) { + <-c.stop + return nil, nil +} +func (c *tc) Close() { close(c.stop) } + +func TestMakeInfoHandler(t *testing.T) { + t.Parallel() + + c, _ := metafora.NewConsumer(&tc{stop: make(chan bool)}, nil, &metafora.DumbBalancer{}) + defer c.Shutdown() + name := "test-name" + now := time.Now().Truncate(time.Second) + + resp := httptest.NewRecorder() + MakeInfoHandler(c, name, now)(resp, nil) + + info := InfoResponse{} + if err := json.Unmarshal(resp.Body.Bytes(), &info); err != nil { + t.Fatalf("Error unmarshalling response body: %v", err) + } + if info.Frozen { + t.Errorf("Consumer should not start frozen.") + } + if !info.Started.Equal(now) { + t.Errorf("Started time %s != %s", info.Started, now) + } + if info.Node != name { + t.Errorf("Node name %s != %s", info.Node, name) + } + if len(info.Tasks) != 0 { + t.Errorf("Unexpected tasks: %v", info.Tasks) + } +} diff --git a/metafora.go b/metafora.go index 2115cfc..baf3eda 100644 --- a/metafora.go +++ b/metafora.go @@ -179,7 +179,7 @@ func (c *Consumer) Run() { // Main Loop ensures events are processed synchronously for { - if c.frozen() { + if c.Frozen() { // Only recv commands while frozen select { case <-c.stop: @@ -275,7 +275,8 @@ func (c *Consumer) watcher() { func (c *Consumer) balance() { for _, task := range c.bal.Balance() { - //TODO Release tasks asynchronously as their shutdown might be slow? + //FIXME Release tasks asynchronously as their shutdown might be slow or + // block indefinitely. c.release(task) } } @@ -452,7 +453,12 @@ func (c *Consumer) stopTask(taskID string) bool { return true } -func (c *Consumer) frozen() bool { +// Frozen returns true if Metafora is no longer watching for new tasks or +// rebalancing. +// +// Metafora will remain frozen until receiving an Unfreeze command or it is +// restarted (frozen state is not persisted). +func (c *Consumer) Frozen() bool { c.freezeL.Lock() r := c.freeze c.freezeL.Unlock() @@ -462,7 +468,7 @@ func (c *Consumer) frozen() bool { func (c *Consumer) handleCommand(cmd Command) { switch cmd.Name() { case cmdFreeze: - if c.frozen() { + if c.Frozen() { c.logger.Log(LogLevelInfo, "Ignoring freeze command: already frozen") return } @@ -471,7 +477,7 @@ func (c *Consumer) handleCommand(cmd Command) { c.freeze = true c.freezeL.Unlock() case cmdUnfreeze: - if !c.frozen() { + if !c.Frozen() { c.logger.Log(LogLevelInfo, "Ignoring unfreeze command: not frozen") return } diff --git a/metafora_test.go b/metafora_test.go index 6fbb7b9..a8fe9ba 100644 --- a/metafora_test.go +++ b/metafora_test.go @@ -101,7 +101,7 @@ func TestConsumer(t *testing.T) { hf, tasksRun := newTestHandlerFunc(t) // Create the consumer and run it - c, _ := NewConsumer(tc, hf, &DumbBalancer{}) + c, _ := NewConsumer(tc, hf, bal) s := make(chan int) start := time.Now() go func() {