Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Work queue #198

Merged
merged 7 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions gateway/common/combiner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package common

import (
"log"
)

// rather than be too chatty, combine multiple tasks into one
// idea here is to eat through the channel combining multiple tasks
// into a single one which can then be persisted
type Combinable interface {
Runnable
Combine(q chan Runnable) // put items back on channel if don't match
key() string // combine with others of same key
value() int // amount to combine
gen() int // allow combiners to mate
}

type SimpleCombiner struct {
SimpleTask
keyVal string
intVal int
genVal int
}

// Implement Combinable if more complex functions are needed
func (c *SimpleCombiner) Combine(q chan Runnable) {
for i := len(q); i > 0; i-- {
task := <- q
switch task.(type) {
case Combinable:
d := task.(Combinable)
if c.keyVal == d.key() {
c.intVal += d.value()
c.genVal = max(c.gen(), d.gen())
} else {
q <- task
}
default:
q <- task
}
}
// after combining, remove the combinable bits
if c.gen() >= 10 {
c.run = c.runner()
q <- &c.SimpleTask
} else {
c.genVal++
q <- c
}
}

func (c *SimpleCombiner) key() string { return c.keyVal }
func (c *SimpleCombiner) value() int { return c.intVal }
func (c *SimpleCombiner) gen() int { return c.genVal }
// This gets set to the SimpleTask.run
// Simple version only logs, need to extend to push to redis, etc
func (c *SimpleCombiner) runner() func() {
return func() {
log.Printf("combined %s to %d", c.keyVal, c.intVal)
}
}
83 changes: 83 additions & 0 deletions gateway/common/tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package common

import (
"log"
"os"
"strconv"
"sync"
)

// TODO make it "pluggable" so different types of tasks can operate off queue

type Runnable interface {
Run()
}

// contains bits necessary to run later
type SimpleTask struct {
run func()
}

type TaskQueue struct {
Tasks chan Runnable
errors chan error
}

var q *TaskQueue
var qmutex sync.Once

// Another singleton
func GetTaskQueue() *TaskQueue {
qmutex.Do(func() {
bufferSize := 50 // default
bufferSizeEnv, ok := os.LookupEnv("JOB_BUFFER_SIZE")
bufferSizeInt, err := strconv.Atoi(bufferSizeEnv)
if ok && err == nil {
bufferSize = bufferSizeInt
} else {
log.Println("unable to read JOB_BUFFER_SIZE from env, using default")
}
q = &TaskQueue{
Tasks: make(chan Runnable, bufferSize),
errors: make(chan error, bufferSize),
}
})
return q
}

func (q *TaskQueue) SetupWorkers() {
numWorkers := 10 // default
numWorkersEnv, ok := os.LookupEnv("NUM_WORKERS")
numWorkersInt, err := strconv.Atoi(numWorkersEnv)
if ok && err == nil {
numWorkers = numWorkersInt
} else {
log.Println("unable to read NUM_WORKERS from env, using default")
}
for i := 0; i < numWorkers; i++ {
go worker(q)
}
}

// use this for graceful shutdown
func (q *TaskQueue) CloseQueue() {
close(q.Tasks)
}

func worker(q *TaskQueue) {
for task := range q.Tasks {
switch t := task.(type) {
case Combinable:
task.(Combinable).Combine(q.Tasks)
case Runnable:
task.Run()
default:
log.Println("unspecified task", task, t)
}
}
}

// SimpleTask can be extended if needed
func (t *SimpleTask) Run() {
t.run()
}
59 changes: 59 additions & 0 deletions gateway/common/tasks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package common

import (
"fmt"
"testing"
"time"
)

func TestTask(t *testing.T) {
queue := GetTaskQueue()
queue.SetupWorkers()

for i:=0; i<1000; i++ {
j := i
queue.Tasks <- &SimpleTask{
run: func() {
time.Sleep(10 * time.Millisecond)
t.Logf("done: %d", j)
},
}
}

time.Sleep(500 * time.Millisecond)
if len(queue.Tasks) > 0 {
t.Fatal("queue should have finished")
}
}

// TODO use this test to figure out the right blend
// combining is meant to compact multiple updates to redis into one
// rather than incr + incr + incr just do incrBy 3
// This costs a bit of compute and complicates the job queue
// come back to this when the use is more necessary
func TestCombiner(t *testing.T) {
t.Setenv("JOB_BUFFER_SIZE", "1000")
queue := GetTaskQueue()

for i:=0; i<1000; i++ {
key := fmt.Sprintf("key:%d", i % 2)
queue.Tasks <- &SimpleCombiner{
keyVal: key,
intVal: 1,
}
}

queue.SetupWorkers()

time.Sleep(1000 * time.Millisecond)
if len(queue.Tasks) > 0 {
t.Fatal("combining should have finished")
}
}

func TestClose(t *testing.T) {
queue := GetTaskQueue()
queue.SetupWorkers()

queue.CloseQueue()
}
Loading