Skip to content

Commit

Permalink
Merge pull request #332 from porters-xyz/develop
Browse files Browse the repository at this point in the history
Fixed issue with usage metrics not being reported or logged
  • Loading branch information
scermat authored Jul 27, 2024
2 parents 8659821 + bc7e987 commit e0804d8
Show file tree
Hide file tree
Showing 14 changed files with 555 additions and 423 deletions.
39 changes: 23 additions & 16 deletions gateway/common/context.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,48 @@
package common

import (
"context"
"time"
"context"
log "log/slog"
"time"
)

const (
INSTRUMENT string = "INSTRUMENT_START"
INSTRUMENT string = "INSTRUMENT_START"
)

type Contextable interface {
ContextKey() string
ContextKey() string
}

type Instrument struct {
Timestamp time.Time
Timestamp time.Time
}

func UpdateContext(ctx context.Context, entity Contextable) context.Context {
return context.WithValue(ctx, entity.ContextKey(), entity)
log.Debug("Updating context", "key", entity.ContextKey(), "entity", entity)
return context.WithValue(ctx, entity.ContextKey(), entity)
}

func FromContext(ctx context.Context, contextkey string) (any, bool) {
value := ctx.Value(contextkey)
if value != nil {
return value, true
} else {
return nil, false
}
value := ctx.Value(contextkey)
if value != nil {
return value, true
} else {
return nil, false
}
}

// Leaving here for debugging purposes
func LogContext(ctx context.Context, contextkey string) {
log.Debug("Context Value for", "key", contextkey, "val", ctx.Value(contextkey))
}

func StartInstrument() *Instrument {
return &Instrument{
Timestamp: time.Now(),
}
return &Instrument{
Timestamp: time.Now(),
}
}

func (i *Instrument) ContextKey() string {
return INSTRUMENT
return INSTRUMENT
}
230 changes: 121 additions & 109 deletions gateway/common/tasks.go
Original file line number Diff line number Diff line change
@@ -1,180 +1,192 @@
package common

import (
"errors"
log "log/slog"
"sync"
"time"
"errors"
log "log/slog"
"sync"
"time"
)

type Runnable interface {
error
Run()
error
Run()
}

type Delayable interface {
Runnable
Ready() bool
Runnable
Ready() bool
}

// contains bits necessary to run later
type SimpleTask struct {
run func()
runtime time.Time
run func()
runtime time.Time
}

type RetryTask struct {
SimpleTask
runWithSuccess func() bool
retryCount int
retryEvery time.Duration
retryGen int
SimpleTask
runWithSuccess func() bool
retryCount int
retryEvery time.Duration
retryGen int
}

type TaskQueue struct {
closed bool
tasks chan Runnable
delayed chan Delayable
errors chan error
closed bool
tasks chan Runnable
delayed chan Delayable
errors chan error
}

var qInst *TaskQueue
var qmutex sync.Once

// Another singleton
func GetTaskQueue() *TaskQueue {
qmutex.Do(func() {
bufferSize := GetConfigInt(JOB_BUFFER_SIZE)
qInst = &TaskQueue{
closed: false,
tasks: make(chan Runnable, bufferSize),
delayed: make(chan Delayable, bufferSize),
errors: make(chan error, bufferSize),
}
})
return qInst
qmutex.Do(func() {
bufferSize := GetConfigInt(JOB_BUFFER_SIZE)
qInst = &TaskQueue{
closed: false,
tasks: make(chan Runnable, bufferSize),
delayed: make(chan Delayable, bufferSize),
errors: make(chan error, bufferSize),
}
})
return qInst
}

func (q *TaskQueue) SetupWorkers() {
numWorkers := GetConfigInt(NUM_WORKERS)
for i := 0; i < numWorkers; i++ {
go worker(q)
}
go delayWorker(q)
go errWorker(q)
// When debugging, may be worth setting to 1 worker, to avoid multiple threads cross emitting logs
numWorkers := GetConfigInt(NUM_WORKERS)

for i := 0; i < numWorkers; i++ {
go worker(q)
}
go delayWorker(q)
go errWorker(q)
}

// use this for graceful shutdown
func (q *TaskQueue) CloseQueue() {
close(q.tasks)
close(q.delayed)
q.closed = true

shutdownTime := time.Duration(GetConfigInt(SHUTDOWN_DELAY)) * time.Second
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-ticker.C:
if len(q.tasks) == 0 {
return
}
case <-time.After(shutdownTime):
log.Warn("workers not finished, work may be lost")
return
}
}
close(q.tasks)
close(q.delayed)
q.closed = true

