Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch task ID to Task interface throughout. #133

Merged
merged 2 commits into from
Jul 22, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ _testmain.go
*.exe
*.test
*.prof
cover.out

*.orig
*.swp
Expand Down
11 changes: 11 additions & 0 deletions Documentation/etcd.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ that implementing Metafora with etcd in your own work system is quick and easy.
├── tasks
│ └── <task_id>
│ ├── props JSON value (optional)
│ └── owner Ephemeral, JSON value
├── state Optional, only if using state store
Expand Down Expand Up @@ -50,6 +51,16 @@ The JSON format is:

Note that Metafora does not handle task parameters or configuration.

#### Task Properties

Optionally tasks may have a properties key with a JSON value. The value must be
immutable for the life of the task.

Users may set a custom `NewTask` function on their `EtcdCoordinator` in order
to unmarshal properties into a custom struct. The struct must implement the
`metafora.Task` interface and code that wishes to use implementation specific
methods or fields will have to type assert.

### Node Commands

Metafora clients can send node commands by making a file inside
Expand Down
16 changes: 8 additions & 8 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ var NoDelay = time.Time{}
// BalancerContext is a limited interface exposed to Balancers from the
// Consumer for access to limited Consumer state.
type BalancerContext interface {
// Tasks returns a sorted list of task IDs run by this Consumer. The Consumer
// stops task manipulations during claiming and balancing, so the list will
// be accurate unless a task naturally completes.
Tasks() []Task
// Tasks returns a sorted list of task IDs owned by this Consumer. The
// Consumer stops task manipulations during claiming and balancing, so the
// list will be accurate unless a task naturally completes.
Tasks() []RunningTask
}

