Skip to content

Commit

Permalink
Merge pull request #1 from gol4ng/improve-callback
Browse files Browse the repository at this point in the history
⚠️ BC Break
RegisterCallback(stopCallbacks ...Callback) as move to RegisterCallbacks(stopCallbacks ...Callback)
new signature method RegisterCallback(stopCallback Callback) func()

Migration
if you use RegisterCallback(stopCallbacks ...Callback) just replace with RegisterCallbacks(stopCallbacks ...Callback)
  • Loading branch information
instabledesign authored Dec 3, 2020
2 parents 3bb13ea + 8de12af commit bf87932
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 41 deletions.
93 changes: 90 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
# Stop dispatcher

Stop dispatcher can be used to manage Golang application gracefull shutdown.
It allow you to register application shutdown callback (to close some database connection, clean temp folder ...)
You can event use it in sub process to manage when operation end up normally or when it failed.

## Installation

`go get -u github.com/gol4ng/stop-dispatcher`

### Examples

A runner application (like http server)

```go
package main

import (
"context"
"fmt"
"log"

stop_dispatcher "github.com/gol4ng/stop-dispatcher"
Expand All @@ -23,11 +32,21 @@ func main() {
// stop dispatcher with log reason handler
stopDispatcher := stop_dispatcher.NewDispatcher(
stop_dispatcher.WithReasonHandler(reason_handler.Log()),
stop_dispatcher.WithEmitter(
// Listen SIGINT, SIGTERM
stop_emitter.DefaultKillerSignalEmitter(),
),
)

// Register a killer signal emitter
stopDispatcher.RegisterEmitter(
stop_emitter.DefaultKillerSignalEmitter(),
// Start the job
func(stop func(reason stop_dispatcher.Reason)) {
for i := 0; i < 10; i++ {
fmt.Printf("%d time elapsed\n", i)
}
stop("process finished")
},
)

// Register all your stopping callback
Expand All @@ -36,8 +55,76 @@ func main() {
log.Println("Closing all database connection")
return nil
},
)

// Wait will block until stopping reason was received
if err := stopDispatcher.Wait(ctx); err != nil {
log.Printf("error occured during stopping application : %s", err)
}
log.Println("Application stopped")
}
```

A job application (will perfom a task then end)

