Skip to content

Commit

Permalink
feat(GODT-2500): Add panic handlers everywhere. (#332)
Browse files Browse the repository at this point in the history
* feat(GODT-2500): Add panic handlers everywhere.

* refactor(GODT-2500): Reorganise async methods.
  • Loading branch information
cuthix authored Apr 3, 2023
1 parent 19b8f7b commit d643f02
Show file tree
Hide file tree
Showing 28 changed files with 457 additions and 78 deletions.
2 changes: 1 addition & 1 deletion queue/bool.go → async/bool.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package queue
package async

import "sync/atomic"

Expand Down
65 changes: 65 additions & 0 deletions async/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package async

import (
"context"
"sync"
)

// Abortable collects groups of functions that can be aborted by calling Abort.
type Abortable struct {
abortFunc []context.CancelFunc
abortLock sync.RWMutex
}

func (a *Abortable) Do(ctx context.Context, fn func(context.Context)) {
fn(a.newCancelCtx(ctx))
}

func (a *Abortable) Abort() {
a.abortLock.RLock()
defer a.abortLock.RUnlock()

for _, fn := range a.abortFunc {
fn()
}
}

func (a *Abortable) newCancelCtx(ctx context.Context) context.Context {
a.abortLock.Lock()
defer a.abortLock.Unlock()

ctx, cancel := context.WithCancel(ctx)

a.abortFunc = append(a.abortFunc, cancel)

return ctx
}

// RangeContext iterates over the given channel until the context is canceled or the
// channel is closed.
func RangeContext[T any](ctx context.Context, ch <-chan T, fn func(T)) {
for {
select {
case v, ok := <-ch:
if !ok {
return
}

fn(v)

case <-ctx.Done():
return
}
}
}

// ForwardContext forwards all values from the src channel to the dst channel until the
// context is canceled or the src channel is closed.
func ForwardContext[T any](ctx context.Context, dst chan<- T, src <-chan T) {
RangeContext(ctx, src, func(v T) {
select {
case dst <- v:
case <-ctx.Done():
}
})
}
214 changes: 214 additions & 0 deletions async/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package async

import (
"context"
"math/rand"
"sync"
"time"
)

// Group is forked and improved version of "github.com/bradenaw/juniper/xsync.Group".
//
// It manages a group of goroutines. The main change to original is posibility
// to wait passed function to finish without canceling it's context and adding
// PanicHandler.
type Group struct {
baseCtx context.Context
ctx context.Context
jobCtx context.Context
cancel context.CancelFunc
finish context.CancelFunc
wg sync.WaitGroup

panicHandler PanicHandler
}

// NewGroup returns a Group ready for use. The context passed to any of the f functions will be a
// descendant of ctx.
func NewGroup(ctx context.Context, panicHandler PanicHandler) *Group {
bgCtx, cancel := context.WithCancel(ctx)
jobCtx, finish := context.WithCancel(ctx)

return &Group{
baseCtx: ctx,
ctx: bgCtx,
jobCtx: jobCtx,
cancel: cancel,
finish: finish,
panicHandler: panicHandler,
}
}

// Once calls f once from another goroutine.
func (g *Group) Once(f func(ctx context.Context)) {
g.wg.Add(1)

go func() {
defer HandlePanic(g.panicHandler)

f(g.ctx)
g.wg.Done()
}()
}

// jitterDuration returns a random duration in [d - jitter, d + jitter].
func jitterDuration(d time.Duration, jitter time.Duration) time.Duration {
return d + time.Duration(float64(jitter)*((rand.Float64()*2)-1)) //nolint:gosec
}

// Periodic spawns a goroutine that calls f once per interval +/- jitter.
func (g *Group) Periodic(
interval time.Duration,
jitter time.Duration,
f func(ctx context.Context),
) {
g.wg.Add(1)

go func() {
defer HandlePanic(g.panicHandler)

defer g.wg.Done()

t := time.NewTimer(jitterDuration(interval, jitter))
defer t.Stop()

for {
if g.ctx.Err() != nil {
return
}

select {
case <-g.jobCtx.Done():
return
case <-t.C:
}

t.Reset(jitterDuration(interval, jitter))
f(g.ctx)
}
}()
}

// Trigger spawns a goroutine which calls f whenever the returned function is called. If f is
// already running when triggered, f will run again immediately when it finishes.
func (g *Group) Trigger(f func(ctx context.Context)) func() {
c := make(chan struct{}, 1)

g.wg.Add(1)

go func() {
defer HandlePanic(g.panicHandler)

defer g.wg.Done()

for {
if g.ctx.Err() != nil {
return
}
select {
case <-g.jobCtx.Done():
return
case <-c:
}
f(g.ctx)
}
}()

return func() {
select {
case c <- struct{}{}:
default:
}
}
}

// PeriodicOrTrigger spawns a goroutine which calls f whenever the returned function is called. If
// f is already running when triggered, f will run again immediately when it finishes. Also calls f
// when it has been interval+/-jitter since the last trigger.
func (g *Group) PeriodicOrTrigger(
interval time.Duration,
jitter time.Duration,
f func(ctx context.Context),
) func() {
c := make(chan struct{}, 1)

g.wg.Add(1)

go func() {
defer HandlePanic(g.panicHandler)

defer g.wg.Done()

t := time.NewTimer(jitterDuration(interval, jitter))
defer t.Stop()

for {
if g.ctx.Err() != nil {
return
}
select {
case <-g.jobCtx.Done():
return
case <-t.C:
t.Reset(jitterDuration(interval, jitter))
case <-c:
if !t.Stop() {
<-t.C
}

t.Reset(jitterDuration(interval, jitter))
}
f(g.ctx)
}
}()

return func() {
select {
case c <- struct{}{}:
default:
}
}
}

func (g *Group) resetCtx() {
g.jobCtx, g.finish = context.WithCancel(g.baseCtx)
g.ctx, g.cancel = context.WithCancel(g.baseCtx)
}

// Cancel is send to all of the spawn goroutines and ends periodic
// or trigger routines.
func (g *Group) Cancel() {
g.cancel()
g.finish()
g.resetCtx()
}

// Finish will ends all periodic or polls routines. It will let
// currently running functions to finish (cancel is not sent).
//
// It is not safe to call Wait concurrently with any other method on g.
func (g *Group) Finish() {
g.finish()
g.jobCtx, g.finish = context.WithCancel(g.baseCtx)
}

// CancelAndWait cancels the context passed to any of the spawned goroutines and waits for all spawned
// goroutines to exit.
//
// It is not safe to call Wait concurrently with any other method on g.
func (g *Group) CancelAndWait() {
g.finish()
g.cancel()
g.wg.Wait()
g.resetCtx()
}

// WaitToFinish will ends all periodic or polls routines. It will wait for
// currently running functions to finish (cancel is not sent).
//
// It is not safe to call Wait concurrently with any other method on g.
func (g *Group) WaitToFinish() {
g.finish()
g.wg.Wait()
g.jobCtx, g.finish = context.WithCancel(g.baseCtx)
}
15 changes: 15 additions & 0 deletions async/panic_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package async

type PanicHandler interface {
HandlePanic()
}

type NoopPanicHandler struct{}

func (n NoopPanicHandler) HandlePanic() {}

func HandlePanic(panicHandler PanicHandler) {
if panicHandler != nil {
panicHandler.HandlePanic()
}
}
6 changes: 3 additions & 3 deletions queue/queued_channel.go → async/queued_channel.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package queue
package async

import (
"context"
Expand All @@ -18,7 +18,7 @@ type QueuedChannel[T any] struct {
closed atomicBool // Should use atomic.Bool once we use Go 1.19!
}

func NewQueuedChannel[T any](chanBufferSize, queueCapacity int) *QueuedChannel[T] {
func NewQueuedChannel[T any](chanBufferSize, queueCapacity int, panicHandler PanicHandler) *QueuedChannel[T] {
queue := &QueuedChannel[T]{
ch: make(chan T, chanBufferSize),
stopCh: make(chan struct{}),
Expand All @@ -30,7 +30,7 @@ func NewQueuedChannel[T any](chanBufferSize, queueCapacity int) *QueuedChannel[T
queue.closed.store(false)

// Start the queue consumer.
logging.GoAnnotated(context.Background(), func(ctx context.Context) {
logging.GoAnnotated(context.Background(), panicHandler, func(ctx context.Context) {
defer close(queue.ch)

for {
Expand Down
6 changes: 3 additions & 3 deletions queue/queued_channel_test.go → async/queued_channel_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package queue
package async

import (
"testing"
Expand All @@ -11,7 +11,7 @@ func TestQueuedChannel(t *testing.T) {
defer goleak.VerifyNone(t)

// Create a new queued channel.
queue := NewQueuedChannel[int](3, 3)
queue := NewQueuedChannel[int](3, 3, NoopPanicHandler{})

// Push some items to the queue.
require.True(t, queue.Enqueue(1, 2, 3))
Expand Down Expand Up @@ -43,7 +43,7 @@ func TestQueuedChannelDoesNotLeakIfThereAreNoReadersOnCloseAndDiscard(t *testing
defer goleak.VerifyNone(t)

// Create a new queued channel.
queue := NewQueuedChannel[int](1, 3)
queue := NewQueuedChannel[int](1, 3, NoopPanicHandler{})

// Push some items to the queue.
require.True(t, queue.Enqueue(1, 2, 3))
Expand Down
27 changes: 27 additions & 0 deletions async/wait_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package async

import "sync"

type WaitGroup struct {
wg sync.WaitGroup
panicHandler PanicHandler
}

func MakeWaitGroup(panicHandler PanicHandler) WaitGroup {
return WaitGroup{panicHandler: panicHandler}
}

func (wg *WaitGroup) Go(f func()) {
wg.wg.Add(1)

go func() {
defer HandlePanic(wg.panicHandler)

defer wg.wg.Done()
f()
}()
}

func (wg *WaitGroup) Wait() {
wg.wg.Wait()
}
Loading

0 comments on commit d643f02

Please sign in to comment.