Skip to content

Commit

Permalink
Merge pull request #79 from lytics/export-frozen
Browse files Browse the repository at this point in the history
Export Consumer.Frozen and add http introspection handler
  • Loading branch information
schmichael committed Dec 22, 2014
2 parents 26b85a1 + f266820 commit 4e31a61
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 6 deletions.
37 changes: 37 additions & 0 deletions httputil/httputil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package httputil

import (
"encoding/json"
"net/http"
"time"
)

// Consumer contains just the Metafora methods exposed by the HTTP
// introspection endpoints.
type Consumer interface {
Frozen() bool
Tasks() []string
}

// InfoResponse is the JSON response marshalled by the MakeInfoHandler.
type InfoResponse struct {
Frozen bool `json:"frozen"`
Node string `json:"node"`
Started time.Time `json:"started"`
Tasks []string `json:"tasks"`
}

// MakeInfoHandler returns an HTTP handler which can be added to an exposed
// HTTP server mux by Metafora applications to provide operators with basic
// node introspection.
func MakeInfoHandler(c Consumer, node string, started time.Time) http.HandlerFunc {
return func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(&InfoResponse{
Frozen: c.Frozen(),
Node: node,
Started: started,
Tasks: c.Tasks(),
})
}
}
58 changes: 58 additions & 0 deletions httputil/httputil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package httputil_test

import (
"encoding/json"
"net/http/httptest"
"testing"
"time"

"github.com/lytics/metafora"
. "github.com/lytics/metafora/httputil"
)

type tc struct {
stop chan bool
}

func (*tc) Init(metafora.CoordinatorContext) error { return nil }
func (c *tc) Watch() (string, error) {
<-c.stop
return "", nil
}
func (c *tc) Claim(string) bool { return false }
func (c *tc) Release(string) {}
func (c *tc) Done(string) {}
func (c *tc) Command() (metafora.Command, error) {
<-c.stop
return nil, nil
}
func (c *tc) Close() { close(c.stop) }

func TestMakeInfoHandler(t *testing.T) {
t.Parallel()

c, _ := metafora.NewConsumer(&tc{stop: make(chan bool)}, nil, &metafora.DumbBalancer{})
defer c.Shutdown()
name := "test-name"
now := time.Now().Truncate(time.Second)

resp := httptest.NewRecorder()
MakeInfoHandler(c, name, now)(resp, nil)

info := InfoResponse{}
if err := json.Unmarshal(resp.Body.Bytes(), &info); err != nil {
t.Fatalf("Error unmarshalling response body: %v", err)
}
if info.Frozen {
t.Errorf("Consumer should not start frozen.")
}
if !info.Started.Equal(now) {
t.Errorf("Started time %s != %s", info.Started, now)
}
if info.Node != name {
t.Errorf("Node name %s != %s", info.Node, name)
}
if len(info.Tasks) != 0 {
t.Errorf("Unexpected tasks: %v", info.Tasks)
}
}
16 changes: 11 additions & 5 deletions metafora.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (c *Consumer) Run() {

// Main Loop ensures events are processed synchronously
for {
if c.frozen() {
if c.Frozen() {
// Only recv commands while frozen
select {
case <-c.stop:
Expand Down Expand Up @@ -275,7 +275,8 @@ func (c *Consumer) watcher() {

func (c *Consumer) balance() {
for _, task := range c.bal.Balance() {
//TODO Release tasks asynchronously as their shutdown might be slow?
//FIXME Release tasks asynchronously as their shutdown might be slow or
// block indefinitely.
c.release(task)
}
}
Expand Down Expand Up @@ -452,7 +453,12 @@ func (c *Consumer) stopTask(taskID string) bool {
return true
}

func (c *Consumer) frozen() bool {
// Frozen returns true if Metafora is no longer watching for new tasks or
// rebalancing.
//
// Metafora will remain frozen until receiving an Unfreeze command or it is
// restarted (frozen state is not persisted).
func (c *Consumer) Frozen() bool {
c.freezeL.Lock()
r := c.freeze
c.freezeL.Unlock()
Expand All @@ -462,7 +468,7 @@ func (c *Consumer) frozen() bool {
func (c *Consumer) handleCommand(cmd Command) {
switch cmd.Name() {
case cmdFreeze:
if c.frozen() {
if c.Frozen() {
c.logger.Log(LogLevelInfo, "Ignoring freeze command: already frozen")
return
}
Expand All @@ -471,7 +477,7 @@ func (c *Consumer) handleCommand(cmd Command) {
c.freeze = true
c.freezeL.Unlock()
case cmdUnfreeze:
if !c.frozen() {
if !c.Frozen() {
c.logger.Log(LogLevelInfo, "Ignoring unfreeze command: not frozen")
return
}
Expand Down
2 changes: 1 addition & 1 deletion metafora_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestConsumer(t *testing.T) {
hf, tasksRun := newTestHandlerFunc(t)

// Create the consumer and run it
c, _ := NewConsumer(tc, hf, &DumbBalancer{})
c, _ := NewConsumer(tc, hf, bal)
s := make(chan int)
start := time.Now()
go func() {
Expand Down

0 comments on commit 4e31a61

Please sign in to comment.