// Balancer is the core task balancing interface. Without a master Metafora
Expand All @@ -38,7 +38,7 @@ type Balancer interface {
//
// When denying a claim by returning false, CanClaim should return the time
// at which to reconsider the task for claiming.
CanClaim(taskID string) (ignoreUntil time.Time, claim bool)
CanClaim(task Task) (ignoreUntil time.Time, claim bool)

// Balance should return the list of Task IDs that should be released. The
// criteria used to determine which tasks should be released is left up to
Expand All @@ -56,7 +56,7 @@ type dumbBalancer struct{}
func (dumbBalancer) Init(BalancerContext) {}

// CanClaim always returns true.
func (dumbBalancer) CanClaim(string) (time.Time, bool) { return NoDelay, true }
func (dumbBalancer) CanClaim(Task) (time.Time, bool) { return NoDelay, true }

// Balance never returns any tasks to balance.
func (dumbBalancer) Balance() []string { return nil }
Expand Down Expand Up @@ -106,7 +106,7 @@ func (e *FairBalancer) Init(s BalancerContext) {

// CanClaim rejects tasks for a period of time if the last balance released
// tasks. Otherwise all tasks are accepted.
func (e *FairBalancer) CanClaim(taskid string) (time.Time, bool) {
func (e *FairBalancer) CanClaim(task Task) (time.Time, bool) {
if e.delay.After(time.Now()) {
// Return delay set by Balance()
return e.delay, false
Expand Down Expand Up @@ -147,7 +147,7 @@ func (e *FairBalancer) Balance() []string {

random := rand.New(rand.NewSource(time.Now().UnixNano()))
for len(releasetasks) < shouldrelease {
tid := nodetasks[random.Intn(len(nodetasks))].ID()
tid := nodetasks[random.Intn(len(nodetasks))].Task().ID()
if _, ok := releaseset[tid]; !ok {
releasetasks = append(releasetasks, tid)
releaseset[tid] = struct{}{}
Expand Down
12 changes: 6 additions & 6 deletions balancer_res.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ func (b *ResourceBalancer) Balance() []string {
}

// Release the oldest task that isn't already stopping
var task Task
var oldest RunningTask
for _, t := range b.ctx.Tasks() {
if t.Stopped().IsZero() && (task == nil || task.Started().After(t.Started())) {
task = t
if t.Stopped().IsZero() && (oldest == nil || oldest.Started().After(t.Started())) {
oldest = t
}
}

// No tasks or all tasks are stopping, don't bother rebalancing
if task == nil {
if oldest == nil {
return nil
}

Infof("Releasing task %s (started %s) because %d > %d (%d of %d %s used)",
task.ID(), task.Started(), threshold, b.releaseLimit, used, total, b.reporter)
return []string{task.ID()}
oldest.Task().ID(), oldest.Started(), threshold, b.releaseLimit, used, total, b.reporter)
return []string{oldest.Task().ID()}
}
24 changes: 15 additions & 9 deletions balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestFairBalancerOneNode(t *testing.T) {
fb := NewDefaultFairBalancer("node1", clusterstate)
fb.Init(consumerstate)

if _, ok := fb.CanClaim("23"); !ok {
if _, ok := fb.CanClaim(testTask{"23"}); !ok {
t.Fatal("Expected claim to be true")
}

Expand All @@ -50,7 +50,7 @@ func TestFairBalanceOver(t *testing.T) {
fb := NewDefaultFairBalancer("node1", clusterstate)
fb.Init(consumerstate)

if _, ok := fb.CanClaim("23"); !ok {
if _, ok := fb.CanClaim(testTask{"23"}); !ok {
t.Fatal("Expected claim to be true")
}

Expand All @@ -77,7 +77,7 @@ func TestFairBalanceNothing(t *testing.T) {
fb := NewDefaultFairBalancer("node1", clusterstate)
fb.Init(consumerstate)

if _, ok := fb.CanClaim("23"); !ok {
if _, ok := fb.CanClaim(testTask{"23"}); !ok {
t.Fatal("Expected claim to be true")
}

Expand All @@ -89,6 +89,12 @@ func TestFairBalanceNothing(t *testing.T) {

}

type testTask struct {
id string
}

func (t testTask) ID() string { return t.id }

type TestClusterState struct {
Current map[string]int
Err error
Expand All @@ -106,10 +112,10 @@ type TestConsumerState struct {
Current []string
}

func (tc *TestConsumerState) Tasks() []Task {
tasks := []Task{}
func (tc *TestConsumerState) Tasks() []RunningTask {
tasks := []RunningTask{}
for _, id := range tc.Current {
tasks = append(tasks, newTask(id, nil))
tasks = append(tasks, newTask(testTask{id}, nil))
}
return tasks
}
Expand All @@ -121,10 +127,10 @@ type sbCtx struct {
tasks []string
}

func (ctx *sbCtx) Tasks() []Task {
tasks := []Task{}
func (ctx *sbCtx) Tasks() []RunningTask {
tasks := []RunningTask{}
for _, id := range ctx.tasks {
tasks = append(tasks, newTask(id, nil))
tasks = append(tasks, newTask(testTask{id}, nil))
}
return tasks
}
Expand Down
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package metafora

type Client interface {
// SubmitTask submits a task to the system, the task id must be unique.
SubmitTask(taskId string) error
SubmitTask(Task) error

// Delete a task
DeleteTask(taskId string) error
Expand Down
21 changes: 11 additions & 10 deletions coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type CoordinatorContext interface {
//
// Since this implies there is a window of time where the task is executing
// more than once, this is a sign of an unhealthy cluster.
Lost(taskID string)
Lost(Task)
}

// Coordinator is the core interface Metafora uses to discover, claim, and
Expand All @@ -18,23 +18,23 @@ type Coordinator interface {
// implementations. NewConsumer will return Init's return value.
Init(CoordinatorContext) error

// Watch the broker for tasks. Watch blocks until Close is called or it
// encounters an error. Tasks are sent to consumer via the tasks chan.
Watch(tasks chan<- string) (err error)
// Watch the broker for claimable tasks. Watch blocks until Close is called
// or it encounters an error. Tasks are sent to consumer via the tasks chan.
Watch(tasks chan<- Task) (err error)

// Claim is called by the Consumer when a Balancer has determined that a task
// ID can be claimed. Claim returns false if another consumer has already
// claimed the ID.
Claim(taskID string) bool
Claim(Task) bool

// Release a task for other consumers to claim. May be called after Close.
Release(taskID string)
Release(Task)

// Done is called by Metafora when a task has been completed and should never
// be scheduled to run again (in other words: deleted from the broker).
//
// May be called after Close.
Done(taskID string)
Done(Task)

// Command blocks until a command for this node is received from the broker
// by the coordinator. Command must return (nil, nil) when Close is called.
Expand All @@ -55,7 +55,8 @@ type coordinatorContext struct {

// Lost is a light wrapper around Coordinator.stopTask to make it suitable for
// calling by Coordinator implementations via the CoordinatorContext interface.
func (ctx *coordinatorContext) Lost(taskID string) {
Errorf("Lost task %s", taskID)
ctx.stopTask(taskID)
func (ctx *coordinatorContext) Lost(t Task) {
tid := t.ID()
Errorf("Lost task %s", tid)
ctx.stopTask(tid)
}
8 changes: 4 additions & 4 deletions embedded/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ package embedded

import "github.com/lytics/metafora"

func NewEmbeddedClient(taskchan chan string, cmdchan chan *NodeCommand, nodechan chan []string) metafora.Client {
func NewEmbeddedClient(taskchan chan metafora.Task, cmdchan chan *NodeCommand, nodechan chan []string) metafora.Client {
return &EmbeddedClient{taskchan, cmdchan, nodechan}
}

type EmbeddedClient struct {
taskchan chan<- string
taskchan chan<- metafora.Task
cmdchan chan<- *NodeCommand
nodechan <-chan []string
}

func (ec *EmbeddedClient) SubmitTask(taskid string) error {
ec.taskchan <- taskid
func (ec *EmbeddedClient) SubmitTask(t metafora.Task) error {
ec.taskchan <- t
return nil
}

Expand Down
14 changes: 7 additions & 7 deletions embedded/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/lytics/metafora"
)

func NewEmbeddedCoordinator(nodeid string, taskchan chan string, cmdchan chan *NodeCommand, nodechan chan []string) metafora.Coordinator {
func NewEmbeddedCoordinator(nodeid string, taskchan chan metafora.Task, cmdchan chan *NodeCommand, nodechan chan []string) metafora.Coordinator {
e := &EmbeddedCoordinator{inchan: taskchan, cmdchan: cmdchan, stopchan: make(chan struct{}), nodechan: nodechan}
// HACK - need to respond to node requests, assuming a single coordinator/client pair
go func() {
Expand All @@ -26,7 +26,7 @@ func NewEmbeddedCoordinator(nodeid string, taskchan chan string, cmdchan chan *N
type EmbeddedCoordinator struct {
nodeid string
ctx metafora.CoordinatorContext
inchan chan string
inchan chan metafora.Task
cmdchan chan *NodeCommand
nodechan chan<- []string
stopchan chan struct{}
Expand All @@ -37,7 +37,7 @@ func (e *EmbeddedCoordinator) Init(c metafora.CoordinatorContext) error {
return nil
}

func (e *EmbeddedCoordinator) Watch(out chan<- string) error {
func (e *EmbeddedCoordinator) Watch(out chan<- metafora.Task) error {
for {
// wait for incoming tasks
select {
Expand All @@ -56,23 +56,23 @@ func (e *EmbeddedCoordinator) Watch(out chan<- string) error {
}
}

func (e *EmbeddedCoordinator) Claim(taskID string) bool {
func (e *EmbeddedCoordinator) Claim(task metafora.Task) bool {
// We recieved on a channel, we are the only ones to pull that value
return true
}

func (e *EmbeddedCoordinator) Release(taskID string) {
func (e *EmbeddedCoordinator) Release(task metafora.Task) {
// Releasing should be async to avoid deadlocks (and better reflect the
// behavior of "real" coordinators)
go func() {
select {
case e.inchan <- taskID:
case e.inchan <- task:
case <-e.stopchan:
}
}()
}

func (e *EmbeddedCoordinator) Done(taskID string) {}
func (e *EmbeddedCoordinator) Done(task metafora.Task) {}

func (e *EmbeddedCoordinator) Command() (metafora.Command, error) {
select {
Expand Down
12 changes: 6 additions & 6 deletions embedded/embedded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ func TestEmbedded(t *testing.T) {
tc := newTestCounter()
adds := make(chan string, 4)

thfunc := metafora.SimpleHandler(func(id string, _ <-chan bool) bool {
tc.Add(id)
adds <- id
thfunc := metafora.SimpleHandler(func(task metafora.Task, _ <-chan bool) bool {
tc.Add(task.ID())
adds <- task.ID()
return true
})

Expand All @@ -25,7 +25,7 @@ func TestEmbedded(t *testing.T) {
go runner.Run()

for _, taskid := range []string{"one", "two", "three", "four"} {
err := client.SubmitTask(taskid)
err := client.SubmitTask(&Task{TID: taskid})
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
Expand Down Expand Up @@ -54,7 +54,7 @@ func TestEmbeddedShutdown(t *testing.T) {
const n = 4
runs := make(chan int, n)
stops := make(chan int, n)
thfunc := metafora.SimpleHandler(func(id string, s <-chan bool) bool {
thfunc := metafora.SimpleHandler(func(_ metafora.Task, s <-chan bool) bool {
runs <- 1
select {
case <-s:
Expand All @@ -75,7 +75,7 @@ func TestEmbeddedShutdown(t *testing.T) {

// submit tasks
for _, taskid := range tasks {
err := client.SubmitTask(taskid)
err := client.SubmitTask(&Task{TID: taskid})
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
Expand Down
11 changes: 6 additions & 5 deletions embedded/statestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package embedded
import (
"sync"

"github.com/lytics/metafora"
"github.com/lytics/metafora/statemachine"
)

Expand Down Expand Up @@ -32,21 +33,21 @@ func NewStateStore() statemachine.StateStore {
}
}

func (s *StateStore) Load(tid string) (*statemachine.State, error) {
func (s *StateStore) Load(task metafora.Task) (*statemachine.State, error) {
s.mu.RLock()
defer s.mu.RUnlock()
state, ok := s.store[tid]
state, ok := s.store[task.ID()]
if !ok {
return &statemachine.State{Code: statemachine.Runnable}, nil
}
return state, nil
}

func (s *StateStore) Store(tid string, state *statemachine.State) error {
func (s *StateStore) Store(task metafora.Task, state *statemachine.State) error {
s.mu.Lock()
s.store[tid] = state
s.store[task.ID()] = state
s.mu.Unlock()
stored := StateChanged{TaskID: tid, State: state}
stored := StateChanged{TaskID: task.ID(), State: state}
select {
case s.Stored <- stored:
default:
Expand Down
8 changes: 8 additions & 0 deletions embedded/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package embedded

// Task is the embedded coorindator's metafora.Task implemenation.
type Task struct {
TID string
}

func (t *Task) ID() string { return t.TID }
Loading