shutdownTime := time.Duration(GetConfigInt(SHUTDOWN_DELAY)) * time.Second
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-ticker.C:
if len(q.tasks) == 0 {
return
}
case <-time.After(shutdownTime):
log.Warn("workers not finished, work may be lost")
return
}
}
}

func (q *TaskQueue) Add(runnable Runnable) {
q.tasks <- runnable
JobGauge.WithLabelValues("task").Inc()
q.tasks <- runnable
JobGauge.WithLabelValues("task").Inc()
}

func (q *TaskQueue) Delay(delayable Delayable) {
q.delayed <- delayable
JobGauge.WithLabelValues("delayed").Inc()
q.delayed <- delayable
JobGauge.WithLabelValues("delayed").Inc()
}

func (q *TaskQueue) ReportError(err error) {
q.errors <- err
JobGauge.WithLabelValues("error").Inc()
q.errors <- err
JobGauge.WithLabelValues("error").Inc()
}

func worker(q *TaskQueue) {
for task := range q.tasks {
switch t := task.(type) {
case Combinable:
task.(Combinable).Combine(q.tasks)
case Runnable:
task.Run()
default:
log.Debug("unspecified task", "task", task, "type", t)
}
JobGauge.WithLabelValues("task").Set(float64(len(q.tasks)))
}
for task := range q.tasks {
processTask(task, q)
JobGauge.WithLabelValues("task").Set(float64(len(q.tasks)))
}
}

func processTask(task Runnable, q *TaskQueue) {
defer func() {
if r := recover(); r != nil {
log.Error("Recovered in worker", "error", r)
}
}()

switch t := task.(type) {
case Combinable:
task.(Combinable).Combine(q.tasks)
case Runnable:
task.Run()
default:
log.Warn("unspecified task", "task", task, "type", t)
}
}

func errWorker(q *TaskQueue) {
for err := range q.errors {
log.Error("error encountered", "err", err)
JobGauge.WithLabelValues("error").Dec()
}
for err := range q.errors {
log.Error("error encountered", "err", err)
JobGauge.WithLabelValues("error").Dec()
}
}

func delayWorker(q *TaskQueue) {
for i:=len(q.delayed); i>0; i-- {
task := <- q.delayed
if q.closed {
q.ReportError(errors.New("Shutting down"))
} else if task.Ready() {
q.Add(task)
JobGauge.WithLabelValues("delayed").Dec()
} else {
q.delayed <- task
}
}
time.Sleep(1 * time.Second)
for i := len(q.delayed); i > 0; i-- {
task := <-q.delayed
if q.closed {
q.ReportError(errors.New("Shutting down"))
} else if task.Ready() {
q.Add(task)
JobGauge.WithLabelValues("delayed").Dec()
} else {
q.delayed <- task
}
}
time.Sleep(1 * time.Second)
}

func NewSimpleTask(run func()) *SimpleTask {
return &SimpleTask{
run: run,
runtime: time.Now(),
}
return &SimpleTask{
run: run,
runtime: time.Now(),
}
}

// SimpleTask can be extended if needed
func (t *SimpleTask) Run() {
t.run()
t.run()
}

func (t *SimpleTask) Ready() bool {
return time.Now().After(t.runtime)
return time.Now().After(t.runtime)
}

func (t *SimpleTask) Error() string {
// Override to include more details
return "error processing async task"
// Override to include more details
return "error processing async task"
}

func NewRetryTask(run func() bool, count int, every time.Duration) *RetryTask {
return &RetryTask{
runWithSuccess: run,
retryCount: count,
retryEvery: every,
}
return &RetryTask{
runWithSuccess: run,
retryCount: count,
retryEvery: every,
}
}

func (r *RetryTask) Run() {
ok := r.runWithSuccess()
if !ok {
q := GetTaskQueue()
if r.retryGen < r.retryCount {
r.runtime = time.Now().Add(r.retryEvery)
r.retryGen++
q.delayed <- r
} else {
q.errors <- r
}
}
ok := r.runWithSuccess()
if !ok {
q := GetTaskQueue()
if r.retryGen < r.retryCount {
r.runtime = time.Now().Add(r.retryEvery)
r.retryGen++
q.delayed <- r
} else {
q.errors <- r
}
}
}
Loading

0 comments on commit e0804d8

Please sign in to comment.