```go
package main

import (
"context"
"fmt"
"log"
"net"
"net/http"
"strconv"
"sync/atomic"

stop_dispatcher "github.com/gol4ng/stop-dispatcher"
"github.com/gol4ng/stop-dispatcher/reason_handler"
"github.com/gol4ng/stop-dispatcher/stop_emitter"
)

func main() {
// root context
ctx := context.Background()

// stop dispatcher with log reason handler
stopDispatcher := stop_dispatcher.NewDispatcher(
stop_dispatcher.WithReasonHandler(reason_handler.Log()),
stop_dispatcher.WithEmitter(
// Listen SIGINT, SIGTERM
stop_emitter.DefaultKillerSignalEmitter(),
),
)

i := int64(0)
httpServer := &http.Server{
Handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
writer.Write([]byte(strconv.Itoa(int(atomic.LoadInt64(&i)))))
atomic.AddInt64(&i, 1)
}),
BaseContext: func(_ net.Listener) context.Context {
return ctx
},
}

// Register a killer signal emitter
stopDispatcher.RegisterEmitter(
// Start the httpServer and listen when it stop
func(stop func(reason stop_dispatcher.Reason)) {
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
stop(fmt.Errorf("http server[%s] : %w", httpServer.Addr, err))
}
},
)

// Register all your stopping callback
stopDispatcher.RegisterCallbacks(
func(ctx context.Context) error {
log.Println("Closing all server connection")
return httpServer.Shutdown(ctx)
},
func(ctx context.Context) error {
log.Println("Closing all database connection")
return nil
},
)
Expand All @@ -48,4 +135,4 @@ func main() {
}
log.Println("Application stopped")
}
```
```
45 changes: 41 additions & 4 deletions dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package stop_dispatcher

import (
"context"
"sync"

local_error "github.com/gol4ng/stop-dispatcher/error"
)

// use to replace the callback when they unregistered
var nopFunc = func(ctx context.Context) error { return nil }

// Reason is the stopping given value
type Reason interface{}

Expand All @@ -20,8 +24,11 @@ type Callback func(ctx context.Context) error

// Dispatcher implementation provide Reason dispatcher
type Dispatcher struct {
stopChan chan Reason
stopChan chan Reason

mu sync.RWMutex
stopCallbacks []Callback

reasonHandler ReasonHandler
}

Expand All @@ -37,8 +44,27 @@ func (t *Dispatcher) RegisterEmitter(stopEmitters ...Emitter) {
}
}

// RegisterCallback is used to register all the wanted stopping callback
func (t *Dispatcher) RegisterCallback(stopCallbacks ...Callback) {
// RegisterCallback is used to register stopping callback
// It return a func to unregister the callback
func (t *Dispatcher) RegisterCallback(stopCallback Callback) func() {
i := len(t.stopCallbacks)
t.mu.Lock()
defer t.mu.Unlock()
t.stopCallbacks = append(t.stopCallbacks, stopCallback)

return func() {
t.mu.Lock()
defer t.mu.Unlock()
t.stopCallbacks[i] = nopFunc
}
}

// RegisterCallbacks is used to register all the wanted stopping callback
// With this method you cannot unregister a callback
// If you want to unregister callback you should use RegisterCallback
func (t *Dispatcher) RegisterCallbacks(stopCallbacks ...Callback) {
t.mu.Lock()
defer t.mu.Unlock()
t.stopCallbacks = append(t.stopCallbacks, stopCallbacks...)
}

Expand All @@ -49,7 +75,10 @@ func (t *Dispatcher) Wait(ctx context.Context) error {
shutdownCtx, cancel := context.WithCancel(ctx)
defer cancel()
errs := local_error.List{}
for _, fn := range t.stopCallbacks {
t.mu.RLock()
stopCallbacks := t.stopCallbacks
defer t.mu.RUnlock()
for _, fn := range stopCallbacks {
if err := fn(shutdownCtx); err != nil {
errs.Add(err)
}
Expand All @@ -61,6 +90,7 @@ func (t *Dispatcher) Wait(ctx context.Context) error {
func NewDispatcher(options ...DispatcherOption) *Dispatcher {
dispatcher := &Dispatcher{
stopChan: make(chan Reason),
mu: sync.RWMutex{},
stopCallbacks: []Callback{},
reasonHandler: func(Reason) {},
}
Expand All @@ -81,3 +111,10 @@ func WithReasonHandler(reasonHandler ReasonHandler) DispatcherOption {
dispatcher.reasonHandler = reasonHandler
}
}

// WithEmitter is a helpers to register during the construction
func WithEmitter(emitters ...Emitter) DispatcherOption {
return func(dispatcher *Dispatcher) {
dispatcher.RegisterEmitter(emitters...)
}
}
58 changes: 56 additions & 2 deletions dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stop_dispatcher_test
import (
"context"
"errors"
"sync"
"testing"
"time"

Expand All @@ -29,7 +30,7 @@ func Test_Dispatcher_Stop(t *testing.T) {
func Test_Dispatcher_Error(t *testing.T) {
callbackCalled := false
d := stop_dispatcher.NewDispatcher()
d.RegisterCallback(
d.RegisterCallbacks(
func(ctx context.Context) error {
callbackCalled = true
return errors.New("fake_error")
Expand Down Expand Up @@ -64,6 +65,59 @@ func Test_Dispatcher_WithReasonHandler(t *testing.T) {
assert.True(t, reasonHandlerCalled)
}

func Test_Dispatcher_WithEmitter(t *testing.T) {
d := stop_dispatcher.NewDispatcher(
stop_dispatcher.WithEmitter(func(stopFn func(stop_dispatcher.Reason)) {
time.AfterFunc(10*time.Millisecond, func() {
stopFn("fake_reason")
})
}),
)

err := d.Wait(context.TODO())
assert.NoError(t, err)
}

func Test_Dispatcher_UnregisterCallback(t *testing.T) {
safeInnerStopFn := sync.Mutex{}
var innerStopFn func(stop_dispatcher.Reason)
d := stop_dispatcher.NewDispatcher(
stop_dispatcher.WithEmitter(func(stopFn func(stop_dispatcher.Reason)) {
safeInnerStopFn.Lock()
innerStopFn = stopFn
safeInnerStopFn.Unlock()
}),
)
callbackCalled := false
unregisterCallbackFunc := d.RegisterCallback(func(ctx context.Context) error {
callbackCalled = true
return nil
})
go func(){
time.AfterFunc(10*time.Millisecond, func() {
safeInnerStopFn.Lock()
innerStopFn("fake_reason")
safeInnerStopFn.Unlock()
})
}()
err := d.Wait(context.TODO())
assert.NoError(t, err)
assert.True(t, callbackCalled)

callbackCalled = false
unregisterCallbackFunc()
go func(){
time.AfterFunc(10*time.Millisecond, func() {
safeInnerStopFn.Lock()
innerStopFn("fake_reason")
safeInnerStopFn.Unlock()
})
}()
err1 := d.Wait(context.TODO())
assert.NoError(t, err1)
assert.False(t, callbackCalled)
}

func Test_Dispatcher(t *testing.T) {
callbackCalled := false
d := stop_dispatcher.NewDispatcher()
Expand All @@ -80,4 +134,4 @@ func Test_Dispatcher(t *testing.T) {
err := d.Wait(context.TODO())
assert.NoError(t, err)
assert.True(t, callbackCalled)
}
}
31 changes: 30 additions & 1 deletion stop_callback/timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"log"
"sync"
"testing"
"time"

Expand All @@ -12,7 +13,7 @@ import (

func TestTimedout(t *testing.T) {
// configure log to capture data
buf := bytes.NewBuffer([]byte{})
buf := SafeWrapBuffer(bytes.NewBuffer([]byte{}))
log.SetFlags(0)
log.SetOutput(buf)

Expand All @@ -26,3 +27,31 @@ func TestTimedout(t *testing.T) {
time.Sleep(200 * time.Millisecond)
assert.Equal(t, "Shutdown timeout exceeded 100ms\n", buf.String())
}

type Buffer struct {
b *bytes.Buffer
m sync.Mutex
}

func (b *Buffer) Read(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.Read(p)
}
func (b *Buffer) Write(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.Write(p)
}
func (b *Buffer) String() string {
b.m.Lock()
defer b.m.Unlock()
return b.b.String()
}

func SafeWrapBuffer(buffer *bytes.Buffer) *Buffer {
return &Buffer{
b: buffer,
m: sync.Mutex{},
}
}
Loading

0 comments on commit bf87932

Please sign in to comment.