From 4e826ab0c02326a9394ac59aac5aa4bc112b01cf Mon Sep 17 00:00:00 2001 From: Mason Liang Date: Tue, 16 Jun 2020 11:53:09 -0600 Subject: [PATCH 1/4] Update removedChecker to use countInterval --- zeroex/orderwatch/order_watcher.go | 46 +++++++++++++++++++----------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index 5057caa25..150c96cc1 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -29,9 +29,14 @@ const ( // were not caught by the event watcher process. minCleanupInterval = 1 * time.Hour - // minRemovedCheckInterval specifies the minimum amount of time between checks - // on whether to remove orders flaggged for removal from the DB - minRemovedCheckInterval = 5 * time.Minute + // maxDeleteInterval specifies the maximum amount of time between calls to permanentlyDeleteStaleRemovedOrders + maxDeleteInterval = 5 * time.Minute + + // countInterval specifies the amount of time between calls to db.CountOrders. If the count is higher than a threshold, then permanentlyDeleteStaleRemovedOrders will be called, so this is the minimum interval between calls to permanentlyDeleteStaleRemovedOrders. + countInterval = 5 * time.Second + + // If, after a call to db.CountOrders, the database utilization exceeds databaseUtilizationThreshold, then permanentlyDeleteStaleRemovedOrders will be called. + databaseUtilizationThreshold = 0.5 // defaultLastUpdatedBuffer specifies how long it must have been since an order was // last updated in order to be re-validated by the cleanup worker @@ -146,22 +151,20 @@ func (w *Watcher) Watch(ctx context.Context) error { // A waitgroup lets us wait for all goroutines to exit. wg := &sync.WaitGroup{} + wg.Add(3) // Start some independent goroutines, each with a separate channel for communicating errors. mainLoopErrChan := make(chan error, 1) - wg.Add(1) go func() { defer wg.Done() mainLoopErrChan <- w.mainLoop(innerCtx) }() cleanupLoopErrChan := make(chan error, 1) - wg.Add(1) go func() { defer wg.Done() cleanupLoopErrChan <- w.cleanupLoop(innerCtx) }() removedCheckerLoopErrChan := make(chan error, 1) - wg.Add(1) go func() { defer wg.Done() removedCheckerLoopErrChan <- w.removedCheckerLoop(innerCtx) @@ -264,20 +267,31 @@ func (w *Watcher) cleanupLoop(ctx context.Context) error { } func (w *Watcher) removedCheckerLoop(ctx context.Context) error { - for { - start := time.Now() - if err := w.permanentlyDeleteStaleRemovedOrders(ctx); err != nil { - return err - } + lastDeleted := time.Now() + lastCounted := time.Now() + for { select { case <-ctx.Done(): return nil - // Wait minRemovedCheckInterval before calling permanentlyDeleteStaleRemovedOrders again. Since - // we only start waiting _after_ permanentlyDeleteStaleRemovedOrders completes, we will never - // have multiple calls to permanentlyDeleteStaleRemovedOrders running in parallel - case <-time.After(minRemovedCheckInterval - time.Since(start)): - continue + case <-time.After(maxDeleteInterval - time.Since(lastDeleted)): + if err := w.permanentlyDeleteStaleRemovedOrders(ctx); err != nil { + return err + } + lastDeleted = time.Now() + case <-time.After(countInterval - time.Since(lastCounted)): + count, err := w.db.CountOrders(nil) + if err != nil { + return err + } + lastCounted = time.Now() + + if float64(count) / float64(w.maxOrders) > databaseUtilizationThreshold { + if err := w.permanentlyDeleteStaleRemovedOrders(ctx); err != nil { + return err + } + lastDeleted = time.Now() + } } } } From 1641095028816d0d33532cfbfb7696242d046dc1 Mon Sep 17 00:00:00 2001 From: Mason Liang Date: Tue, 16 Jun 2020 12:11:19 -0600 Subject: [PATCH 2/4] gofmt --- zeroex/orderwatch/order_watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index 150c96cc1..f2b3dbb54 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -286,7 +286,7 @@ func (w *Watcher) removedCheckerLoop(ctx context.Context) error { } lastCounted = time.Now() - if float64(count) / float64(w.maxOrders) > databaseUtilizationThreshold { + if float64(count)/float64(w.maxOrders) > databaseUtilizationThreshold { if err := w.permanentlyDeleteStaleRemovedOrders(ctx); err != nil { return err } From 56054162c1b9e02202c9fbcc65f32dc381d1cc10 Mon Sep 17 00:00:00 2001 From: Mason Liang Date: Tue, 23 Jun 2020 17:52:33 -0600 Subject: [PATCH 3/4] Requested changes --- go.mod | 1 + zeroex/orderwatch/order_watcher.go | 86 ++++++++---------------------- 2 files changed, 24 insertions(+), 63 deletions(-) diff --git a/go.mod b/go.mod index d5f5715c8..98a9361d9 100644 --- a/go.mod +++ b/go.mod @@ -74,6 +74,7 @@ require ( github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.1.0 golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4 + golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index f2b3dbb54..f9e3320ed 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -21,6 +21,7 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" logger "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" ) const ( @@ -32,8 +33,8 @@ const ( // maxDeleteInterval specifies the maximum amount of time between calls to permanentlyDeleteStaleRemovedOrders maxDeleteInterval = 5 * time.Minute - // countInterval specifies the amount of time between calls to db.CountOrders. If the count is higher than a threshold, then permanentlyDeleteStaleRemovedOrders will be called, so this is the minimum interval between calls to permanentlyDeleteStaleRemovedOrders. - countInterval = 5 * time.Second + // checkDatabaseUtilizationThresholdInterval specifies the amount of time between calls to db.CountOrders. If the count is higher than a threshold, then permanentlyDeleteStaleRemovedOrders will be called, so this is the minimum interval between calls to permanentlyDeleteStaleRemovedOrders. + checkDatabaseUtilizationThresholdInterval = 5 * time.Second // If, after a call to db.CountOrders, the database utilization exceeds databaseUtilizationThreshold, then permanentlyDeleteStaleRemovedOrders will be called. databaseUtilizationThreshold = 0.5 @@ -144,60 +145,22 @@ func (w *Watcher) Watch(ctx context.Context) error { w.wasStartedOnce = true w.mu.Unlock() - // Create a child context so that we can preemptively cancel if there is an - // error. - innerCtx, cancel := context.WithCancel(ctx) - defer cancel() - - // A waitgroup lets us wait for all goroutines to exit. - wg := &sync.WaitGroup{} - wg.Add(3) - - // Start some independent goroutines, each with a separate channel for communicating errors. - mainLoopErrChan := make(chan error, 1) - go func() { - defer wg.Done() - mainLoopErrChan <- w.mainLoop(innerCtx) - }() - cleanupLoopErrChan := make(chan error, 1) - go func() { - defer wg.Done() - cleanupLoopErrChan <- w.cleanupLoop(innerCtx) - }() - removedCheckerLoopErrChan := make(chan error, 1) - go func() { - defer wg.Done() - removedCheckerLoopErrChan <- w.removedCheckerLoop(innerCtx) - }() - - // If any error channel returns a non-nil error, we cancel the inner context - // and return the error. Note that this means we only return the first error - // that occurs. - select { - case err := <-mainLoopErrChan: - if err != nil { - logger.WithError(err).Error("error in orderwatcher mainLoop") - cancel() - return err - } - case err := <-cleanupLoopErrChan: - if err != nil { - logger.WithError(err).Error("error in orderwatcher cleanupLoop") - cancel() - return err - } - case err := <-removedCheckerLoopErrChan: - if err != nil { - logger.WithError(err).Error("error in orderwatcher removedCheckerLoop") - cancel() + g, innerCtx := errgroup.WithContext(ctx) + namedLoops := []struct { + loop func(context.Context) error + name string + }{{w.mainLoop, "mainLoop"}, {w.cleanupLoop, "cleanupLoop"}, {w.removedCheckerLoop, "removedCheckerLoop"}} + for _, namedLoop := range namedLoops { + g.Go(func() error { + err := namedLoop.loop(innerCtx) + if err != nil { + logger.WithError(err).Error(fmt.Sprintf("error in orderwatcher %s", namedLoop.name)) + } return err - } + }) } - - // Wait for all goroutines to exit. If we reached here it means we are done - // and there are no errors. - wg.Wait() - return nil + // Wait for all loops to return nil, or for any loop to return an error. + return g.Wait() } func (w *Watcher) mainLoop(ctx context.Context) error { @@ -267,26 +230,23 @@ func (w *Watcher) cleanupLoop(ctx context.Context) error { } func (w *Watcher) removedCheckerLoop(ctx context.Context) error { + if err := w.permanentlyDeleteStaleRemovedOrders(ctx); err != nil { + return err + } lastDeleted := time.Now() - lastCounted := time.Now() for { select { case <-ctx.Done(): return nil - case <-time.After(maxDeleteInterval - time.Since(lastDeleted)): - if err := w.permanentlyDeleteStaleRemovedOrders(ctx); err != nil { - return err - } - lastDeleted = time.Now() - case <-time.After(countInterval - time.Since(lastCounted)): + case <-time.After(checkDatabaseUtilizationThresholdInterval): count, err := w.db.CountOrders(nil) if err != nil { return err } - lastCounted = time.Now() + databaseUtilization := float64(count) / float64(w.maxOrders) - if float64(count)/float64(w.maxOrders) > databaseUtilizationThreshold { + if time.Since(lastDeleted) > maxDeleteInterval || databaseUtilization > databaseUtilizationThreshold { if err := w.permanentlyDeleteStaleRemovedOrders(ctx); err != nil { return err } From 5d54542058d4391027b3e5bcf4df397e254688ac Mon Sep 17 00:00:00 2001 From: Mason Liang Date: Wed, 24 Jun 2020 11:46:12 -0600 Subject: [PATCH 4/4] Revert using errgroup --- go.mod | 1 - zeroex/orderwatch/order_watcher.go | 69 +++++++++++++++++++++++------- 2 files changed, 54 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index 98a9361d9..d5f5715c8 100644 --- a/go.mod +++ b/go.mod @@ -74,7 +74,6 @@ require ( github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.1.0 golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4 - golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index f9e3320ed..e7c9d19ae 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -21,7 +21,6 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" logger "github.com/sirupsen/logrus" - "golang.org/x/sync/errgroup" ) const ( @@ -145,22 +144,62 @@ func (w *Watcher) Watch(ctx context.Context) error { w.wasStartedOnce = true w.mu.Unlock() - g, innerCtx := errgroup.WithContext(ctx) - namedLoops := []struct { - loop func(context.Context) error - name string - }{{w.mainLoop, "mainLoop"}, {w.cleanupLoop, "cleanupLoop"}, {w.removedCheckerLoop, "removedCheckerLoop"}} - for _, namedLoop := range namedLoops { - g.Go(func() error { - err := namedLoop.loop(innerCtx) - if err != nil { - logger.WithError(err).Error(fmt.Sprintf("error in orderwatcher %s", namedLoop.name)) - } + // Create a child context so that we can preemptively cancel if there is an + // error. + innerCtx, cancel := context.WithCancel(ctx) + defer cancel() + + // A waitgroup lets us wait for all goroutines to exit. + wg := &sync.WaitGroup{} + + // Start some independent goroutines, each with a separate channel for communicating errors. + mainLoopErrChan := make(chan error, 1) + wg.Add(1) + go func() { + defer wg.Done() + mainLoopErrChan <- w.mainLoop(innerCtx) + }() + cleanupLoopErrChan := make(chan error, 1) + wg.Add(1) + go func() { + defer wg.Done() + cleanupLoopErrChan <- w.cleanupLoop(innerCtx) + }() + removedCheckerLoopErrChan := make(chan error, 1) + wg.Add(1) + go func() { + defer wg.Done() + removedCheckerLoopErrChan <- w.removedCheckerLoop(innerCtx) + }() + + // If any error channel returns a non-nil error, we cancel the inner context + // and return the error. Note that this means we only return the first error + // that occurs. + select { + case err := <-mainLoopErrChan: + if err != nil { + logger.WithError(err).Error("error in orderwatcher mainLoop") + cancel() return err - }) + } + case err := <-cleanupLoopErrChan: + if err != nil { + logger.WithError(err).Error("error in orderwatcher cleanupLoop") + cancel() + return err + } + case err := <-removedCheckerLoopErrChan: + if err != nil { + logger.WithError(err).Error("error in orderwatcher removedCheckerLoop") + cancel() + return err + } } - // Wait for all loops to return nil, or for any loop to return an error. - return g.Wait() + + // Wait for all goroutines to exit. If we reached here it means we are done + // and there are no errors. + wg.Wait() + return nil } func (w *Watcher) mainLoop(ctx context.Context) error {