Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export Consumer.Frozen and add http introspection handler #79

Merged
merged 1 commit into from
Dec 22, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions httputil/httputil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package httputil

import (
"encoding/json"
"net/http"
"time"
)

// Consumer contains just the Metafora methods exposed by the HTTP
// introspection endpoints.
type Consumer interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Consumer for a name? Im not sure that describes the interface's relationship to the metafora project. From the code it appears to be a metadata provider for the current metafora peer. Anyway, not a blocker.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I understand the name now... Its a consumed by the http handler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh, I wasn't even that clever... I just stole the name from metafora.go where it's implemented. I'll give it a few minutes of thought to try to come up with a better interface name before hitting the merge button. You know how I love making things idiomatic! :)

Frozen() bool
Tasks() []string
}

// InfoResponse is the JSON response marshalled by the MakeInfoHandler.
type InfoResponse struct {
Frozen bool `json:"frozen"`
Node string `json:"node"`
Started time.Time `json:"started"`
Tasks []string `json:"tasks"`
}

// MakeInfoHandler returns an HTTP handler which can be added to an exposed
// HTTP server mux by Metafora applications to provide operators with basic
// node introspection.
func MakeInfoHandler(c Consumer, node string, started time.Time) http.HandlerFunc {
return func(w http.ResponseWriter, _ *http.Request) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could the Consumer have a Node() method? Not a blocker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. For better or worse (I honestly don't know), the node name is an etcd Coordinator-only concept. It's not used or exposed anywhere in the core Consumer or interfaces.

A better idea might be to make the name a Consumer level concept, and the Consumer informs the Coordinator of its name via the Coordinator.Init() method.

Seems out of scope for this PR, so I created #80. Further comments welcome! I think it's a good enhancement.

w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(&InfoResponse{
Frozen: c.Frozen(),
Node: node,
Started: started,
Tasks: c.Tasks(),
})
}
}
58 changes: 58 additions & 0 deletions httputil/httputil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package httputil_test

import (
"encoding/json"
"net/http/httptest"
"testing"
"time"

"github.com/lytics/metafora"
. "github.com/lytics/metafora/httputil"
)

type tc struct {
stop chan bool
}

func (*tc) Init(metafora.CoordinatorContext) error { return nil }
func (c *tc) Watch() (string, error) {
<-c.stop
return "", nil
}
func (c *tc) Claim(string) bool { return false }
func (c *tc) Release(string) {}
func (c *tc) Done(string) {}
func (c *tc) Command() (metafora.Command, error) {
<-c.stop
return nil, nil
}
func (c *tc) Close() { close(c.stop) }

func TestMakeInfoHandler(t *testing.T) {
t.Parallel()

c, _ := metafora.NewConsumer(&tc{stop: make(chan bool)}, nil, &metafora.DumbBalancer{})
defer c.Shutdown()
name := "test-name"
now := time.Now().Truncate(time.Second)

resp := httptest.NewRecorder()
MakeInfoHandler(c, name, now)(resp, nil)

info := InfoResponse{}
if err := json.Unmarshal(resp.Body.Bytes(), &info); err != nil {
t.Fatalf("Error unmarshalling response body: %v", err)
}
if info.Frozen {
t.Errorf("Consumer should not start frozen.")
}
if !info.Started.Equal(now) {
t.Errorf("Started time %s != %s", info.Started, now)
}
if info.Node != name {
t.Errorf("Node name %s != %s", info.Node, name)
}
if len(info.Tasks) != 0 {
t.Errorf("Unexpected tasks: %v", info.Tasks)
}
}
16 changes: 11 additions & 5 deletions metafora.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (c *Consumer) Run() {

// Main Loop ensures events are processed synchronously
for {
if c.frozen() {
if c.Frozen() {
// Only recv commands while frozen
select {
case <-c.stop:
Expand Down Expand Up @@ -275,7 +275,8 @@ func (c *Consumer) watcher() {

func (c *Consumer) balance() {
for _, task := range c.bal.Balance() {
//TODO Release tasks asynchronously as their shutdown might be slow?
//FIXME Release tasks asynchronously as their shutdown might be slow or
// block indefinitely.
c.release(task)
}
}
Expand Down Expand Up @@ -452,7 +453,12 @@ func (c *Consumer) stopTask(taskID string) bool {
return true
}

func (c *Consumer) frozen() bool {
// Frozen returns true if Metafora is no longer watching for new tasks or
// rebalancing.
//
// Metafora will remain frozen until receiving an Unfreeze command or it is
// restarted (frozen state is not persisted).
func (c *Consumer) Frozen() bool {
c.freezeL.Lock()
r := c.freeze
c.freezeL.Unlock()
Expand All @@ -462,7 +468,7 @@ func (c *Consumer) frozen() bool {
func (c *Consumer) handleCommand(cmd Command) {
switch cmd.Name() {
case cmdFreeze:
if c.frozen() {
if c.Frozen() {
c.logger.Log(LogLevelInfo, "Ignoring freeze command: already frozen")
return
}
Expand All @@ -471,7 +477,7 @@ func (c *Consumer) handleCommand(cmd Command) {
c.freeze = true
c.freezeL.Unlock()
case cmdUnfreeze:
if !c.frozen() {
if !c.Frozen() {
c.logger.Log(LogLevelInfo, "Ignoring unfreeze command: not frozen")
return
}
Expand Down
2 changes: 1 addition & 1 deletion metafora_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestConsumer(t *testing.T) {
hf, tasksRun := newTestHandlerFunc(t)

// Create the consumer and run it
c, _ := NewConsumer(tc, hf, &DumbBalancer{})
c, _ := NewConsumer(tc, hf, bal)
s := make(chan int)
start := time.Now()
go func() {
Expand Down