From 13c20c172fafe251ef9de9430f4a1320638534a3 Mon Sep 17 00:00:00 2001 From: Eric Fritz Date: Sun, 13 Dec 2020 12:22:22 -0600 Subject: [PATCH 1/9] Make processes finalizable. --- process_meta.go | 9 ++++++++- process_options.go | 5 +++++ runner.go | 23 +++++++++++++++++------ runner_test.go | 22 ++++++++++++++++++++-- 4 files changed, 50 insertions(+), 9 deletions(-) diff --git a/process_meta.go b/process_meta.go index 992ca76..00731fe 100644 --- a/process_meta.go +++ b/process_meta.go @@ -20,6 +20,7 @@ type ProcessMeta struct { initTimeout time.Duration startTimeout time.Duration shutdownTimeout time.Duration + finalizeTimeout time.Duration } func newProcessMeta(process Process) *ProcessMeta { @@ -39,7 +40,7 @@ func (m *ProcessMeta) Name() string { return m.name } -// Logields returns logging fields registered to this process. +// LogFields returns logging fields registered to this process. func (m *ProcessMeta) LogFields() log.LogFields { return m.logFields } @@ -62,6 +63,12 @@ func (m *ProcessMeta) Stop() (err error) { return } +// FinalizeTimeout returns the maximum timeout allowed for a call to +// the Finalize function. A zero value indicates no timeout. +func (m *ProcessMeta) FinalizeTimeout() time.Duration { + return m.finalizeTimeout +} + // Wrapped returns the underlying process. func (m *ProcessMeta) Wrapped() interface{} { return m.Process diff --git a/process_options.go b/process_options.go index 4337ee5..ea2eb92 100644 --- a/process_options.go +++ b/process_options.go @@ -50,3 +50,8 @@ func WithProcessStartTimeout(timeout time.Duration) ProcessConfigFunc { func WithProcessShutdownTimeout(timeout time.Duration) ProcessConfigFunc { return func(meta *ProcessMeta) { meta.shutdownTimeout = timeout } } + +// WithProcessFinalizerTimeout sets the time limit for the finalizer. +func WithProcessFinalizerTimeout(timeout time.Duration) ProcessConfigFunc { + return func(meta *ProcessMeta) { meta.finalizeTimeout = timeout } +} diff --git a/runner.go b/runner.go index 1833902..5b27852 100644 --- a/runner.go +++ b/runner.go @@ -169,13 +169,23 @@ func (r *runner) runInitializers(config config.Config) bool { return true } -func (r *runner) runFinalizers() bool { - return r.unwindInitializers(r.processes.NumInitializers()) +func (r *runner) runFinalizers(beforeIndex int) bool { + r.logger.Info("Running finalizers") + + success := true + for i := beforeIndex - 1; i >= 0; i-- { + for _, process := range r.processes.GetProcessesAtPriorityIndex(i) { + if err := r.finalizeWithTimeout(process); err != nil { + r.errChan <- errMeta{err: err, source: process} + success = false + } + } + } + + return r.unwindInitializers(r.processes.NumInitializers()) && success } func (r *runner) unwindInitializers(beforeIndex int) bool { - r.logger.Info("Running finalizers") - success := true initializers := r.processes.GetInitializers() @@ -209,7 +219,8 @@ func (r *runner) runProcesses(config config.Config) bool { // stop booting up processes adn simply wait for them to spin down. success := true - for index := 0; index < r.processes.NumPriorities(); index++ { + index := 0 + for ; index < r.processes.NumPriorities(); index++ { if !r.initProcessesAtPriorityIndex(config, index) { success = false break @@ -229,7 +240,7 @@ func (r *runner) runProcesses(config config.Config) bool { go func() { r.wg.Wait() - _ = r.runFinalizers() + _ = r.runFinalizers(index) close(r.errChan) }() diff --git a/runner_test.go b/runner_test.go index cb80855..48cf7b5 100644 --- a/runner_test.go +++ b/runner_test.go @@ -26,7 +26,7 @@ func TestRunnerRunOrder(t *testing.T) { p1 := newTaggedProcess(init, start, stop, "d") p2 := newTaggedProcess(init, start, stop, "e") p3 := newTaggedProcess(init, start, stop, "f") - p4 := newTaggedProcess(init, start, stop, "g") + p4 := newTaggedProcessFinalizer(*newTaggedProcess(init, start, stop, "g"), finalize) p5 := newTaggedProcess(init, start, stop, "h") // Register things @@ -78,7 +78,7 @@ func TestRunnerRunOrder(t *testing.T) { eventually(t, stringChanReceivesUnordered(stop, "d", "e", "f", "g", "h")) // Finalizers - eventually(t, stringChanReceivesUnordered(finalize, "a", "c")) + eventually(t, stringChanReceivesUnordered(finalize, "g", "a", "c")) // Ensure unblocked eventually(t, errorChanClosed(shutdownChan)) @@ -872,6 +872,24 @@ func (p *taggedProcess) Stop() error { return p.stopErr } +type taggedProcessFinalizer struct { + taggedProcess + finalize chan<- string + finalizeErr error +} + +func newTaggedProcessFinalizer(taggedProcess taggedProcess, finalize chan<- string) *taggedProcessFinalizer { + return &taggedProcessFinalizer{ + taggedProcess: taggedProcess, + finalize: finalize, + } +} + +func (i *taggedProcessFinalizer) Finalize() error { + i.finalize <- i.name + return i.finalizeErr +} + type blockingProcess struct { sync chan struct{} wait chan struct{} From 7aebe8e4551542228cdd660673fbecade0f7a35a Mon Sep 17 00:00:00 2001 From: Eric Fritz Date: Mon, 14 Dec 2020 08:42:06 -0600 Subject: [PATCH 2/9] Add context parameters to interfaces. --- initializer.go | 16 +++++--- parallel_initializer.go | 35 +++++++++--------- parallel_initializer_test.go | 9 +++-- process.go | 6 ++- process_container_test.go | 34 +++++++++-------- process_meta.go | 5 ++- runner.go | 71 ++++++++++++++++++------------------ runner_test.go | 63 ++++++++++++++++---------------- watcher_test.go | 3 +- 9 files changed, 129 insertions(+), 113 deletions(-) diff --git a/initializer.go b/initializer.go index 42e7312..fce4959 100644 --- a/initializer.go +++ b/initializer.go @@ -1,6 +1,10 @@ package process -import "github.com/go-nacelle/config" +import ( + "context" + + "github.com/go-nacelle/config" +) // Initializer is an interface that is called once on app // startup. @@ -10,7 +14,7 @@ type Initializer interface { // files from disk, connecting to a remote service, // initializing shared data structures, and inserting // a service into a shared service container. - Init(config config.Config) error + Init(ctx context.Context, config config.Config) error } // Finalizer is an optional extension of an Initializer that @@ -21,13 +25,13 @@ type Initializer interface { type Finalizer interface { // Finalize is called after the application has stopped // all running processes. - Finalize() error + Finalize(ctx context.Context) error } // InitializerFunc is a non-struct version of an initializer. -type InitializerFunc func(config config.Config) error +type InitializerFunc func(ctx context.Context, config config.Config) error // Init calls the underlying function. -func (f InitializerFunc) Init(config config.Config) error { - return f(config) +func (f InitializerFunc) Init(ctx context.Context, config config.Config) error { + return f(ctx, config) } diff --git a/parallel_initializer.go b/parallel_initializer.go index f4d6a1b..46d92c3 100644 --- a/parallel_initializer.go +++ b/parallel_initializer.go @@ -1,6 +1,7 @@ package process import ( + "context" "fmt" "sync" "time" @@ -52,7 +53,7 @@ func (i *ParallelInitializer) RegisterInitializer( } // Init runs Init on all registered initializers concurrently. -func (pi *ParallelInitializer) Init(config config.Config) error { +func (pi *ParallelInitializer) Init(ctx context.Context, config config.Config) error { for _, initializer := range pi.initializers { if err := pi.inject(initializer); err != nil { return errMetaSet{ @@ -62,7 +63,7 @@ func (pi *ParallelInitializer) Init(config config.Config) error { } errMetas := errMetaSet{} - initErrors := pi.initializeAll(config) + initErrors := pi.initializeAll(ctx, config) for i, err := range initErrors { if err != nil { @@ -71,7 +72,7 @@ func (pi *ParallelInitializer) Init(config config.Config) error { } if len(errMetas) > 0 { - for i, err := range pi.finalizeAll(initErrors) { + for i, err := range pi.finalizeAll(ctx, initErrors) { if err != nil { errMetas = append(errMetas, errMeta{err: err, source: pi.initializers[i]}) } @@ -84,9 +85,9 @@ func (pi *ParallelInitializer) Init(config config.Config) error { } // Finalize runs Finalize on all registered initializers concurrently. -func (pi *ParallelInitializer) Finalize() error { +func (pi *ParallelInitializer) Finalize(ctx context.Context) error { errMetas := errMetaSet{} - for i, err := range pi.finalizeAll(make([]error, len(pi.initializers))) { + for i, err := range pi.finalizeAll(ctx, make([]error, len(pi.initializers))) { if err != nil { errMetas = append(errMetas, errMeta{err: err, source: pi.initializers[i]}) } @@ -113,7 +114,7 @@ func (pi *ParallelInitializer) inject(initializer namedInjectable) error { return nil } -func (pi *ParallelInitializer) initializeAll(config config.Config) []error { +func (pi *ParallelInitializer) initializeAll(ctx context.Context, config config.Config) []error { errors := make([]error, len(pi.initializers)) mutex := sync.Mutex{} wg := sync.WaitGroup{} @@ -124,7 +125,7 @@ func (pi *ParallelInitializer) initializeAll(config config.Config) []error { go func(i int) { defer wg.Done() - if err := pi.initWithTimeout(pi.initializers[i], config); err != nil { + if err := pi.initWithTimeout(ctx, pi.initializers[i], config); err != nil { mutex.Lock() errors[i] = err mutex.Unlock() @@ -136,7 +137,7 @@ func (pi *ParallelInitializer) initializeAll(config config.Config) []error { return errors } -func (pi *ParallelInitializer) finalizeAll(initErrors []error) []error { +func (pi *ParallelInitializer) finalizeAll(ctx context.Context, initErrors []error) []error { errors := make([]error, len(pi.initializers)) mutex := sync.Mutex{} wg := sync.WaitGroup{} @@ -151,7 +152,7 @@ func (pi *ParallelInitializer) finalizeAll(initErrors []error) []error { go func(i int) { defer wg.Done() - if err := pi.finalizeWithTimeout(pi.initializers[i]); err != nil { + if err := pi.finalizeWithTimeout(ctx, pi.initializers[i]); err != nil { mutex.Lock() errors[i] = err mutex.Unlock() @@ -163,9 +164,9 @@ func (pi *ParallelInitializer) finalizeAll(initErrors []error) []error { return errors } -func (pi *ParallelInitializer) initWithTimeout(initializer namedInitializer, config config.Config) error { +func (pi *ParallelInitializer) initWithTimeout(ctx context.Context, initializer namedInitializer, config config.Config) error { errChan := makeErrChan(func() error { - return pi.init(initializer, config) + return pi.init(ctx, initializer, config) }) select { @@ -177,10 +178,10 @@ func (pi *ParallelInitializer) initWithTimeout(initializer namedInitializer, con } } -func (pi *ParallelInitializer) init(initializer namedInitializer, config config.Config) error { +func (pi *ParallelInitializer) init(ctx context.Context, initializer namedInitializer, config config.Config) error { pi.Logger.WithFields(initializer.LogFields()).Info("Initializing %s", initializer.Name()) - if err := initializer.Init(config); err != nil { + if err := initializer.Init(ctx, config); err != nil { return fmt.Errorf( "failed to initialize %s (%s)", initializer.Name(), @@ -192,9 +193,9 @@ func (pi *ParallelInitializer) init(initializer namedInitializer, config config. return nil } -func (pi *ParallelInitializer) finalizeWithTimeout(initializer namedFinalizer) error { +func (pi *ParallelInitializer) finalizeWithTimeout(ctx context.Context, initializer namedFinalizer) error { errChan := makeErrChan(func() error { - return pi.finalize(initializer) + return pi.finalize(ctx, initializer) }) select { @@ -206,7 +207,7 @@ func (pi *ParallelInitializer) finalizeWithTimeout(initializer namedFinalizer) e } } -func (pi *ParallelInitializer) finalize(initializer namedFinalizer) error { +func (pi *ParallelInitializer) finalize(ctx context.Context, initializer namedFinalizer) error { finalizer, ok := initializer.Wrapped().(Finalizer) if !ok { return nil @@ -214,7 +215,7 @@ func (pi *ParallelInitializer) finalize(initializer namedFinalizer) error { pi.Logger.WithFields(initializer.LogFields()).Info("Finalizing %s", initializer.Name()) - if err := finalizer.Finalize(); err != nil { + if err := finalizer.Finalize(ctx); err != nil { return fmt.Errorf( "%s returned error from finalize (%s)", initializer.Name(), diff --git a/parallel_initializer_test.go b/parallel_initializer_test.go index 270051f..1cd3da7 100644 --- a/parallel_initializer_test.go +++ b/parallel_initializer_test.go @@ -1,6 +1,7 @@ package process import ( + "context" "fmt" "testing" @@ -29,7 +30,7 @@ func TestParallelInitializerInitialize(t *testing.T) { pi.RegisterInitializer(i2) pi.RegisterInitializer(i3) - require.Nil(t, pi.Init(nil)) + require.Nil(t, pi.Init(context.Background(), nil)) // May initialize in any order eventually(t, stringChanReceivesUnordered(init, "a", "b", "c")) @@ -69,7 +70,7 @@ func TestParallelInitializerInitError(t *testing.T) { WithInitializerName("c")(m3) WithInitializerName("d")(m4) - err := pi.Init(nil) + err := pi.Init(context.Background(), nil) require.NotNil(t, err) expected := []errMeta{ @@ -104,7 +105,7 @@ func TestParallelInitializerFinalize(t *testing.T) { pi.RegisterInitializer(i2) pi.RegisterInitializer(i3) - require.Nil(t, pi.Finalize()) + require.Nil(t, pi.Finalize(context.Background())) // May finalize in any order eventually(t, stringChanReceivesUnordered(finalize, "a", "c")) @@ -140,7 +141,7 @@ func TestParallelInitializerFinalizeError(t *testing.T) { WithInitializerName("b")(m2) WithInitializerName("c")(m3) - err := pi.Finalize() + err := pi.Finalize(context.Background()) require.NotNil(t, err) expected := []errMeta{ diff --git a/process.go b/process.go index c8c92b0..bc619ca 100644 --- a/process.go +++ b/process.go @@ -1,5 +1,7 @@ package process +import "context" + // Process is an interface that continually performs a behavior // during the life of a program. Generally, one process should // do a single thing. Multiple processes can be registered to @@ -16,10 +18,10 @@ type Process interface { // called (at which point a nil error should be returned). // If this method is non-blocking, then the process should // be registered with the WithSilentExit option. - Start() error + Start(ctx context.Context) error // Stop informs the work being performed by the Start // method to begin a graceful shutdown. This method is // not expected to block until shutdown completes. - Stop() error + Stop(ctx context.Context) error } diff --git a/process_container_test.go b/process_container_test.go index 0f27bc8..13407da 100644 --- a/process_container_test.go +++ b/process_container_test.go @@ -1,6 +1,7 @@ package process import ( + "context" "fmt" "testing" "time" @@ -11,9 +12,9 @@ import ( ) func TestProcessContainerInitializers(t *testing.T) { - i1 := InitializerFunc(func(config.Config) error { return fmt.Errorf("a") }) - i2 := InitializerFunc(func(config.Config) error { return fmt.Errorf("b") }) - i3 := InitializerFunc(func(config.Config) error { return fmt.Errorf("c") }) + i1 := InitializerFunc(func(ctx context.Context, c config.Config) error { return fmt.Errorf("a") }) + i2 := InitializerFunc(func(ctx context.Context, c config.Config) error { return fmt.Errorf("b") }) + i3 := InitializerFunc(func(ctx context.Context, c config.Config) error { return fmt.Errorf("c") }) c := NewProcessContainer() c.RegisterInitializer(i1) @@ -34,9 +35,9 @@ func TestProcessContainerInitializers(t *testing.T) { assert.Equal(t, time.Minute*2, initializers[2].InitTimeout()) // Test inner function - assert.EqualError(t, initializers[0].Initializer.Init(nil), "a") - assert.EqualError(t, initializers[1].Initializer.Init(nil), "b") - assert.EqualError(t, initializers[2].Initializer.Init(nil), "c") + assert.EqualError(t, initializers[0].Initializer.Init(context.Background(), nil), "a") + assert.EqualError(t, initializers[1].Initializer.Init(context.Background(), nil), "b") + assert.EqualError(t, initializers[2].Initializer.Init(context.Background(), nil), "c") } func TestProcessContainerProcesses(t *testing.T) { @@ -76,12 +77,12 @@ func TestProcessContainerProcesses(t *testing.T) { assert.Equal(t, "b", p4[0].Name()) // Test inner function - assert.EqualError(t, p1[0].Process.Init(nil), "a") - assert.EqualError(t, p1[1].Process.Init(nil), "f") - assert.EqualError(t, p2[0].Process.Init(nil), "c") - assert.EqualError(t, p2[1].Process.Init(nil), "e") - assert.EqualError(t, p3[0].Process.Init(nil), "d") - assert.EqualError(t, p4[0].Process.Init(nil), "b") + assert.EqualError(t, p1[0].Process.Init(context.Background(), nil), "a") + assert.EqualError(t, p1[1].Process.Init(context.Background(), nil), "f") + assert.EqualError(t, p2[0].Process.Init(context.Background(), nil), "c") + assert.EqualError(t, p2[1].Process.Init(context.Background(), nil), "e") + assert.EqualError(t, p3[0].Process.Init(context.Background(), nil), "d") + assert.EqualError(t, p4[0].Process.Init(context.Background(), nil), "b") } // @@ -95,6 +96,9 @@ func newInitFailProcess(name string) Process { return &initFailProcess{name: name} } -func (p *initFailProcess) Init(config config.Config) error { return fmt.Errorf("%s", p.name) } -func (p *initFailProcess) Start() error { return nil } -func (p *initFailProcess) Stop() error { return nil } +func (p *initFailProcess) Init(ctx context.Context, config config.Config) error { + return fmt.Errorf("%s", p.name) +} + +func (p *initFailProcess) Start(ctx context.Context) error { return nil } +func (p *initFailProcess) Stop(ctx context.Context) error { return nil } diff --git a/process_meta.go b/process_meta.go index 00731fe..f32acae 100644 --- a/process_meta.go +++ b/process_meta.go @@ -1,6 +1,7 @@ package process import ( + "context" "sync" "time" @@ -54,10 +55,10 @@ func (m *ProcessMeta) InitTimeout() time.Duration { // Stop wraps the underlying process's Stop method with a Once // value in order to guarantee that the Stop method will not // take effect multiple times. -func (m *ProcessMeta) Stop() (err error) { +func (m *ProcessMeta) Stop(ctx context.Context) (err error) { m.once.Do(func() { close(m.stopped) - err = m.Process.Stop() + err = m.Process.Stop(ctx) }) return diff --git a/runner.go b/runner.go index 5b27852..aacbe6c 100644 --- a/runner.go +++ b/runner.go @@ -1,6 +1,7 @@ package process import ( + "context" "fmt" "strings" "sync" @@ -23,7 +24,7 @@ type Runner interface { // be sent on this channel (nil errors are ignored). This channel will // close once all processes have exited (or, alternatively, when the // shutdown timeout has elapsed). - Run(config.Config) <-chan error + Run(ctx context.Context, config config.Config) <-chan error // Shutdown will begin a graceful exit of all processes. This method // will block until the runner has exited (the channel from the Run @@ -113,7 +114,7 @@ func NewRunner( return r } -func (r *runner) Run(config config.Config) <-chan error { +func (r *runner) Run(ctx context.Context, config config.Config) <-chan error { // Start watching things before running anything. This ensures that // we start listening for shutdown requests and intercepted signals // as soon as anything starts being initialized. @@ -123,7 +124,7 @@ func (r *runner) Run(config config.Config) <-chan error { // Run the initializers in sequence. If there were no errors, begin // initializing and running processes in priority/registration order. - _ = r.runInitializers(config) && r.runProcesses(config) + _ = r.runInitializers(ctx, config) && r.runProcesses(ctx, config) return r.outChan } @@ -141,19 +142,19 @@ func (r *runner) Shutdown(timeout time.Duration) error { // // Running and Watching -func (r *runner) runInitializers(config config.Config) bool { +func (r *runner) runInitializers(ctx context.Context, config config.Config) bool { r.logger.Info("Running initializers") for i, initializer := range r.processes.GetInitializers() { if err := r.inject(initializer); err != nil { - _ = r.unwindInitializers(i) + _ = r.unwindInitializers(ctx, i) r.errChan <- errMeta{err: err, source: initializer} close(r.errChan) return false } - if err := r.initWithTimeout(initializer, config); err != nil { - _ = r.unwindInitializers(i) + if err := r.initWithTimeout(ctx, initializer, config); err != nil { + _ = r.unwindInitializers(ctx, i) // Parallel initializers may return multiple errors, so // we return all of them here. This check if asymmetric // as there is no equivalent for processes. @@ -169,28 +170,28 @@ func (r *runner) runInitializers(config config.Config) bool { return true } -func (r *runner) runFinalizers(beforeIndex int) bool { +func (r *runner) runFinalizers(ctx context.Context, beforeIndex int) bool { r.logger.Info("Running finalizers") success := true for i := beforeIndex - 1; i >= 0; i-- { for _, process := range r.processes.GetProcessesAtPriorityIndex(i) { - if err := r.finalizeWithTimeout(process); err != nil { + if err := r.finalizeWithTimeout(ctx, process); err != nil { r.errChan <- errMeta{err: err, source: process} success = false } } } - return r.unwindInitializers(r.processes.NumInitializers()) && success + return r.unwindInitializers(ctx, r.processes.NumInitializers()) && success } -func (r *runner) unwindInitializers(beforeIndex int) bool { +func (r *runner) unwindInitializers(ctx context.Context, beforeIndex int) bool { success := true initializers := r.processes.GetInitializers() for i := beforeIndex - 1; i >= 0; i-- { - if err := r.finalizeWithTimeout(initializers[i]); err != nil { + if err := r.finalizeWithTimeout(ctx, initializers[i]); err != nil { // Parallel initializers may return multiple errors, so // we return all of them here. This check if asymmetric // as there is no equivalent for processes. @@ -205,7 +206,7 @@ func (r *runner) unwindInitializers(beforeIndex int) bool { return success } -func (r *runner) runProcesses(config config.Config) bool { +func (r *runner) runProcesses(ctx context.Context, config config.Config) bool { r.logger.Info("Running processes") if !r.injectProcesses() { @@ -221,12 +222,12 @@ func (r *runner) runProcesses(config config.Config) bool { success := true index := 0 for ; index < r.processes.NumPriorities(); index++ { - if !r.initProcessesAtPriorityIndex(config, index) { + if !r.initProcessesAtPriorityIndex(ctx, config, index) { success = false break } - if !r.startProcessesAtPriorityIndex(index) { + if !r.startProcessesAtPriorityIndex(ctx, index) { success = false break } @@ -240,7 +241,7 @@ func (r *runner) runProcesses(config config.Config) bool { go func() { r.wg.Wait() - _ = r.runFinalizers(index) + _ = r.runFinalizers(ctx, index) close(r.errChan) }() @@ -302,11 +303,11 @@ func inject(injectable namedInjectable, services service.ServiceContainer, logge // // Initialization -func (r *runner) initProcessesAtPriorityIndex(config config.Config, index int) bool { +func (r *runner) initProcessesAtPriorityIndex(ctx context.Context, config config.Config, index int) bool { r.logger.Info("Initializing processes at priority index %d", index) for _, process := range r.processes.GetProcessesAtPriorityIndex(index) { - if err := r.initWithTimeout(process, config); err != nil { + if err := r.initWithTimeout(ctx, process, config); err != nil { r.errChan <- errMeta{err: err, source: process} return false } @@ -315,14 +316,14 @@ func (r *runner) initProcessesAtPriorityIndex(config config.Config, index int) b return true } -func (r *runner) initWithTimeout(initializer namedInitializer, config config.Config) error { +func (r *runner) initWithTimeout(ctx context.Context, initializer namedInitializer, config config.Config) error { // Run the initializer in a goroutine. We don't want to block // on this in case we want to abandon reading from this channel // (timeout or shutdown). This is only true for initializer // methods (will not be true for process Start methods). errChan := makeErrChan(func() error { - return r.init(initializer, config) + return r.init(ctx, initializer, config) }) // Construct a timeout chan for the init (if timeout is set to @@ -347,10 +348,10 @@ func (r *runner) initWithTimeout(initializer namedInitializer, config config.Con } } -func (r *runner) init(initializer namedInitializer, config config.Config) error { +func (r *runner) init(ctx context.Context, initializer namedInitializer, config config.Config) error { r.logger.WithFields(initializer.LogFields()).Info("Initializing %s", initializer.Name()) - if err := initializer.Init(config); err != nil { + if err := initializer.Init(ctx, config); err != nil { if _, ok := err.(errMetaSet); ok { // Pass error sets up unchanged return err @@ -370,13 +371,13 @@ func (r *runner) init(initializer namedInitializer, config config.Config) error // // Finalization -func (r *runner) finalizeWithTimeout(initializer namedFinalizer) error { +func (r *runner) finalizeWithTimeout(ctx context.Context, initializer namedFinalizer) error { // Similar to initWithTimeout, run the finalizer in a goroutine // and either return the error result or return an error value // if the finalizer took too long. errChan := makeErrChan(func() error { - return r.finalize(initializer) + return r.finalize(ctx, initializer) }) finalizeTimeoutChan := r.makeTimeoutChan(initializer.FinalizeTimeout()) @@ -390,7 +391,7 @@ func (r *runner) finalizeWithTimeout(initializer namedFinalizer) error { } } -func (r *runner) finalize(initializer namedFinalizer) error { +func (r *runner) finalize(ctx context.Context, initializer namedFinalizer) error { // Finalizer is an optional interface on Initializer. Skip // if this initializer doesn't conform. finalizer, ok := initializer.Wrapped().(Finalizer) @@ -400,7 +401,7 @@ func (r *runner) finalize(initializer namedFinalizer) error { r.logger.WithFields(initializer.LogFields()).Info("Finalizing %s", initializer.Name()) - if err := finalizer.Finalize(); err != nil { + if err := finalizer.Finalize(ctx); err != nil { if _, ok := err.(errMetaSet); ok { // Pass error sets up unchanged return err @@ -420,7 +421,7 @@ func (r *runner) finalize(initializer namedFinalizer) error { // // Process Starting -func (r *runner) startProcessesAtPriorityIndex(index int) bool { +func (r *runner) startProcessesAtPriorityIndex(ctx context.Context, index int) bool { r.logger.Info("Starting processes at priority index %d", index) // For each process group, we create a goroutine that will shutdown @@ -434,7 +435,7 @@ func (r *runner) startProcessesAtPriorityIndex(index int) bool { go func() { defer r.wg.Done() <-r.watcher.shutdownSignal - r.stopProcessesAtPriorityIndex(index) + r.stopProcessesAtPriorityIndex(ctx, index) }() // Create an abandon channel that closes to signal the routine invoking @@ -451,7 +452,7 @@ func (r *runner) startProcessesAtPriorityIndex(index int) bool { go func(p *ProcessMeta) { defer r.wg.Done() - r.startProcess(p, abandonSignal) + r.startProcess(ctx, p, abandonSignal) }(process) } @@ -520,7 +521,7 @@ func (r *runner) getHealthDescriptions() []string { return descriptions } -func (r *runner) startProcess(process *ProcessMeta, abandonSignal <-chan struct{}) { +func (r *runner) startProcess(ctx context.Context, process *ProcessMeta, abandonSignal <-chan struct{}) { r.logger.WithFields(process.LogFields()).Info("Starting %s", process.Name()) // Run the start method in a goroutine. We need to do @@ -529,7 +530,7 @@ func (r *runner) startProcess(process *ProcessMeta, abandonSignal <-chan struct{ // and timeout behavior. errChan := makeErrChan(func() error { - return process.Start() + return process.Start(ctx) }) // Create a channel for the shutdown timeout. This @@ -586,7 +587,7 @@ func (r *runner) startProcess(process *ProcessMeta, abandonSignal <-chan struct{ // // Process Stopping -func (r *runner) stopProcessesAtPriorityIndex(index int) { +func (r *runner) stopProcessesAtPriorityIndex(ctx context.Context, index int) { r.logger.Info("Stopping processes at priority index %d", index) // Call stop on all processes at this priority index in parallel. We @@ -600,17 +601,17 @@ func (r *runner) stopProcessesAtPriorityIndex(index int) { go func(process *ProcessMeta) { defer r.wg.Done() - if err := r.stop(process); err != nil { + if err := r.stop(ctx, process); err != nil { r.errChan <- errMeta{err: err, source: process} } }(process) } } -func (r *runner) stop(process *ProcessMeta) error { +func (r *runner) stop(ctx context.Context, process *ProcessMeta) error { r.logger.WithFields(process.LogFields()).Info("Stopping %s", process.Name()) - if err := process.Stop(); err != nil { + if err := process.Stop(ctx); err != nil { return fmt.Errorf( "%s returned error from stop (%s)", process.Name(), diff --git a/runner_test.go b/runner_test.go index 48cf7b5..3b727b0 100644 --- a/runner_test.go +++ b/runner_test.go @@ -1,6 +1,7 @@ package process import ( + "context" "fmt" "testing" "time" @@ -45,7 +46,7 @@ func TestRunnerRunOrder(t *testing.T) { go func() { defer close(errChan) - for err := range runner.Run(nil) { + for err := range runner.Run(context.Background(), nil) { errChan <- err } }() @@ -105,7 +106,7 @@ func TestRunnerEarlyExit(t *testing.T) { go func() { defer close(errChan) - for err := range runner.Run(nil) { + for err := range runner.Run(context.Background(), nil) { errChan <- err } }() @@ -113,7 +114,7 @@ func TestRunnerEarlyExit(t *testing.T) { eventually(t, stringChanReceivesOrdered(init, "a", "b")) eventually(t, stringChanReceivesN(start, 2)) - go p2.Stop() + go p2.Stop(context.Background()) // Stopping one process should shutdown the rest eventually(t, stringChanReceivesUnordered(stop, "b")) @@ -137,12 +138,12 @@ func TestRunnerSilentExit(t *testing.T) { processes.RegisterProcess(p1) processes.RegisterProcess(p2, WithSilentExit()) - go runner.Run(nil) + go runner.Run(context.Background(), nil) eventually(t, stringChanReceivesOrdered(init, "a", "b")) eventually(t, stringChanReceivesN(start, 2)) - go p2.Stop() + go p2.Stop(context.Background()) eventually(t, stringChanReceivesUnordered(stop, "b")) } @@ -164,7 +165,7 @@ func TestRunnerShutdownTimeout(t *testing.T) { go func() { defer close(errChan) - for err := range runner.Run(nil) { + for err := range runner.Run(context.Background(), nil) { errChan <- err } }() @@ -219,7 +220,7 @@ func TestRunnerProcessStartTimeout(t *testing.T) { go func() { defer close(errChan) - for err := range runner.Run(nil) { + for err := range runner.Run(context.Background(), nil) { errChan <- err } }() @@ -262,7 +263,7 @@ func TestRunnerProcessShutdownTimeout(t *testing.T) { go func() { defer close(errChan) - for err := range runner.Run(nil) { + for err := range runner.Run(context.Background(), nil) { errChan <- err } }() @@ -314,7 +315,7 @@ func TestRunnerInitializerInjectionError(t *testing.T) { go func() { defer close(errChan) - for err := range runner.Run(nil) { + for err := range runner.Run(context.Background(), nil) { errChan <- err } }() @@ -362,7 +363,7 @@ func TestRunnerProcessInjectionError(t *testing.T) { go func() { defer close(errChan) - for err := range runner.Run(nil) { + for err := range runner.Run(context.Background(), nil) { errChan <- err } }() @@ -401,7 +402,7 @@ func TestRunnerInitializerInitTimeout(t *testing.T) { go func() { defer close(errChan) - for err := range runner.Run(nil) { + for err := range runner.Run(context.Background(), nil) { errChan <- err } }() @@ -439,7 +440,7 @@ func TestRunnerFinalizerFinalizeTimeout(t *testing.T) { go func() { defer close(errChan) - for err := range runner.Run(nil) { + for err := range runner.Run(context.Background(), nil) { errChan <- err } }() @@ -486,7 +487,7 @@ func TestRunnerFinalizerError(t *testing.T) { go func() { defer close(errChan) - for err := range runner.Run(nil) { + for err := range runner.Run(context.Background(), nil) { errChan <- err } }() @@ -527,7 +528,7 @@ func TestRunnerProcessInitTimeout(t *testing.T) { go func() { defer close(errChan) - for err := range runner.Run(nil) { + for err := range runner.Run(context.Background(), nil) { errChan <- err } }() @@ -568,7 +569,7 @@ func TestRunnerInitializerError(t *testing.T) { go func() { defer close(errChan) - for err := range runner.Run(nil) { + for err := range runner.Run(context.Background(), nil) { errChan <- err } }() @@ -621,7 +622,7 @@ func TestRunnerProcessInitError(t *testing.T) { go func() { defer close(errChan) - for err := range runner.Run(nil) { + for err := range runner.Run(context.Background(), nil) { errChan <- err } }() @@ -684,7 +685,7 @@ func TestRunnerProcessStartError(t *testing.T) { go func() { defer close(errChan) - for err := range runner.Run(nil) { + for err := range runner.Run(context.Background(), nil) { errChan <- err } }() @@ -754,7 +755,7 @@ func TestRunnerProcessStopError(t *testing.T) { go func() { defer close(errChan) - for err := range runner.Run(nil) { + for err := range runner.Run(context.Background(), nil) { errChan <- err } }() @@ -796,7 +797,7 @@ func newTaggedInitializer(init chan<- string, name string) *taggedInitializer { } } -func (i *taggedInitializer) Init(c config.Config) error { +func (i *taggedInitializer) Init(ctx context.Context, c config.Config) error { i.init <- i.name return i.initErr } @@ -820,7 +821,7 @@ func newTaggedFinalizer(init chan<- string, finalize chan<- string, name string) } } -func (i *taggedFinalizer) Finalize() error { +func (i *taggedFinalizer) Finalize(ctx context.Context) error { i.finalize <- i.name return i.finalizeErr } @@ -850,12 +851,12 @@ func newTaggedProcess(init, start, stop chan<- string, name string) *taggedProce } } -func (p *taggedProcess) Init(c config.Config) error { +func (p *taggedProcess) Init(ctx context.Context, c config.Config) error { p.init <- p.name return p.initErr } -func (p *taggedProcess) Start() error { +func (p *taggedProcess) Start(ctx context.Context) error { p.start <- p.name if p.startErr != nil { @@ -866,7 +867,7 @@ func (p *taggedProcess) Start() error { return nil } -func (p *taggedProcess) Stop() error { +func (p *taggedProcess) Stop(ctx context.Context) error { p.stop <- p.name p.wait <- struct{}{} return p.stopErr @@ -885,7 +886,7 @@ func newTaggedProcessFinalizer(taggedProcess taggedProcess, finalize chan<- stri } } -func (i *taggedProcessFinalizer) Finalize() error { +func (i *taggedProcessFinalizer) Finalize(ctx context.Context) error { i.finalize <- i.name return i.finalizeErr } @@ -901,9 +902,9 @@ func newBlockingProcess(sync chan struct{}) *blockingProcess { } } -func (p *blockingProcess) Init(c config.Config) error { return nil } -func (p *blockingProcess) Start() error { close(p.sync); <-p.wait; return nil } -func (p *blockingProcess) Stop() error { return nil } +func (p *blockingProcess) Init(ctx context.Context, c config.Config) error { return nil } +func (p *blockingProcess) Start(ctx context.Context) error { close(p.sync); <-p.wait; return nil } +func (p *blockingProcess) Stop(ctx context.Context) error { return nil } // // @@ -919,7 +920,7 @@ type processWithService struct { func newInitializerWithService() *initializerWithService { return &initializerWithService{} } func newProcessWithService() *processWithService { return &processWithService{} } -func (i *initializerWithService) Init(c config.Config) error { return nil } -func (p *processWithService) Init(c config.Config) error { return nil } -func (p *processWithService) Start() error { return nil } -func (p *processWithService) Stop() error { return nil } +func (i *initializerWithService) Init(ctx context.Context, c config.Config) error { return nil } +func (p *processWithService) Init(ctx context.Context, c config.Config) error { return nil } +func (p *processWithService) Start(ctx context.Context) error { return nil } +func (p *processWithService) Stop(ctx context.Context) error { return nil } diff --git a/watcher_test.go b/watcher_test.go index 2de45a7..b2001f6 100644 --- a/watcher_test.go +++ b/watcher_test.go @@ -1,6 +1,7 @@ package process import ( + "context" "fmt" "syscall" "testing" @@ -130,7 +131,7 @@ func TestWatcherShutdownTimeout(t *testing.T) { // func makeNamedInitializer(name string) namedInitializer { - initializer := InitializerFunc(func(config.Config) error { + initializer := InitializerFunc(func(ctx context.Context, c config.Config) error { return nil }) From 1df194c366d887cefd5184bb29ca5f9f3a26b279 Mon Sep 17 00:00:00 2001 From: Eric Fritz Date: Mon, 21 Dec 2020 11:18:09 -0600 Subject: [PATCH 3/9] Cancel context after stop but before finalize. --- process_meta.go | 4 ++++ runner.go | 2 ++ runner_test.go | 53 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 59 insertions(+) diff --git a/process_meta.go b/process_meta.go index f32acae..352cd1a 100644 --- a/process_meta.go +++ b/process_meta.go @@ -18,6 +18,7 @@ type ProcessMeta struct { silentExit bool once *sync.Once stopped chan struct{} + cancelCtx func() initTimeout time.Duration startTimeout time.Duration shutdownTimeout time.Duration @@ -59,6 +60,9 @@ func (m *ProcessMeta) Stop(ctx context.Context) (err error) { m.once.Do(func() { close(m.stopped) err = m.Process.Stop(ctx) + if m.cancelCtx != nil { + m.cancelCtx() + } }) return diff --git a/runner.go b/runner.go index aacbe6c..01b9cac 100644 --- a/runner.go +++ b/runner.go @@ -530,6 +530,8 @@ func (r *runner) startProcess(ctx context.Context, process *ProcessMeta, abandon // and timeout behavior. errChan := makeErrChan(func() error { + ctx, cancelCtx := context.WithCancel(ctx) + process.cancelCtx = cancelCtx return process.Start(ctx) }) diff --git a/runner_test.go b/runner_test.go index 3b727b0..7d95a47 100644 --- a/runner_test.go +++ b/runner_test.go @@ -781,6 +781,45 @@ func TestRunnerProcessStopError(t *testing.T) { eventually(t, errorChanClosed(errChan)) } +func TestContext(t *testing.T) { + services := service.NewServiceContainer() + processes := NewProcessContainer() + health := NewHealth() + runner := NewRunner(processes, services, health) + + p1 := newContextProcess() + p2 := newContextProcess() + p3 := newContextProcess() + + // Register things + processes.RegisterProcess(p1, WithPriority(1)) + processes.RegisterProcess(p2, WithPriority(2)) + processes.RegisterProcess(p3, WithPriority(3)) + + errChan := make(chan error) + shutdownChan := make(chan error) + ctx, cancelCtx := context.WithCancel(context.Background()) + + go func() { + defer close(errChan) + + for err := range runner.Run(ctx, nil) { + errChan <- err + } + }() + + cancelCtx() + + go func() { + defer close(shutdownChan) + shutdownChan <- runner.Shutdown(time.Minute) + }() + + // Ensure unblocked + eventually(t, errorChanClosed(shutdownChan)) + eventually(t, errorChanClosed(errChan)) +} + // // @@ -924,3 +963,17 @@ func (i *initializerWithService) Init(ctx context.Context, c config.Config) erro func (p *processWithService) Init(ctx context.Context, c config.Config) error { return nil } func (p *processWithService) Start(ctx context.Context) error { return nil } func (p *processWithService) Stop(ctx context.Context) error { return nil } + +// +// + +type contextProcess struct { +} + +func newContextProcess() *contextProcess { + return &contextProcess{} +} + +func (p *contextProcess) Init(ctx context.Context, c config.Config) error { return nil } +func (p *contextProcess) Start(ctx context.Context) error { <-ctx.Done(); return nil } +func (p *contextProcess) Stop(ctx context.Context) error { return nil } From 46f3ed67f480a2db079a9a10028e19f78cac2826 Mon Sep 17 00:00:00 2001 From: Eric Fritz Date: Mon, 21 Dec 2020 11:20:21 -0600 Subject: [PATCH 4/9] Update changelog. --- CHANGELOG.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 93d0a72..d0647a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ ## [Unreleased] +### Added + +- Added `Finalize` support to `Process` interface. [#5](https://github.com/go-nacelle/process/pull/5) + +### Changed + +- Added context parameters to `Init`, `Start`, `Stop`, and `Finalize` methods. [#5](https://github.com/go-nacelle/process/pull/5) + ## [v1.1.0] - 2020-10-03 ### Added From 244776d7d633fb6dbca7c52ceeba66162ebae3a3 Mon Sep 17 00:00:00 2001 From: Eric Fritz Date: Mon, 21 Dec 2020 11:23:29 -0600 Subject: [PATCH 5/9] Regenerate mocks. --- mocks/finalizer.go | 28 ++++++++------- mocks/initializer.go | 30 +++++++++------- mocks/process.go | 84 +++++++++++++++++++++++++------------------- mocks/runner.go | 30 +++++++++------- 4 files changed, 97 insertions(+), 75 deletions(-) diff --git a/mocks/finalizer.go b/mocks/finalizer.go index 7a809d9..9a6d44c 100644 --- a/mocks/finalizer.go +++ b/mocks/finalizer.go @@ -3,6 +3,7 @@ package mocks import ( + "context" process "github.com/go-nacelle/process" "sync" ) @@ -20,7 +21,7 @@ type MockFinalizer struct { func NewMockFinalizer() *MockFinalizer { return &MockFinalizer{ FinalizeFunc: &FinalizerFinalizeFunc{ - defaultHook: func() error { + defaultHook: func(context.Context) error { return nil }, }, @@ -40,23 +41,23 @@ func NewMockFinalizerFrom(i process.Finalizer) *MockFinalizer { // FinalizerFinalizeFunc describes the behavior when the Finalize method of // the parent MockFinalizer instance is invoked. type FinalizerFinalizeFunc struct { - defaultHook func() error - hooks []func() error + defaultHook func(context.Context) error + hooks []func(context.Context) error history []FinalizerFinalizeFuncCall mutex sync.Mutex } // Finalize delegates to the next hook function in the queue and stores the // parameter and result values of this invocation. -func (m *MockFinalizer) Finalize() error { - r0 := m.FinalizeFunc.nextHook()() - m.FinalizeFunc.appendCall(FinalizerFinalizeFuncCall{r0}) +func (m *MockFinalizer) Finalize(v0 context.Context) error { + r0 := m.FinalizeFunc.nextHook()(v0) + m.FinalizeFunc.appendCall(FinalizerFinalizeFuncCall{v0, r0}) return r0 } // SetDefaultHook sets function that is called when the Finalize method of // the parent MockFinalizer instance is invoked and the hook queue is empty. -func (f *FinalizerFinalizeFunc) SetDefaultHook(hook func() error) { +func (f *FinalizerFinalizeFunc) SetDefaultHook(hook func(context.Context) error) { f.defaultHook = hook } @@ -64,7 +65,7 @@ func (f *FinalizerFinalizeFunc) SetDefaultHook(hook func() error) { // Finalize method of the parent MockFinalizer instance invokes the hook at // the front of the queue and discards it. After the queue is empty, the // default hook function is invoked for any future action. -func (f *FinalizerFinalizeFunc) PushHook(hook func() error) { +func (f *FinalizerFinalizeFunc) PushHook(hook func(context.Context) error) { f.mutex.Lock() f.hooks = append(f.hooks, hook) f.mutex.Unlock() @@ -73,7 +74,7 @@ func (f *FinalizerFinalizeFunc) PushHook(hook func() error) { // SetDefaultReturn calls SetDefaultDefaultHook with a function that returns // the given values. func (f *FinalizerFinalizeFunc) SetDefaultReturn(r0 error) { - f.SetDefaultHook(func() error { + f.SetDefaultHook(func(context.Context) error { return r0 }) } @@ -81,12 +82,12 @@ func (f *FinalizerFinalizeFunc) SetDefaultReturn(r0 error) { // PushReturn calls PushDefaultHook with a function that returns the given // values. func (f *FinalizerFinalizeFunc) PushReturn(r0 error) { - f.PushHook(func() error { + f.PushHook(func(context.Context) error { return r0 }) } -func (f *FinalizerFinalizeFunc) nextHook() func() error { +func (f *FinalizerFinalizeFunc) nextHook() func(context.Context) error { f.mutex.Lock() defer f.mutex.Unlock() @@ -119,6 +120,9 @@ func (f *FinalizerFinalizeFunc) History() []FinalizerFinalizeFuncCall { // FinalizerFinalizeFuncCall is an object that describes an invocation of // method Finalize on an instance of MockFinalizer. type FinalizerFinalizeFuncCall struct { + // Arg0 is the value of the 1st argument passed to this method + // invocation. + Arg0 context.Context // Result0 is the value of the 1st result returned from this method // invocation. Result0 error @@ -127,7 +131,7 @@ type FinalizerFinalizeFuncCall struct { // Args returns an interface slice containing the arguments of this // invocation. func (c FinalizerFinalizeFuncCall) Args() []interface{} { - return []interface{}{} + return []interface{}{c.Arg0} } // Results returns an interface slice containing the results of this diff --git a/mocks/initializer.go b/mocks/initializer.go index b029e6d..14c334e 100644 --- a/mocks/initializer.go +++ b/mocks/initializer.go @@ -3,6 +3,7 @@ package mocks import ( + "context" config "github.com/go-nacelle/config" process "github.com/go-nacelle/process" "sync" @@ -21,7 +22,7 @@ type MockInitializer struct { func NewMockInitializer() *MockInitializer { return &MockInitializer{ InitFunc: &InitializerInitFunc{ - defaultHook: func(config.Config) error { + defaultHook: func(context.Context, config.Config) error { return nil }, }, @@ -42,23 +43,23 @@ func NewMockInitializerFrom(i process.Initializer) *MockInitializer { // InitializerInitFunc describes the behavior when the Init method of the // parent MockInitializer instance is invoked. type InitializerInitFunc struct { - defaultHook func(config.Config) error - hooks []func(config.Config) error + defaultHook func(context.Context, config.Config) error + hooks []func(context.Context, config.Config) error history []InitializerInitFuncCall mutex sync.Mutex } // Init delegates to the next hook function in the queue and stores the // parameter and result values of this invocation. -func (m *MockInitializer) Init(v0 config.Config) error { - r0 := m.InitFunc.nextHook()(v0) - m.InitFunc.appendCall(InitializerInitFuncCall{v0, r0}) +func (m *MockInitializer) Init(v0 context.Context, v1 config.Config) error { + r0 := m.InitFunc.nextHook()(v0, v1) + m.InitFunc.appendCall(InitializerInitFuncCall{v0, v1, r0}) return r0 } // SetDefaultHook sets function that is called when the Init method of the // parent MockInitializer instance is invoked and the hook queue is empty. -func (f *InitializerInitFunc) SetDefaultHook(hook func(config.Config) error) { +func (f *InitializerInitFunc) SetDefaultHook(hook func(context.Context, config.Config) error) { f.defaultHook = hook } @@ -66,7 +67,7 @@ func (f *InitializerInitFunc) SetDefaultHook(hook func(config.Config) error) { // Init method of the parent MockInitializer instance invokes the hook at // the front of the queue and discards it. After the queue is empty, the // default hook function is invoked for any future action. -func (f *InitializerInitFunc) PushHook(hook func(config.Config) error) { +func (f *InitializerInitFunc) PushHook(hook func(context.Context, config.Config) error) { f.mutex.Lock() f.hooks = append(f.hooks, hook) f.mutex.Unlock() @@ -75,7 +76,7 @@ func (f *InitializerInitFunc) PushHook(hook func(config.Config) error) { // SetDefaultReturn calls SetDefaultDefaultHook with a function that returns // the given values. func (f *InitializerInitFunc) SetDefaultReturn(r0 error) { - f.SetDefaultHook(func(config.Config) error { + f.SetDefaultHook(func(context.Context, config.Config) error { return r0 }) } @@ -83,12 +84,12 @@ func (f *InitializerInitFunc) SetDefaultReturn(r0 error) { // PushReturn calls PushDefaultHook with a function that returns the given // values. func (f *InitializerInitFunc) PushReturn(r0 error) { - f.PushHook(func(config.Config) error { + f.PushHook(func(context.Context, config.Config) error { return r0 }) } -func (f *InitializerInitFunc) nextHook() func(config.Config) error { +func (f *InitializerInitFunc) nextHook() func(context.Context, config.Config) error { f.mutex.Lock() defer f.mutex.Unlock() @@ -123,7 +124,10 @@ func (f *InitializerInitFunc) History() []InitializerInitFuncCall { type InitializerInitFuncCall struct { // Arg0 is the value of the 1st argument passed to this method // invocation. - Arg0 config.Config + Arg0 context.Context + // Arg1 is the value of the 2nd argument passed to this method + // invocation. + Arg1 config.Config // Result0 is the value of the 1st result returned from this method // invocation. Result0 error @@ -132,7 +136,7 @@ type InitializerInitFuncCall struct { // Args returns an interface slice containing the arguments of this // invocation. func (c InitializerInitFuncCall) Args() []interface{} { - return []interface{}{c.Arg0} + return []interface{}{c.Arg0, c.Arg1} } // Results returns an interface slice containing the results of this diff --git a/mocks/process.go b/mocks/process.go index 4168000..a35869f 100644 --- a/mocks/process.go +++ b/mocks/process.go @@ -3,6 +3,7 @@ package mocks import ( + "context" config "github.com/go-nacelle/config" process "github.com/go-nacelle/process" "sync" @@ -27,17 +28,17 @@ type MockProcess struct { func NewMockProcess() *MockProcess { return &MockProcess{ InitFunc: &ProcessInitFunc{ - defaultHook: func(config.Config) error { + defaultHook: func(context.Context, config.Config) error { return nil }, }, StartFunc: &ProcessStartFunc{ - defaultHook: func() error { + defaultHook: func(context.Context) error { return nil }, }, StopFunc: &ProcessStopFunc{ - defaultHook: func() error { + defaultHook: func(context.Context) error { return nil }, }, @@ -63,23 +64,23 @@ func NewMockProcessFrom(i process.Process) *MockProcess { // ProcessInitFunc describes the behavior when the Init method of the parent // MockProcess instance is invoked. type ProcessInitFunc struct { - defaultHook func(config.Config) error - hooks []func(config.Config) error + defaultHook func(context.Context, config.Config) error + hooks []func(context.Context, config.Config) error history []ProcessInitFuncCall mutex sync.Mutex } // Init delegates to the next hook function in the queue and stores the // parameter and result values of this invocation. -func (m *MockProcess) Init(v0 config.Config) error { - r0 := m.InitFunc.nextHook()(v0) - m.InitFunc.appendCall(ProcessInitFuncCall{v0, r0}) +func (m *MockProcess) Init(v0 context.Context, v1 config.Config) error { + r0 := m.InitFunc.nextHook()(v0, v1) + m.InitFunc.appendCall(ProcessInitFuncCall{v0, v1, r0}) return r0 } // SetDefaultHook sets function that is called when the Init method of the // parent MockProcess instance is invoked and the hook queue is empty. -func (f *ProcessInitFunc) SetDefaultHook(hook func(config.Config) error) { +func (f *ProcessInitFunc) SetDefaultHook(hook func(context.Context, config.Config) error) { f.defaultHook = hook } @@ -87,7 +88,7 @@ func (f *ProcessInitFunc) SetDefaultHook(hook func(config.Config) error) { // Init method of the parent MockProcess instance invokes the hook at the // front of the queue and discards it. After the queue is empty, the default // hook function is invoked for any future action. -func (f *ProcessInitFunc) PushHook(hook func(config.Config) error) { +func (f *ProcessInitFunc) PushHook(hook func(context.Context, config.Config) error) { f.mutex.Lock() f.hooks = append(f.hooks, hook) f.mutex.Unlock() @@ -96,7 +97,7 @@ func (f *ProcessInitFunc) PushHook(hook func(config.Config) error) { // SetDefaultReturn calls SetDefaultDefaultHook with a function that returns // the given values. func (f *ProcessInitFunc) SetDefaultReturn(r0 error) { - f.SetDefaultHook(func(config.Config) error { + f.SetDefaultHook(func(context.Context, config.Config) error { return r0 }) } @@ -104,12 +105,12 @@ func (f *ProcessInitFunc) SetDefaultReturn(r0 error) { // PushReturn calls PushDefaultHook with a function that returns the given // values. func (f *ProcessInitFunc) PushReturn(r0 error) { - f.PushHook(func(config.Config) error { + f.PushHook(func(context.Context, config.Config) error { return r0 }) } -func (f *ProcessInitFunc) nextHook() func(config.Config) error { +func (f *ProcessInitFunc) nextHook() func(context.Context, config.Config) error { f.mutex.Lock() defer f.mutex.Unlock() @@ -144,7 +145,10 @@ func (f *ProcessInitFunc) History() []ProcessInitFuncCall { type ProcessInitFuncCall struct { // Arg0 is the value of the 1st argument passed to this method // invocation. - Arg0 config.Config + Arg0 context.Context + // Arg1 is the value of the 2nd argument passed to this method + // invocation. + Arg1 config.Config // Result0 is the value of the 1st result returned from this method // invocation. Result0 error @@ -153,7 +157,7 @@ type ProcessInitFuncCall struct { // Args returns an interface slice containing the arguments of this // invocation. func (c ProcessInitFuncCall) Args() []interface{} { - return []interface{}{c.Arg0} + return []interface{}{c.Arg0, c.Arg1} } // Results returns an interface slice containing the results of this @@ -165,23 +169,23 @@ func (c ProcessInitFuncCall) Results() []interface{} { // ProcessStartFunc describes the behavior when the Start method of the // parent MockProcess instance is invoked. type ProcessStartFunc struct { - defaultHook func() error - hooks []func() error + defaultHook func(context.Context) error + hooks []func(context.Context) error history []ProcessStartFuncCall mutex sync.Mutex } // Start delegates to the next hook function in the queue and stores the // parameter and result values of this invocation. -func (m *MockProcess) Start() error { - r0 := m.StartFunc.nextHook()() - m.StartFunc.appendCall(ProcessStartFuncCall{r0}) +func (m *MockProcess) Start(v0 context.Context) error { + r0 := m.StartFunc.nextHook()(v0) + m.StartFunc.appendCall(ProcessStartFuncCall{v0, r0}) return r0 } // SetDefaultHook sets function that is called when the Start method of the // parent MockProcess instance is invoked and the hook queue is empty. -func (f *ProcessStartFunc) SetDefaultHook(hook func() error) { +func (f *ProcessStartFunc) SetDefaultHook(hook func(context.Context) error) { f.defaultHook = hook } @@ -189,7 +193,7 @@ func (f *ProcessStartFunc) SetDefaultHook(hook func() error) { // Start method of the parent MockProcess instance invokes the hook at the // front of the queue and discards it. After the queue is empty, the default // hook function is invoked for any future action. -func (f *ProcessStartFunc) PushHook(hook func() error) { +func (f *ProcessStartFunc) PushHook(hook func(context.Context) error) { f.mutex.Lock() f.hooks = append(f.hooks, hook) f.mutex.Unlock() @@ -198,7 +202,7 @@ func (f *ProcessStartFunc) PushHook(hook func() error) { // SetDefaultReturn calls SetDefaultDefaultHook with a function that returns // the given values. func (f *ProcessStartFunc) SetDefaultReturn(r0 error) { - f.SetDefaultHook(func() error { + f.SetDefaultHook(func(context.Context) error { return r0 }) } @@ -206,12 +210,12 @@ func (f *ProcessStartFunc) SetDefaultReturn(r0 error) { // PushReturn calls PushDefaultHook with a function that returns the given // values. func (f *ProcessStartFunc) PushReturn(r0 error) { - f.PushHook(func() error { + f.PushHook(func(context.Context) error { return r0 }) } -func (f *ProcessStartFunc) nextHook() func() error { +func (f *ProcessStartFunc) nextHook() func(context.Context) error { f.mutex.Lock() defer f.mutex.Unlock() @@ -244,6 +248,9 @@ func (f *ProcessStartFunc) History() []ProcessStartFuncCall { // ProcessStartFuncCall is an object that describes an invocation of method // Start on an instance of MockProcess. type ProcessStartFuncCall struct { + // Arg0 is the value of the 1st argument passed to this method + // invocation. + Arg0 context.Context // Result0 is the value of the 1st result returned from this method // invocation. Result0 error @@ -252,7 +259,7 @@ type ProcessStartFuncCall struct { // Args returns an interface slice containing the arguments of this // invocation. func (c ProcessStartFuncCall) Args() []interface{} { - return []interface{}{} + return []interface{}{c.Arg0} } // Results returns an interface slice containing the results of this @@ -264,23 +271,23 @@ func (c ProcessStartFuncCall) Results() []interface{} { // ProcessStopFunc describes the behavior when the Stop method of the parent // MockProcess instance is invoked. type ProcessStopFunc struct { - defaultHook func() error - hooks []func() error + defaultHook func(context.Context) error + hooks []func(context.Context) error history []ProcessStopFuncCall mutex sync.Mutex } // Stop delegates to the next hook function in the queue and stores the // parameter and result values of this invocation. -func (m *MockProcess) Stop() error { - r0 := m.StopFunc.nextHook()() - m.StopFunc.appendCall(ProcessStopFuncCall{r0}) +func (m *MockProcess) Stop(v0 context.Context) error { + r0 := m.StopFunc.nextHook()(v0) + m.StopFunc.appendCall(ProcessStopFuncCall{v0, r0}) return r0 } // SetDefaultHook sets function that is called when the Stop method of the // parent MockProcess instance is invoked and the hook queue is empty. -func (f *ProcessStopFunc) SetDefaultHook(hook func() error) { +func (f *ProcessStopFunc) SetDefaultHook(hook func(context.Context) error) { f.defaultHook = hook } @@ -288,7 +295,7 @@ func (f *ProcessStopFunc) SetDefaultHook(hook func() error) { // Stop method of the parent MockProcess instance invokes the hook at the // front of the queue and discards it. After the queue is empty, the default // hook function is invoked for any future action. -func (f *ProcessStopFunc) PushHook(hook func() error) { +func (f *ProcessStopFunc) PushHook(hook func(context.Context) error) { f.mutex.Lock() f.hooks = append(f.hooks, hook) f.mutex.Unlock() @@ -297,7 +304,7 @@ func (f *ProcessStopFunc) PushHook(hook func() error) { // SetDefaultReturn calls SetDefaultDefaultHook with a function that returns // the given values. func (f *ProcessStopFunc) SetDefaultReturn(r0 error) { - f.SetDefaultHook(func() error { + f.SetDefaultHook(func(context.Context) error { return r0 }) } @@ -305,12 +312,12 @@ func (f *ProcessStopFunc) SetDefaultReturn(r0 error) { // PushReturn calls PushDefaultHook with a function that returns the given // values. func (f *ProcessStopFunc) PushReturn(r0 error) { - f.PushHook(func() error { + f.PushHook(func(context.Context) error { return r0 }) } -func (f *ProcessStopFunc) nextHook() func() error { +func (f *ProcessStopFunc) nextHook() func(context.Context) error { f.mutex.Lock() defer f.mutex.Unlock() @@ -343,6 +350,9 @@ func (f *ProcessStopFunc) History() []ProcessStopFuncCall { // ProcessStopFuncCall is an object that describes an invocation of method // Stop on an instance of MockProcess. type ProcessStopFuncCall struct { + // Arg0 is the value of the 1st argument passed to this method + // invocation. + Arg0 context.Context // Result0 is the value of the 1st result returned from this method // invocation. Result0 error @@ -351,7 +361,7 @@ type ProcessStopFuncCall struct { // Args returns an interface slice containing the arguments of this // invocation. func (c ProcessStopFuncCall) Args() []interface{} { - return []interface{}{} + return []interface{}{c.Arg0} } // Results returns an interface slice containing the results of this diff --git a/mocks/runner.go b/mocks/runner.go index 0452cff..6728f57 100644 --- a/mocks/runner.go +++ b/mocks/runner.go @@ -3,6 +3,7 @@ package mocks import ( + "context" config "github.com/go-nacelle/config" process "github.com/go-nacelle/process" "sync" @@ -25,7 +26,7 @@ type MockRunner struct { func NewMockRunner() *MockRunner { return &MockRunner{ RunFunc: &RunnerRunFunc{ - defaultHook: func(config.Config) <-chan error { + defaultHook: func(context.Context, config.Config) <-chan error { return nil }, }, @@ -53,23 +54,23 @@ func NewMockRunnerFrom(i process.Runner) *MockRunner { // RunnerRunFunc describes the behavior when the Run method of the parent // MockRunner instance is invoked. type RunnerRunFunc struct { - defaultHook func(config.Config) <-chan error - hooks []func(config.Config) <-chan error + defaultHook func(context.Context, config.Config) <-chan error + hooks []func(context.Context, config.Config) <-chan error history []RunnerRunFuncCall mutex sync.Mutex } // Run delegates to the next hook function in the queue and stores the // parameter and result values of this invocation. -func (m *MockRunner) Run(v0 config.Config) <-chan error { - r0 := m.RunFunc.nextHook()(v0) - m.RunFunc.appendCall(RunnerRunFuncCall{v0, r0}) +func (m *MockRunner) Run(v0 context.Context, v1 config.Config) <-chan error { + r0 := m.RunFunc.nextHook()(v0, v1) + m.RunFunc.appendCall(RunnerRunFuncCall{v0, v1, r0}) return r0 } // SetDefaultHook sets function that is called when the Run method of the // parent MockRunner instance is invoked and the hook queue is empty. -func (f *RunnerRunFunc) SetDefaultHook(hook func(config.Config) <-chan error) { +func (f *RunnerRunFunc) SetDefaultHook(hook func(context.Context, config.Config) <-chan error) { f.defaultHook = hook } @@ -77,7 +78,7 @@ func (f *RunnerRunFunc) SetDefaultHook(hook func(config.Config) <-chan error) { // Run method of the parent MockRunner instance invokes the hook at the // front of the queue and discards it. After the queue is empty, the default // hook function is invoked for any future action. -func (f *RunnerRunFunc) PushHook(hook func(config.Config) <-chan error) { +func (f *RunnerRunFunc) PushHook(hook func(context.Context, config.Config) <-chan error) { f.mutex.Lock() f.hooks = append(f.hooks, hook) f.mutex.Unlock() @@ -86,7 +87,7 @@ func (f *RunnerRunFunc) PushHook(hook func(config.Config) <-chan error) { // SetDefaultReturn calls SetDefaultDefaultHook with a function that returns // the given values. func (f *RunnerRunFunc) SetDefaultReturn(r0 <-chan error) { - f.SetDefaultHook(func(config.Config) <-chan error { + f.SetDefaultHook(func(context.Context, config.Config) <-chan error { return r0 }) } @@ -94,12 +95,12 @@ func (f *RunnerRunFunc) SetDefaultReturn(r0 <-chan error) { // PushReturn calls PushDefaultHook with a function that returns the given // values. func (f *RunnerRunFunc) PushReturn(r0 <-chan error) { - f.PushHook(func(config.Config) <-chan error { + f.PushHook(func(context.Context, config.Config) <-chan error { return r0 }) } -func (f *RunnerRunFunc) nextHook() func(config.Config) <-chan error { +func (f *RunnerRunFunc) nextHook() func(context.Context, config.Config) <-chan error { f.mutex.Lock() defer f.mutex.Unlock() @@ -134,7 +135,10 @@ func (f *RunnerRunFunc) History() []RunnerRunFuncCall { type RunnerRunFuncCall struct { // Arg0 is the value of the 1st argument passed to this method // invocation. - Arg0 config.Config + Arg0 context.Context + // Arg1 is the value of the 2nd argument passed to this method + // invocation. + Arg1 config.Config // Result0 is the value of the 1st result returned from this method // invocation. Result0 <-chan error @@ -143,7 +147,7 @@ type RunnerRunFuncCall struct { // Args returns an interface slice containing the arguments of this // invocation. func (c RunnerRunFuncCall) Args() []interface{} { - return []interface{}{c.Arg0} + return []interface{}{c.Arg0, c.Arg1} } // Results returns an interface slice containing the results of this From 002d2582a0d9385a74db6bd3a0aac0def566536f Mon Sep 17 00:00:00 2001 From: Eric Fritz Date: Mon, 21 Dec 2020 11:49:04 -0600 Subject: [PATCH 6/9] Fix data race. --- process_meta.go | 23 +++++++++++++++++++++-- runner.go | 2 +- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/process_meta.go b/process_meta.go index 352cd1a..57a0d55 100644 --- a/process_meta.go +++ b/process_meta.go @@ -11,6 +11,7 @@ import ( // ProcessMeta wraps a process with some package private // fields. type ProcessMeta struct { + sync.RWMutex Process name string logFields log.LogFields @@ -58,16 +59,34 @@ func (m *ProcessMeta) InitTimeout() time.Duration { // take effect multiple times. func (m *ProcessMeta) Stop(ctx context.Context) (err error) { m.once.Do(func() { + m.RLock() close(m.stopped) + cancelCtx := m.cancelCtx + m.RUnlock() + err = m.Process.Stop(ctx) - if m.cancelCtx != nil { - m.cancelCtx() + + if cancelCtx != nil { + cancelCtx() } }) return } +func (m *ProcessMeta) setCancelCtx(cancelCtx func()) { + m.Lock() + defer m.Unlock() + + select { + case <-m.stopped: + cancelCtx() + return + default: + m.cancelCtx = cancelCtx + } +} + // FinalizeTimeout returns the maximum timeout allowed for a call to // the Finalize function. A zero value indicates no timeout. func (m *ProcessMeta) FinalizeTimeout() time.Duration { diff --git a/runner.go b/runner.go index 01b9cac..d6610dd 100644 --- a/runner.go +++ b/runner.go @@ -531,7 +531,7 @@ func (r *runner) startProcess(ctx context.Context, process *ProcessMeta, abandon errChan := makeErrChan(func() error { ctx, cancelCtx := context.WithCancel(ctx) - process.cancelCtx = cancelCtx + process.setCancelCtx(cancelCtx) return process.Start(ctx) }) From 78eb9dd6b2b9327d06ccfc0a84bd7da1cb2951f1 Mon Sep 17 00:00:00 2001 From: Eric Fritz Date: Wed, 30 Dec 2020 15:04:15 -0600 Subject: [PATCH 7/9] Add context filter options. --- initializer_meta.go | 10 ++++++++++ initializer_options.go | 6 ++++++ process_meta.go | 9 +++++++++ process_options.go | 6 ++++++ runner.go | 13 +++++++------ 5 files changed, 38 insertions(+), 6 deletions(-) diff --git a/initializer_meta.go b/initializer_meta.go index f7bea20..ca254e4 100644 --- a/initializer_meta.go +++ b/initializer_meta.go @@ -1,6 +1,7 @@ package process import ( + "context" "time" "github.com/go-nacelle/log" @@ -10,6 +11,7 @@ import ( // private fields. type InitializerMeta struct { Initializer + contextFilter func(ctx context.Context) context.Context name string logFields log.LogFields initTimeout time.Duration @@ -22,6 +24,14 @@ func newInitializerMeta(initializer Initializer) *InitializerMeta { } } +func (m *InitializerMeta) FilterContext(ctx context.Context) context.Context { + if m.contextFilter == nil { + return ctx + } + + return m.contextFilter(ctx) +} + // Name returns the name of the initializer. func (m *InitializerMeta) Name() string { if m.name == "" { diff --git a/initializer_options.go b/initializer_options.go index 57df09d..45458c4 100644 --- a/initializer_options.go +++ b/initializer_options.go @@ -1,6 +1,7 @@ package process import ( + "context" "time" "github.com/go-nacelle/log" @@ -10,6 +11,11 @@ import ( // metadata to an initializer during registration. type InitializerConfigFunc func(*InitializerMeta) +// WithInitializerContextFilter sets the context filter for the initializer. +func WithInitializerContextFilter(f func(ctx context.Context) context.Context) InitializerConfigFunc { + return func(meta *InitializerMeta) { meta.contextFilter = f } +} + // WithInitializerName assigns a name to an initializer, visible in logs. func WithInitializerName(name string) InitializerConfigFunc { return func(meta *InitializerMeta) { meta.name = name } diff --git a/process_meta.go b/process_meta.go index 57a0d55..bdb51e1 100644 --- a/process_meta.go +++ b/process_meta.go @@ -13,6 +13,7 @@ import ( type ProcessMeta struct { sync.RWMutex Process + contextFilter func(ctx context.Context) context.Context name string logFields log.LogFields priority int @@ -34,6 +35,14 @@ func newProcessMeta(process Process) *ProcessMeta { } } +func (m *ProcessMeta) FilterContext(ctx context.Context) context.Context { + if m.contextFilter == nil { + return ctx + } + + return m.contextFilter(ctx) +} + // Name returns the name of the process. func (m *ProcessMeta) Name() string { if m.name == "" { diff --git a/process_options.go b/process_options.go index ea2eb92..e355c67 100644 --- a/process_options.go +++ b/process_options.go @@ -1,6 +1,7 @@ package process import ( + "context" "time" "github.com/go-nacelle/log" @@ -10,6 +11,11 @@ import ( // to an process during registration. type ProcessConfigFunc func(*ProcessMeta) +// WithProcessContextFilter sets the context filter for the process +func WithProcessContextFilter(f func(ctx context.Context) context.Context) ProcessConfigFunc { + return func(meta *ProcessMeta) { meta.contextFilter = f } +} + // WithProcessName assigns a name to an process, visible in logs. func WithProcessName(name string) ProcessConfigFunc { return func(meta *ProcessMeta) { meta.name = name } diff --git a/runner.go b/runner.go index d6610dd..916105d 100644 --- a/runner.go +++ b/runner.go @@ -153,7 +153,7 @@ func (r *runner) runInitializers(ctx context.Context, config config.Config) bool return false } - if err := r.initWithTimeout(ctx, initializer, config); err != nil { + if err := r.initWithTimeout(initializer.FilterContext(ctx), initializer, config); err != nil { _ = r.unwindInitializers(ctx, i) // Parallel initializers may return multiple errors, so // we return all of them here. This check if asymmetric @@ -176,7 +176,8 @@ func (r *runner) runFinalizers(ctx context.Context, beforeIndex int) bool { success := true for i := beforeIndex - 1; i >= 0; i-- { for _, process := range r.processes.GetProcessesAtPriorityIndex(i) { - if err := r.finalizeWithTimeout(ctx, process); err != nil { + + if err := r.finalizeWithTimeout(process.FilterContext(ctx), process); err != nil { r.errChan <- errMeta{err: err, source: process} success = false } @@ -191,7 +192,7 @@ func (r *runner) unwindInitializers(ctx context.Context, beforeIndex int) bool { initializers := r.processes.GetInitializers() for i := beforeIndex - 1; i >= 0; i-- { - if err := r.finalizeWithTimeout(ctx, initializers[i]); err != nil { + if err := r.finalizeWithTimeout(initializers[i].FilterContext(ctx), initializers[i]); err != nil { // Parallel initializers may return multiple errors, so // we return all of them here. This check if asymmetric // as there is no equivalent for processes. @@ -307,7 +308,7 @@ func (r *runner) initProcessesAtPriorityIndex(ctx context.Context, config config r.logger.Info("Initializing processes at priority index %d", index) for _, process := range r.processes.GetProcessesAtPriorityIndex(index) { - if err := r.initWithTimeout(ctx, process, config); err != nil { + if err := r.initWithTimeout(process.FilterContext(ctx), process, config); err != nil { r.errChan <- errMeta{err: err, source: process} return false } @@ -530,7 +531,7 @@ func (r *runner) startProcess(ctx context.Context, process *ProcessMeta, abandon // and timeout behavior. errChan := makeErrChan(func() error { - ctx, cancelCtx := context.WithCancel(ctx) + ctx, cancelCtx := context.WithCancel(process.FilterContext(ctx)) process.setCancelCtx(cancelCtx) return process.Start(ctx) }) @@ -603,7 +604,7 @@ func (r *runner) stopProcessesAtPriorityIndex(ctx context.Context, index int) { go func(process *ProcessMeta) { defer r.wg.Done() - if err := r.stop(ctx, process); err != nil { + if err := r.stop(process.FilterContext(ctx), process); err != nil { r.errChan <- errMeta{err: err, source: process} } }(process) From 5767e8af3229c029bc0248e8734cd044147c52c2 Mon Sep 17 00:00:00 2001 From: Eric Fritz Date: Wed, 30 Dec 2020 15:13:46 -0600 Subject: [PATCH 8/9] Fix flaky test. --- runner.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runner.go b/runner.go index 916105d..ecb6be9 100644 --- a/runner.go +++ b/runner.go @@ -3,6 +3,7 @@ package process import ( "context" "fmt" + "sort" "strings" "sync" "time" @@ -519,6 +520,7 @@ func (r *runner) getHealthDescriptions() []string { descriptions = append(descriptions, fmt.Sprintf("%s", reason.Key)) } + sort.Strings(descriptions) return descriptions } From 4a96a1e4cade716b936bc46f54f0ae8405122f23 Mon Sep 17 00:00:00 2001 From: Eric Fritz Date: Wed, 30 Dec 2020 15:15:37 -0600 Subject: [PATCH 9/9] Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d0647a2..f570b9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Added - Added `Finalize` support to `Process` interface. [#5](https://github.com/go-nacelle/process/pull/5) +- Added `WithInitializerContextFilter` and `WithProcessContextFilter`. [#5](https://github.com/go-nacelle/process/pull/5) ### Changed