From cc271fdb4c727b271d8ea06ccc1fa3751abe9d60 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Mon, 4 Jan 2021 16:07:40 +0100 Subject: [PATCH 01/16] Resubscription mechanism with alerting and backoff The mechanism wraps event.Resubscribe mechanism from go-ethereum adding alterting if the time between resubscription attemps is shorter than the specific threshold. This mechanism will replace the one we currently have in the generated code, adding alerting and backoff. --- pkg/chain/ethereum/ethutil/resubscribe.go | 39 ++++++ .../ethereum/ethutil/resubscribe_test.go | 121 ++++++++++++++++++ 2 files changed, 160 insertions(+) create mode 100644 pkg/chain/ethereum/ethutil/resubscribe.go create mode 100644 pkg/chain/ethereum/ethutil/resubscribe_test.go diff --git a/pkg/chain/ethereum/ethutil/resubscribe.go b/pkg/chain/ethereum/ethutil/resubscribe.go new file mode 100644 index 0000000..9f28c58 --- /dev/null +++ b/pkg/chain/ethereum/ethutil/resubscribe.go @@ -0,0 +1,39 @@ +package ethutil + +import ( + "context" + "time" + "github.com/ethereum/go-ethereum/event" +) + +// WithResubscription wraps the resubscription function to call it repeatedly +// to keep a subscription established. When subscription is established, it is +// monitored and in the case of a failure, resubscription is attempted by +// calling the function again. +// +// The mechanism applies backoff between calls to resubscribe function. +// The time between calls is adapted based on the error rate, but will never +// exceed backoffMax. +// +// The mechanism monitors the time elapsed between resubscription attempts and +// if it is shorter than the specificed alertThreshold, it calls the alertFn +// to alarm about potential problems with the stability of the subscription. +func WithResubscription( + backoffMax time.Duration, + resubscribeFn event.ResubscribeFunc, + alertThreshold time.Duration, + alertFn func(), +) event.Subscription { + lastAttempt := time.Time{} + wrappedResubscribeFn := func(ctx context.Context) (event.Subscription, error) { + now := time.Now() + elapsed := now.Sub(lastAttempt) + if elapsed < alertThreshold { + alertFn() + } + lastAttempt = now + return resubscribeFn(ctx) + } + + return event.Resubscribe(backoffMax, wrappedResubscribeFn) +} diff --git a/pkg/chain/ethereum/ethutil/resubscribe_test.go b/pkg/chain/ethereum/ethutil/resubscribe_test.go new file mode 100644 index 0000000..34497e7 --- /dev/null +++ b/pkg/chain/ethereum/ethutil/resubscribe_test.go @@ -0,0 +1,121 @@ +package ethutil + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/ethereum/go-ethereum/event" +) + +func TestResubscribeAboveThreshold(t *testing.T) { + backoffMax := 100 * time.Millisecond + alertThreshold := 100 * time.Millisecond + + alertFnCalls := 0 + alertFn := func() { + alertFnCalls++ + } + + subscriptionFailures := 3 + resubscribeFnCalls := 0 + resubscribeFn := func(ctx context.Context) (event.Subscription, error) { + resubscribeFnCalls++ + time.Sleep(150 * time.Millisecond) // 150 > 100, above alert threshold + if resubscribeFnCalls <= subscriptionFailures { + return nil, fmt.Errorf("this is the way") + } + delegate := event.NewSubscription(func(unsubscribed <-chan struct{}) error { + return nil + }) + return delegate, nil + } + + subscription := WithResubscription( + backoffMax, + resubscribeFn, + alertThreshold, + alertFn, + ) + <-subscription.Err() + + // No calls to alertFn expected. Alert threshold is set to 100ms and no + // there were no resubscription attempts in a time shorter than 150ms one + // after another. + if alertFnCalls != 0 { + t.Fatalf("alert triggered [%v] times, expected none", alertFnCalls) + } + + // Subscription failed `subscriptionFailures` times and resubscription + // function should be called `subscriptionFailures + 1` times - one time + // for each failure and one time at the end - that subscription was + // successful and had not to be retried. + expectedResubscriptionCalls := subscriptionFailures + 1 + if resubscribeFnCalls != expectedResubscriptionCalls { + t.Fatalf( + "resubscription called [%v] times, expected [%v]", + resubscribeFnCalls, + expectedResubscriptionCalls, + ) + } +} + +func TestResubscribeBelowThreshold(t *testing.T) { + backoffMax := 50 * time.Millisecond + alertThreshold := 100 * time.Millisecond + + alertFnCalls := 0 + alertFn := func() { + alertFnCalls++ + } + + subscriptionFailures := 5 + resubscribeFnCalls := 0 + resubscribeFn := func(ctx context.Context) (event.Subscription, error) { + resubscribeFnCalls++ + time.Sleep(50 * time.Millisecond) // 50 < 100, below alert threshold + if resubscribeFnCalls <= subscriptionFailures { + return nil, fmt.Errorf("i have spoken") + } + delegate := event.NewSubscription(func(unsubscribed <-chan struct{}) error { + return nil + }) + return delegate, nil + } + + subscription := WithResubscription( + backoffMax, + resubscribeFn, + alertThreshold, + alertFn, + ) + <-subscription.Err() + + fmt.Printf("resubscribe count = [%v]\n", resubscribeFnCalls) + // Alert function should be called for each subscription failure if the time + // between the previous resubscription was shorter than the threshold. + // In this test, alert threshold is set to 100ms and delays between failures + // are just 50ms. + expectedAlertFnCalls := subscriptionFailures + if alertFnCalls != expectedAlertFnCalls { + t.Fatalf( + "alert triggered [%v] times, expected [%v]", + alertFnCalls, + expectedAlertFnCalls, + ) + } + + // Subscription failed `subscriptionFailures` times and resubscription + // function should be called `subscriptionFailures + 1` times - one time + // for each failure and one time at the end - that subscription was + // successful and had not to be retried. + expectedResubscriptionCalls := subscriptionFailures + 1 + if resubscribeFnCalls != expectedResubscriptionCalls { + t.Fatalf( + "resubscription called [%v] times, expected [%v]", + resubscribeFnCalls, + expectedResubscriptionCalls, + ) + } +} From 93769e72e76fb9308d21001755b406153faa7476 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Tue, 5 Jan 2021 22:48:16 +0100 Subject: [PATCH 02/16] Accept subscriptionFailed and thresholdViolated in WithResubscription The mechanism monitors the time elapsed between resubscription attempts and if it is shorter than the specificed alertThreshold, it writes the time elapsed between resubscription attempts to the thresholdViolated channel to alarm about potential problems with the stability of the subscription. Every error returned by the wrapped subscription function, is written to subscriptionFailed channel. If the calling code is interested in reading from thresholdViolated and/or subscriptionFailed channel, appropriate readers need to be set up *before* WithResubscription is called. Writes to thresholdViolated and subscriptionFailed channels are non-blocking and are not stopping resubscription attempts. Those two channels will let the client code log warning/errors or even kill the subscription in case alrt threshold is violated too many times. --- pkg/chain/ethereum/ethutil/resubscribe.go | 46 +++- .../ethereum/ethutil/resubscribe_test.go | 216 +++++++++++++++--- 2 files changed, 215 insertions(+), 47 deletions(-) diff --git a/pkg/chain/ethereum/ethutil/resubscribe.go b/pkg/chain/ethereum/ethutil/resubscribe.go index 9f28c58..440780e 100644 --- a/pkg/chain/ethereum/ethutil/resubscribe.go +++ b/pkg/chain/ethereum/ethutil/resubscribe.go @@ -6,33 +6,57 @@ import ( "github.com/ethereum/go-ethereum/event" ) -// WithResubscription wraps the resubscription function to call it repeatedly -// to keep a subscription established. When subscription is established, it is -// monitored and in the case of a failure, resubscription is attempted by -// calling the function again. +// WithResubscription wraps the subscribe function to call it repeatedly +// to keep a subscription alive. When a subscription is established, it is +// monitored and in the case of a failure, resubscribe is attempted by +// calling the subscribe function again. // -// The mechanism applies backoff between calls to resubscribe function. +// The mechanism applies backoff between resubscription attempts. // The time between calls is adapted based on the error rate, but will never // exceed backoffMax. // // The mechanism monitors the time elapsed between resubscription attempts and -// if it is shorter than the specificed alertThreshold, it calls the alertFn -// to alarm about potential problems with the stability of the subscription. +// if it is shorter than the specificed alertThreshold, it writes the time +// elapsed between resubscription attempts to the thresholdViolated channel to +// alarm about potential problems with the stability of the subscription. +// +// Every error returned by the wrapped subscription function, is written to +// subscriptionFailed channel. +// +// If the calling code is interested in reading from thresholdViolated and/or +// subscriptionFailed channel, appropriate readers need to be set up *before* +// WithResubscription is called. +// +// Writes to thresholdViolated and subscriptionFailed channels are non-blocking +// and are not stopping resubscription attempts. func WithResubscription( backoffMax time.Duration, - resubscribeFn event.ResubscribeFunc, + resubscribeFn event.ResubscribeFunc, alertThreshold time.Duration, - alertFn func(), + thresholdViolated chan<- time.Duration, + subscriptionFailed chan<- error, ) event.Subscription { lastAttempt := time.Time{} wrappedResubscribeFn := func(ctx context.Context) (event.Subscription, error) { now := time.Now() elapsed := now.Sub(lastAttempt) if elapsed < alertThreshold { - alertFn() + select { + case thresholdViolated <- elapsed: + default: + } } + lastAttempt = now - return resubscribeFn(ctx) + + sub, err := resubscribeFn(ctx) + if err != nil { + select { + case subscriptionFailed <- err: + default: + } + } + return sub, err } return event.Resubscribe(backoffMax, wrappedResubscribeFn) diff --git a/pkg/chain/ethereum/ethutil/resubscribe_test.go b/pkg/chain/ethereum/ethutil/resubscribe_test.go index 34497e7..4400a4d 100644 --- a/pkg/chain/ethereum/ethutil/resubscribe_test.go +++ b/pkg/chain/ethereum/ethutil/resubscribe_test.go @@ -9,21 +9,66 @@ import ( "github.com/ethereum/go-ethereum/event" ) -func TestResubscribeAboveThreshold(t *testing.T) { +func TestEmitOriginalError(t *testing.T) { backoffMax := 100 * time.Millisecond alertThreshold := 100 * time.Millisecond - alertFnCalls := 0 - alertFn := func() { - alertFnCalls++ + failedOnce := false + resubscribeFn := func(ctx context.Context) (event.Subscription, error) { + if !failedOnce { + failedOnce = true + return nil, fmt.Errorf("wherever I go, he goes") + } + delegate := event.NewSubscription(func(unsubscribed <-chan struct{}) error { + return nil + }) + return delegate, nil + } + + thresholdViolated := make(chan time.Duration, 10) + subscriptionFailed := make(chan error, 10) + subscription := WithResubscription( + backoffMax, + resubscribeFn, + alertThreshold, + thresholdViolated, + subscriptionFailed, + ) + <-subscription.Err() + + // Subscription failed one time so there should be one error in the channel. + subscriptionFailCount := len(subscriptionFailed) + if subscriptionFailCount != 1 { + t.Fatalf( + "subscription failure reported [%v] times, expected [1]", + subscriptionFailCount, + ) + } + + // That failure should refer the original error. + expectedFailMessage := "wherever I go, he goes" + err := <-subscriptionFailed + if err.Error() != expectedFailMessage { + t.Fatalf( + "unexpected subscription error message\nexpected: [%v]\nactual: [%v]", + expectedFailMessage, + err.Error(), + ) } +} + +func TestResubscribeAboveThreshold(t *testing.T) { + backoffMax := 100 * time.Millisecond + alertThreshold := 100 * time.Millisecond + + plannedSubscriptionFailures := 3 + elapsedBetweenFailures := 150 * time.Millisecond - subscriptionFailures := 3 resubscribeFnCalls := 0 resubscribeFn := func(ctx context.Context) (event.Subscription, error) { resubscribeFnCalls++ - time.Sleep(150 * time.Millisecond) // 150 > 100, above alert threshold - if resubscribeFnCalls <= subscriptionFailures { + time.Sleep(elapsedBetweenFailures) // 150ms > 100ms, above alert threshold + if resubscribeFnCalls <= plannedSubscriptionFailures { return nil, fmt.Errorf("this is the way") } delegate := event.NewSubscription(func(unsubscribed <-chan struct{}) error { @@ -32,26 +77,33 @@ func TestResubscribeAboveThreshold(t *testing.T) { return delegate, nil } + thresholdViolated := make(chan time.Duration, 10) + subscriptionFailed := make(chan error, 10) subscription := WithResubscription( backoffMax, resubscribeFn, alertThreshold, - alertFn, + thresholdViolated, + subscriptionFailed, ) <-subscription.Err() - // No calls to alertFn expected. Alert threshold is set to 100ms and no - // there were no resubscription attempts in a time shorter than 150ms one - // after another. - if alertFnCalls != 0 { - t.Fatalf("alert triggered [%v] times, expected none", alertFnCalls) + // Nothing expected in thresholdViolated channel. + // Alert threshold is set to 100ms and there were no resubscription attempts + // in a time shorter than 150ms one after another. + violationCount := len(thresholdViolated) + if violationCount != 0 { + t.Fatalf( + "threshold violation reported [%v] times, expected none", + violationCount, + ) } - // Subscription failed `subscriptionFailures` times and resubscription - // function should be called `subscriptionFailures + 1` times - one time + // Subscription failed plannedSubscriptionFailures times and resubscription + // function should be called plannedSubscriptionFailures + 1 times. One time // for each failure and one time at the end - that subscription was // successful and had not to be retried. - expectedResubscriptionCalls := subscriptionFailures + 1 + expectedResubscriptionCalls := plannedSubscriptionFailures + 1 if resubscribeFnCalls != expectedResubscriptionCalls { t.Fatalf( "resubscription called [%v] times, expected [%v]", @@ -59,23 +111,30 @@ func TestResubscribeAboveThreshold(t *testing.T) { expectedResubscriptionCalls, ) } + + // Expect all subscription failures to be reported. + subscriptionFailCount := len(subscriptionFailed) + if subscriptionFailCount != plannedSubscriptionFailures { + t.Fatalf( + "subscription failure reported [%v] times, expected [%v]", + subscriptionFailCount, + plannedSubscriptionFailures, + ) + } } func TestResubscribeBelowThreshold(t *testing.T) { backoffMax := 50 * time.Millisecond alertThreshold := 100 * time.Millisecond - alertFnCalls := 0 - alertFn := func() { - alertFnCalls++ - } + plannedSubscriptionFailures := 5 + elapsedBetweenFailures := 50 * time.Millisecond - subscriptionFailures := 5 resubscribeFnCalls := 0 resubscribeFn := func(ctx context.Context) (event.Subscription, error) { resubscribeFnCalls++ - time.Sleep(50 * time.Millisecond) // 50 < 100, below alert threshold - if resubscribeFnCalls <= subscriptionFailures { + time.Sleep(elapsedBetweenFailures) // 50ms < 100ms, below alert threshold + if resubscribeFnCalls <= plannedSubscriptionFailures { return nil, fmt.Errorf("i have spoken") } delegate := event.NewSubscription(func(unsubscribed <-chan struct{}) error { @@ -84,33 +143,118 @@ func TestResubscribeBelowThreshold(t *testing.T) { return delegate, nil } + thresholdViolated := make(chan time.Duration, 10) + subscriptionFailed := make(chan error, 10) subscription := WithResubscription( backoffMax, resubscribeFn, alertThreshold, - alertFn, + thresholdViolated, + subscriptionFailed, ) <-subscription.Err() - fmt.Printf("resubscribe count = [%v]\n", resubscribeFnCalls) - // Alert function should be called for each subscription failure if the time - // between the previous resubscription was shorter than the threshold. + // Threshold violaton should be reported for each subscription failure if + // the time elapsed since the previous resubscription was shorter than the + // threshold. // In this test, alert threshold is set to 100ms and delays between failures - // are just 50ms. - expectedAlertFnCalls := subscriptionFailures - if alertFnCalls != expectedAlertFnCalls { + // are just 50ms. Thus, we expect the same number of threshold violations as + // resubscription attempts. + violationCount := len(thresholdViolated) + if violationCount != plannedSubscriptionFailures { t.Fatalf( - "alert triggered [%v] times, expected [%v]", - alertFnCalls, - expectedAlertFnCalls, + "threshold violation reported [%v] times, expected [%v]", + violationCount, + plannedSubscriptionFailures, ) } - // Subscription failed `subscriptionFailures` times and resubscription - // function should be called `subscriptionFailures + 1` times - one time + // All violations reported should have correct values - all of them should + // be longer than the time elapsed between failures and shorter than the + // alert threshold. It is not possible to assert on a precise value. + for i := 0; i < violationCount; i++ { + violation := <-thresholdViolated + if violation < elapsedBetweenFailures { + t.Fatalf( + "violation reported should be longer than the time elapsed "+ + "between failures; is: [%v] and should be longer than [%v]", + violation, + elapsedBetweenFailures, + ) + } + if violation > alertThreshold { + t.Fatalf( + "violation reported should be shorter than the alert threshold; "+ + "; is: [%v] and should be shorter than [%v]", + violation, + alertThreshold, + ) + } + } + + // Subscription failed plannedSubscriptionFailures times and resubscription + // function should be called plannedSubscriptionFailures + 1 times. One time // for each failure and one time at the end - that subscription was // successful and had not to be retried. - expectedResubscriptionCalls := subscriptionFailures + 1 + expectedResubscriptionCalls := plannedSubscriptionFailures + 1 + if resubscribeFnCalls != expectedResubscriptionCalls { + t.Fatalf( + "resubscription called [%v] times, expected [%v]", + resubscribeFnCalls, + expectedResubscriptionCalls, + ) + } + + // Expect all subscription failures to be reported. + subscriptionFailCount := len(subscriptionFailed) + if subscriptionFailCount != plannedSubscriptionFailures { + t.Fatalf( + "subscription failure reported [%v] times, expected [%v]", + subscriptionFailCount, + plannedSubscriptionFailures, + ) + } +} + +func TestDoNotBlockOnChannelWrites(t *testing.T) { + backoffMax := 50 * time.Millisecond + alertThreshold := 100 * time.Millisecond + + plannedSubscriptionFailures := 5 + elapsedBetweenFailures := 10 * time.Millisecond + + resubscribeFnCalls := 0 + resubscribeFn := func(ctx context.Context) (event.Subscription, error) { + resubscribeFnCalls++ + time.Sleep(elapsedBetweenFailures) // 10ms < 100ms, below alert threshold + if resubscribeFnCalls <= plannedSubscriptionFailures { + return nil, fmt.Errorf("Groku?") + } + delegate := event.NewSubscription(func(unsubscribed <-chan struct{}) error { + return nil + }) + return delegate, nil + } + + // Non-buffered channels with no receivers + thresholdViolated := make(chan time.Duration) + subscriptionFailed := make(chan error) + + subscription := WithResubscription( + backoffMax, + resubscribeFn, + alertThreshold, + thresholdViolated, + subscriptionFailed, + ) + <-subscription.Err() + + // Subscription failed plannedSubscriptionFailures times and resubscription + // function should be called plannedSubscriptionFailures + 1 times. One time + // for each failure and one time at the end - that subscription was + // successful and had not to be retried. No resubscription attempt should be + // blocked by the lack of channel receivers on non-buffered channels. + expectedResubscriptionCalls := plannedSubscriptionFailures + 1 if resubscribeFnCalls != expectedResubscriptionCalls { t.Fatalf( "resubscription called [%v] times, expected [%v]", From e1a10bc29bb8a63d493d05d4e07a36452c4a3bde Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Thu, 7 Jan 2021 12:43:41 +0100 Subject: [PATCH 03/16] Updated contract bindings to use ethutil.WithResubscription So far, all the resubscription logic was placed in the generated contract bindings. The new ethutil.WithResubscription function takes care of resubscription, having backoff and proper alerting implemented. --- tools/generators/ethereum/contract.go.tmpl | 15 ++ .../ethereum/contract_events.go.tmpl | 137 +++++------------- .../contract_events_template_content.go | 137 +++++------------- .../ethereum/contract_template_content.go | 15 ++ 4 files changed, 110 insertions(+), 194 deletions(-) diff --git a/tools/generators/ethereum/contract.go.tmpl b/tools/generators/ethereum/contract.go.tmpl index ab459c4..effddeb 100644 --- a/tools/generators/ethereum/contract.go.tmpl +++ b/tools/generators/ethereum/contract.go.tmpl @@ -25,6 +25,21 @@ import ( // included or excluded from logging at startup by name. var {{.ShortVar}}Logger = log.Logger("keep-contract-{{.Class}}") +const ( + // Maximum backoff time between event resubscription attempts. + {{.ShortVar}}SubscriptionBackoffMax = 2 * time.Minute + + // Threshold below which event resubscription emits an error to the logs. + // WS connection can be dropped at any moment and event resubscription will + // follow. However, if WS connection for event subscription is getting + // dropped too often, it may indicate something is wrong with Ethereum + // client. This constant defines the minimum lifetime of an event + // subscription required before the subscription failure happens and + // resubscription follows so that the resubscription does not emit an error + // to the logs alerting about potential problems with Ethereum client. + {{.ShortVar}}SubscriptionAlertThreshold = 5 * time.Minute +) + type {{.Class}} struct { contract *abi.{{.AbiClass}} contractAddress common.Address diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index babdecc..f5445e3 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -37,119 +37,62 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Past{{$event.CapsName}}Events func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( success {{$contract.FullVar}}{{$event.CapsName}}Func, - fail func(err error) error, {{$event.IndexedFilterDeclarations -}} -) (subscription.EventSubscription, error) { - errorChan := make(chan error) - unsubscribeChan := make(chan struct{}) +) (subscription.EventSubscription) { + eventOccurred := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) + thresholdViolated := make(chan time.Duration) + subscriptionFailed := make(chan error) - // Delay which must be preserved before a new resubscription attempt. - // There is no sense to resubscribe immediately after the fail of current - // subscription because the publisher must have some time to recover. - retryDelay := 5 * time.Second + ctx, cancel := context.WithCancel(context.Background()) - watch := func() { - failCallback := func(err error) error { - fail(err) - errorChan <- err // trigger resubscription signal - return err - } - - subscription, err := {{$contract.ShortVar}}.subscribe{{$event.CapsName}}( - success, - failCallback, - {{$event.IndexedFilters}} - ) - if err != nil { - errorChan <- err // trigger resubscription signal - return - } - - // wait for unsubscription signal - <-unsubscribeChan - subscription.Unsubscribe() - } - - // trigger the resubscriber goroutine - go func() { - go watch() // trigger first subscription - - for { - select { - case <-errorChan: - {{$logger}}.Warning( - "subscription to event {{$event.CapsName}} terminated with error; " + - "resubscription attempt will be performed after the retry delay", - ) - time.Sleep(retryDelay) - go watch() - case <-unsubscribeChan: - // shutdown the resubscriber goroutine on unsubscribe signal - return - } - } - }() - - // closing the unsubscribeChan will trigger a unsubscribe signal and - // run unsubscription for all subscription instances - unsubscribeCallback := func() { - close(unsubscribeChan) - } - - return subscription.NewEventSubscription(unsubscribeCallback), nil -} - -func ({{$contract.ShortVar}} *{{$contract.Class}}) subscribe{{$event.CapsName}}( - success {{$contract.FullVar}}{{$event.CapsName}}Func, - fail func(err error) error, - {{$event.IndexedFilterDeclarations -}} -) (subscription.EventSubscription, error) { - eventChan := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) - eventSubscription, err := {{$contract.ShortVar}}.contract.Watch{{$event.CapsName}}( - nil, - eventChan, - {{$event.IndexedFilters}} - ) - if err != nil { - close(eventChan) - return eventSubscription, fmt.Errorf( - "error creating watch for {{$event.CapsName}} events: [%v]", - err, - ) - } - - var subscriptionMutex = &sync.Mutex{} go func() { for { select { - case event, subscribed := <-eventChan: - subscriptionMutex.Lock() - // if eventChan has been closed, it means we have unsubscribed - if !subscribed { - subscriptionMutex.Unlock() - return - } + case <-ctx.Done(): + return + case event := <-eventOccurred: success( {{$event.ParamExtractors}} ) - subscriptionMutex.Unlock() - case ee := <-eventSubscription.Err(): - fail(ee) - return + case violation := <-thresholdViolated: + {{$logger}}.Errorf( + "subscription to event {{$event.CapsName}} had to be "+ + "retried [%v] since the last attempt; please inspect "+ + "Ethereum client connectivity", + violation, + ) + case err := <-subscriptionFailed: + {{$logger}}.Warningf( + "subscription to event {{$event.CapsName}} failed "+ + "with error: [%v]; resubscription attempt will be "+ + "performed", + err, + ) } } }() - unsubscribeCallback := func() { - subscriptionMutex.Lock() - defer subscriptionMutex.Unlock() - - eventSubscription.Unsubscribe() - close(eventChan) + subscribeFn := func(ctx context.Context) (event.Subscription, error) { + return {{$contract.ShortVar}}.contract.Watch{{$event.CapsName}}( + &bind.WatchOpts{Context: ctx}, + eventOccurred, + {{$event.IndexedFilters}} + ) } - return subscription.NewEventSubscription(unsubscribeCallback), nil + sub := ethutil.WithResubscription( + {{$contract.ShortVar}}SubscriptionBackoffMax, + subscribeFn, + {{$contract.ShortVar}}SubscriptionAlertThreshold, + thresholdViolated, + subscriptionFailed, + ) + + return subscription.NewEventSubscription(func() { + sub.Unsubscribe() + cancel() + }) } {{- end -}} \ No newline at end of file diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index 76aa852..7c1c0a9 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -40,119 +40,62 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Past{{$event.CapsName}}Events func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( success {{$contract.FullVar}}{{$event.CapsName}}Func, - fail func(err error) error, {{$event.IndexedFilterDeclarations -}} -) (subscription.EventSubscription, error) { - errorChan := make(chan error) - unsubscribeChan := make(chan struct{}) +) (subscription.EventSubscription) { + eventOccurred := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) + thresholdViolated := make(chan time.Duration) + subscriptionFailed := make(chan error) - // Delay which must be preserved before a new resubscription attempt. - // There is no sense to resubscribe immediately after the fail of current - // subscription because the publisher must have some time to recover. - retryDelay := 5 * time.Second + ctx, cancel := context.WithCancel(context.Background()) - watch := func() { - failCallback := func(err error) error { - fail(err) - errorChan <- err // trigger resubscription signal - return err - } - - subscription, err := {{$contract.ShortVar}}.subscribe{{$event.CapsName}}( - success, - failCallback, - {{$event.IndexedFilters}} - ) - if err != nil { - errorChan <- err // trigger resubscription signal - return - } - - // wait for unsubscription signal - <-unsubscribeChan - subscription.Unsubscribe() - } - - // trigger the resubscriber goroutine - go func() { - go watch() // trigger first subscription - - for { - select { - case <-errorChan: - {{$logger}}.Warning( - "subscription to event {{$event.CapsName}} terminated with error; " + - "resubscription attempt will be performed after the retry delay", - ) - time.Sleep(retryDelay) - go watch() - case <-unsubscribeChan: - // shutdown the resubscriber goroutine on unsubscribe signal - return - } - } - }() - - // closing the unsubscribeChan will trigger a unsubscribe signal and - // run unsubscription for all subscription instances - unsubscribeCallback := func() { - close(unsubscribeChan) - } - - return subscription.NewEventSubscription(unsubscribeCallback), nil -} - -func ({{$contract.ShortVar}} *{{$contract.Class}}) subscribe{{$event.CapsName}}( - success {{$contract.FullVar}}{{$event.CapsName}}Func, - fail func(err error) error, - {{$event.IndexedFilterDeclarations -}} -) (subscription.EventSubscription, error) { - eventChan := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) - eventSubscription, err := {{$contract.ShortVar}}.contract.Watch{{$event.CapsName}}( - nil, - eventChan, - {{$event.IndexedFilters}} - ) - if err != nil { - close(eventChan) - return eventSubscription, fmt.Errorf( - "error creating watch for {{$event.CapsName}} events: [%v]", - err, - ) - } - - var subscriptionMutex = &sync.Mutex{} go func() { for { select { - case event, subscribed := <-eventChan: - subscriptionMutex.Lock() - // if eventChan has been closed, it means we have unsubscribed - if !subscribed { - subscriptionMutex.Unlock() - return - } + case <-ctx.Done(): + return + case event := <-eventOccurred: success( {{$event.ParamExtractors}} ) - subscriptionMutex.Unlock() - case ee := <-eventSubscription.Err(): - fail(ee) - return + case violation := <-thresholdViolated: + {{$logger}}.Errorf( + "subscription to event {{$event.CapsName}} had to be "+ + "retried [%v] since the last attempt; please inspect "+ + "Ethereum client connectivity", + violation, + ) + case err := <-subscriptionFailed: + {{$logger}}.Warningf( + "subscription to event {{$event.CapsName}} failed "+ + "with error: [%v]; resubscription attempt will be "+ + "performed", + err, + ) } } }() - unsubscribeCallback := func() { - subscriptionMutex.Lock() - defer subscriptionMutex.Unlock() - - eventSubscription.Unsubscribe() - close(eventChan) + subscribeFn := func(ctx context.Context) (event.Subscription, error) { + return {{$contract.ShortVar}}.contract.Watch{{$event.CapsName}}( + &bind.WatchOpts{Context: ctx}, + eventOccurred, + {{$event.IndexedFilters}} + ) } - return subscription.NewEventSubscription(unsubscribeCallback), nil + sub := ethutil.WithResubscription( + {{$contract.ShortVar}}SubscriptionBackoffMax, + subscribeFn, + {{$contract.ShortVar}}SubscriptionAlertThreshold, + thresholdViolated, + subscriptionFailed, + ) + + return subscription.NewEventSubscription(func() { + sub.Unsubscribe() + cancel() + }) } {{- end -}}` diff --git a/tools/generators/ethereum/contract_template_content.go b/tools/generators/ethereum/contract_template_content.go index e29e0b8..82c9b5e 100644 --- a/tools/generators/ethereum/contract_template_content.go +++ b/tools/generators/ethereum/contract_template_content.go @@ -28,6 +28,21 @@ import ( // included or excluded from logging at startup by name. var {{.ShortVar}}Logger = log.Logger("keep-contract-{{.Class}}") +const ( + // Maximum backoff time between event resubscription attempts. + {{.ShortVar}}SubscriptionBackoffMax = 2 * time.Minute + + // Threshold below which event resubscription emits an error to the logs. + // WS connection can be dropped at any moment and event resubscription will + // follow. However, if WS connection for event subscription is getting + // dropped too often, it may indicate something is wrong with Ethereum + // client. This constant defines the minimum lifetime of an event + // subscription required before the subscription failure happens and + // resubscription follows so that the resubscription does not emit an error + // to the logs alerting about potential problems with Ethereum client. + {{.ShortVar}}SubscriptionAlertThreshold = 5 * time.Minute +) + type {{.Class}} struct { contract *abi.{{.AbiClass}} contractAddress common.Address From e718dc3917e094cf48eca16ab30bb0aa0b56b609 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Thu, 7 Jan 2021 15:03:10 +0100 Subject: [PATCH 04/16] Log subscription failed message on error level This message is emitted when a subscription failed immediatelly afters it's been requested. It is not the case when some long-living connection has been dropped by a load balancer. In this case, it makes sense to log the problem on error level. --- .../ethereum/contract_events_template_content.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index 7c1c0a9..8318ecf 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -66,10 +66,9 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( violation, ) case err := <-subscriptionFailed: - {{$logger}}.Warningf( - "subscription to event {{$event.CapsName}} failed "+ - "with error: [%v]; resubscription attempt will be "+ - "performed", + {{$logger}}.Errorf( + "subscription to event {{$event.CapsName}} failed: [%v]; "+ + "resubscription attempt will be performed", err, ) } From 2d19dfff38cdf6e5af640023cf9dad34a7d7e312 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Thu, 7 Jan 2021 15:07:01 +0100 Subject: [PATCH 05/16] Increased subscription alert threshold to 15 minutes This value is still quite experimental but 15 minutes is a stronger requirement than 5 minutes and if the client can't keep WS connection alive for at least 15 minutes it is better to inspect it. --- tools/generators/ethereum/contract.go.tmpl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/generators/ethereum/contract.go.tmpl b/tools/generators/ethereum/contract.go.tmpl index effddeb..102bc78 100644 --- a/tools/generators/ethereum/contract.go.tmpl +++ b/tools/generators/ethereum/contract.go.tmpl @@ -37,7 +37,7 @@ const ( // subscription required before the subscription failure happens and // resubscription follows so that the resubscription does not emit an error // to the logs alerting about potential problems with Ethereum client. - {{.ShortVar}}SubscriptionAlertThreshold = 5 * time.Minute + {{.ShortVar}}SubscriptionAlertThreshold = 15 * time.Minute ) type {{.Class}} struct { From 992b2f9c5567052ffd3d8be97c40d37f5a6616ca Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Tue, 12 Jan 2021 13:33:53 +0100 Subject: [PATCH 06/16] Expect threshold violated and subscription error function callbacks Instead of accepting channels for altert threshold violation and resubscribe function failure notifications, WithResubscription function accepts two callback functions now. Channels were used initially because the generated Watch* code had a for-select section routing events from a channel to handler passed as a Watch* function parameter. This behavior will be eliminated in the next commit and Watch* function will accept a channel as a parameter and will not be doing any forwarding from channel to handler. Given that, two function handlers accepted by WithResubscription will be much more convenient. --- pkg/chain/ethereum/ethutil/resubscribe.go | 38 ++++++++----------- .../ethereum/ethutil/resubscribe_test.go | 33 +++++++++++----- 2 files changed, 39 insertions(+), 32 deletions(-) diff --git a/pkg/chain/ethereum/ethutil/resubscribe.go b/pkg/chain/ethereum/ethutil/resubscribe.go index 440780e..b2af62e 100644 --- a/pkg/chain/ethereum/ethutil/resubscribe.go +++ b/pkg/chain/ethereum/ethutil/resubscribe.go @@ -3,6 +3,7 @@ package ethutil import ( "context" "time" + "github.com/ethereum/go-ethereum/event" ) @@ -16,45 +17,36 @@ import ( // exceed backoffMax. // // The mechanism monitors the time elapsed between resubscription attempts and -// if it is shorter than the specificed alertThreshold, it writes the time -// elapsed between resubscription attempts to the thresholdViolated channel to -// alarm about potential problems with the stability of the subscription. -// -// Every error returned by the wrapped subscription function, is written to -// subscriptionFailed channel. +// if it is shorter than the specificed alertThreshold, it calls +// thresholdViolatedFn passing the time elapsed between resubscription attempts. +// This function alarms about potential problems with the stability of the +// subscription. // -// If the calling code is interested in reading from thresholdViolated and/or -// subscriptionFailed channel, appropriate readers need to be set up *before* -// WithResubscription is called. +// In case of an error returned by the wrapped subscription function, +// subscriptionFailedFn is called with the underlying error. // -// Writes to thresholdViolated and subscriptionFailed channels are non-blocking -// and are not stopping resubscription attempts. +// thresholdViolatedFn and subscriptionFailedFn calls are executed in a separate +// goroutine and thus are non-blocking. func WithResubscription( backoffMax time.Duration, - resubscribeFn event.ResubscribeFunc, + resubscribeFn event.ResubscribeFunc, alertThreshold time.Duration, - thresholdViolated chan<- time.Duration, - subscriptionFailed chan<- error, + thresholdViolatedFn func(time.Duration), + subscriptionFailedFn func(error), ) event.Subscription { lastAttempt := time.Time{} wrappedResubscribeFn := func(ctx context.Context) (event.Subscription, error) { now := time.Now() elapsed := now.Sub(lastAttempt) if elapsed < alertThreshold { - select { - case thresholdViolated <- elapsed: - default: - } + go thresholdViolatedFn(elapsed) } - + lastAttempt = now sub, err := resubscribeFn(ctx) if err != nil { - select { - case subscriptionFailed <- err: - default: - } + go subscriptionFailedFn(err) } return sub, err } diff --git a/pkg/chain/ethereum/ethutil/resubscribe_test.go b/pkg/chain/ethereum/ethutil/resubscribe_test.go index 4400a4d..6db07a2 100644 --- a/pkg/chain/ethereum/ethutil/resubscribe_test.go +++ b/pkg/chain/ethereum/ethutil/resubscribe_test.go @@ -31,8 +31,8 @@ func TestEmitOriginalError(t *testing.T) { backoffMax, resubscribeFn, alertThreshold, - thresholdViolated, - subscriptionFailed, + func(elapsed time.Duration) { thresholdViolated <- elapsed }, + func(err error) { subscriptionFailed <- err }, ) <-subscription.Err() @@ -83,8 +83,8 @@ func TestResubscribeAboveThreshold(t *testing.T) { backoffMax, resubscribeFn, alertThreshold, - thresholdViolated, - subscriptionFailed, + func(elapsed time.Duration) { thresholdViolated <- elapsed }, + func(err error) { subscriptionFailed <- err }, ) <-subscription.Err() @@ -149,8 +149,8 @@ func TestResubscribeBelowThreshold(t *testing.T) { backoffMax, resubscribeFn, alertThreshold, - thresholdViolated, - subscriptionFailed, + func(elapsed time.Duration) { thresholdViolated <- elapsed }, + func(err error) { subscriptionFailed <- err }, ) <-subscription.Err() @@ -236,16 +236,31 @@ func TestDoNotBlockOnChannelWrites(t *testing.T) { return delegate, nil } - // Non-buffered channels with no receivers + // Non-buffered channels with no receivers, will block on write thresholdViolated := make(chan time.Duration) subscriptionFailed := make(chan error) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + subscription := WithResubscription( backoffMax, resubscribeFn, alertThreshold, - thresholdViolated, - subscriptionFailed, + func(elapsed time.Duration) { + select { + case thresholdViolated <- elapsed: + case <-ctx.Done(): + return + } + }, + func(err error) { + select { + case subscriptionFailed <- err: + case <-ctx.Done(): + return + } + }, ) <-subscription.Err() From a285b995930cd01dc0803b3b470da077ba6d6ed9 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Tue, 12 Jan 2021 13:58:27 +0100 Subject: [PATCH 07/16] Updated contract bindings to pass callbacks to WithResubscription The previous version of WithResubscription accepted channels. The new version accepting function callbacks will be easier to use in contract bindings once the goroutine passing events from channel to callback will be eliminated from there. And that's the plan in one of the next commits. --- .../ethereum/contract_events.go.tmpl | 37 +++++++++---------- .../contract_events_template_content.go | 36 +++++++++--------- .../ethereum/contract_template_content.go | 2 +- 3 files changed, 37 insertions(+), 38 deletions(-) diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index f5445e3..17f9f10 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -40,12 +40,11 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( {{$event.IndexedFilterDeclarations -}} ) (subscription.EventSubscription) { eventOccurred := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) - thresholdViolated := make(chan time.Duration) - subscriptionFailed := make(chan error) ctx, cancel := context.WithCancel(context.Background()) - + // TODO: Watch* function will soon accept channel as a parameter instead + // of the callback. This loop will be eliminated then. go func() { for { select { @@ -55,20 +54,6 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( success( {{$event.ParamExtractors}} ) - case violation := <-thresholdViolated: - {{$logger}}.Errorf( - "subscription to event {{$event.CapsName}} had to be "+ - "retried [%v] since the last attempt; please inspect "+ - "Ethereum client connectivity", - violation, - ) - case err := <-subscriptionFailed: - {{$logger}}.Warningf( - "subscription to event {{$event.CapsName}} failed "+ - "with error: [%v]; resubscription attempt will be "+ - "performed", - err, - ) } } }() @@ -85,8 +70,22 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( {{$contract.ShortVar}}SubscriptionBackoffMax, subscribeFn, {{$contract.ShortVar}}SubscriptionAlertThreshold, - thresholdViolated, - subscriptionFailed, + func(elapsed time.Duration) { + {{$logger}}.Errorf( + "subscription to event {{$event.CapsName}} had to be "+ + "retried [%v] since the last attempt; please inspect "+ + "Ethereum client connectivity", + elapsed, + ) + }, + func(err error) { + {{$logger}}.Errorf( + "subscription to event {{$event.CapsName}} failed "+ + "with error: [%v]; resubscription attempt will be "+ + "performed", + err, + ) + }, ) return subscription.NewEventSubscription(func() { diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index 8318ecf..753e362 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -43,12 +43,11 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( {{$event.IndexedFilterDeclarations -}} ) (subscription.EventSubscription) { eventOccurred := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) - thresholdViolated := make(chan time.Duration) - subscriptionFailed := make(chan error) ctx, cancel := context.WithCancel(context.Background()) - + // TODO: Watch* function will soon accept channel as a parameter instead + // of the callback. This loop will be eliminated then. go func() { for { select { @@ -58,19 +57,6 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( success( {{$event.ParamExtractors}} ) - case violation := <-thresholdViolated: - {{$logger}}.Errorf( - "subscription to event {{$event.CapsName}} had to be "+ - "retried [%v] since the last attempt; please inspect "+ - "Ethereum client connectivity", - violation, - ) - case err := <-subscriptionFailed: - {{$logger}}.Errorf( - "subscription to event {{$event.CapsName}} failed: [%v]; "+ - "resubscription attempt will be performed", - err, - ) } } }() @@ -87,8 +73,22 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( {{$contract.ShortVar}}SubscriptionBackoffMax, subscribeFn, {{$contract.ShortVar}}SubscriptionAlertThreshold, - thresholdViolated, - subscriptionFailed, + func(elapsed time.Duration) { + {{$logger}}.Errorf( + "subscription to event {{$event.CapsName}} had to be "+ + "retried [%v] since the last attempt; please inspect "+ + "Ethereum client connectivity", + elapsed, + ) + }, + func(err error) { + {{$logger}}.Errorf( + "subscription to event {{$event.CapsName}} failed "+ + "with error: [%v]; resubscription attempt will be "+ + "performed", + err, + ) + }, ) return subscription.NewEventSubscription(func() { diff --git a/tools/generators/ethereum/contract_template_content.go b/tools/generators/ethereum/contract_template_content.go index 82c9b5e..e2085fb 100644 --- a/tools/generators/ethereum/contract_template_content.go +++ b/tools/generators/ethereum/contract_template_content.go @@ -40,7 +40,7 @@ const ( // subscription required before the subscription failure happens and // resubscription follows so that the resubscription does not emit an error // to the logs alerting about potential problems with Ethereum client. - {{.ShortVar}}SubscriptionAlertThreshold = 5 * time.Minute + {{.ShortVar}}SubscriptionAlertThreshold = 15 * time.Minute ) type {{.Class}} struct { From eb5ee3d0578ab0adde55c7e6d732c8d7f40f5e83 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Wed, 27 Jan 2021 10:38:53 +0100 Subject: [PATCH 08/16] Define thresholdViolated/subscriptionFailed functions separately Define thresholdViolated and subscriptionFailed functions in contract bindings separately for a better clarity of the code. --- .../ethereum/contract_events.go.tmpl | 36 ++++++++++--------- .../contract_events_template_content.go | 36 ++++++++++--------- 2 files changed, 40 insertions(+), 32 deletions(-) diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index 17f9f10..d1fbc4d 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -66,26 +66,30 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( ) } + thresholdViolatedFn := func(elapsed time.Duration) { + {{$logger}}.Errorf( + "subscription to event {{$event.CapsName}} had to be "+ + "retried [%v] since the last attempt; please inspect "+ + "Ethereum client connectivity", + elapsed, + ) + } + + subscriptionFailedFn := func(err error) { + {{$logger}}.Errorf( + "subscription to event {{$event.CapsName}} failed "+ + "with error: [%v]; resubscription attempt will be "+ + "performed", + err, + ) + } + sub := ethutil.WithResubscription( {{$contract.ShortVar}}SubscriptionBackoffMax, subscribeFn, {{$contract.ShortVar}}SubscriptionAlertThreshold, - func(elapsed time.Duration) { - {{$logger}}.Errorf( - "subscription to event {{$event.CapsName}} had to be "+ - "retried [%v] since the last attempt; please inspect "+ - "Ethereum client connectivity", - elapsed, - ) - }, - func(err error) { - {{$logger}}.Errorf( - "subscription to event {{$event.CapsName}} failed "+ - "with error: [%v]; resubscription attempt will be "+ - "performed", - err, - ) - }, + thresholdViolatedFn, + subscriptionFailedFn, ) return subscription.NewEventSubscription(func() { diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index 753e362..ab4d5ee 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -69,26 +69,30 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( ) } + thresholdViolatedFn := func(elapsed time.Duration) { + {{$logger}}.Errorf( + "subscription to event {{$event.CapsName}} had to be "+ + "retried [%v] since the last attempt; please inspect "+ + "Ethereum client connectivity", + elapsed, + ) + } + + subscriptionFailedFn := func(err error) { + {{$logger}}.Errorf( + "subscription to event {{$event.CapsName}} failed "+ + "with error: [%v]; resubscription attempt will be "+ + "performed", + err, + ) + } + sub := ethutil.WithResubscription( {{$contract.ShortVar}}SubscriptionBackoffMax, subscribeFn, {{$contract.ShortVar}}SubscriptionAlertThreshold, - func(elapsed time.Duration) { - {{$logger}}.Errorf( - "subscription to event {{$event.CapsName}} had to be "+ - "retried [%v] since the last attempt; please inspect "+ - "Ethereum client connectivity", - elapsed, - ) - }, - func(err error) { - {{$logger}}.Errorf( - "subscription to event {{$event.CapsName}} failed "+ - "with error: [%v]; resubscription attempt will be "+ - "performed", - err, - ) - }, + thresholdViolatedFn, + subscriptionFailedFn, ) return subscription.NewEventSubscription(func() { From 44412b59cf176977a2f301cf75ed5d57380846ad Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Wed, 27 Jan 2021 11:02:14 +0100 Subject: [PATCH 09/16] Explained why channels in test use 10-elements buffer --- pkg/chain/ethereum/ethutil/resubscribe_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/chain/ethereum/ethutil/resubscribe_test.go b/pkg/chain/ethereum/ethutil/resubscribe_test.go index 6db07a2..2f0836b 100644 --- a/pkg/chain/ethereum/ethutil/resubscribe_test.go +++ b/pkg/chain/ethereum/ethutil/resubscribe_test.go @@ -25,6 +25,9 @@ func TestEmitOriginalError(t *testing.T) { return delegate, nil } + // Using buffered channels to do not block writes. + // There should never be a need to write more to those channels if the code + // under the test works as expected. thresholdViolated := make(chan time.Duration, 10) subscriptionFailed := make(chan error, 10) subscription := WithResubscription( @@ -77,6 +80,9 @@ func TestResubscribeAboveThreshold(t *testing.T) { return delegate, nil } + // Using buffered channels to do not block writes. + // There should never be a need to write more to those channels if the code + // under the test works as expected. thresholdViolated := make(chan time.Duration, 10) subscriptionFailed := make(chan error, 10) subscription := WithResubscription( @@ -143,6 +149,9 @@ func TestResubscribeBelowThreshold(t *testing.T) { return delegate, nil } + // Using buffered channels to do not block writes. + // There should never be a need to write more to those channels if the code + // under the test works as expected. thresholdViolated := make(chan time.Duration, 10) subscriptionFailed := make(chan error, 10) subscription := WithResubscription( From e81b7de053718e5b4d6c67275f7528529af92854 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Wed, 27 Jan 2021 11:10:55 +0100 Subject: [PATCH 10/16] resubscribeFn renamed to subscribeFn This function was called resubscribeFn just because of event.ResubscribeFn type. However, this function is called for the first-ever subscription as well and we do have subscriptionFailedFn as another parameter, named this way and not resubscriptionFailedFn for exactly the same reason - it can be called for the first-ever subscription. Moreover, each resubscription is in fact another subscription attempt. --- pkg/chain/ethereum/ethutil/resubscribe.go | 4 ++-- pkg/chain/ethereum/ethutil/resubscribe_test.go | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/chain/ethereum/ethutil/resubscribe.go b/pkg/chain/ethereum/ethutil/resubscribe.go index b2af62e..98a1f62 100644 --- a/pkg/chain/ethereum/ethutil/resubscribe.go +++ b/pkg/chain/ethereum/ethutil/resubscribe.go @@ -29,7 +29,7 @@ import ( // goroutine and thus are non-blocking. func WithResubscription( backoffMax time.Duration, - resubscribeFn event.ResubscribeFunc, + subscribeFn event.ResubscribeFunc, alertThreshold time.Duration, thresholdViolatedFn func(time.Duration), subscriptionFailedFn func(error), @@ -44,7 +44,7 @@ func WithResubscription( lastAttempt = now - sub, err := resubscribeFn(ctx) + sub, err := subscribeFn(ctx) if err != nil { go subscriptionFailedFn(err) } diff --git a/pkg/chain/ethereum/ethutil/resubscribe_test.go b/pkg/chain/ethereum/ethutil/resubscribe_test.go index 2f0836b..95b5654 100644 --- a/pkg/chain/ethereum/ethutil/resubscribe_test.go +++ b/pkg/chain/ethereum/ethutil/resubscribe_test.go @@ -14,7 +14,7 @@ func TestEmitOriginalError(t *testing.T) { alertThreshold := 100 * time.Millisecond failedOnce := false - resubscribeFn := func(ctx context.Context) (event.Subscription, error) { + subscribeFn := func(ctx context.Context) (event.Subscription, error) { if !failedOnce { failedOnce = true return nil, fmt.Errorf("wherever I go, he goes") @@ -32,7 +32,7 @@ func TestEmitOriginalError(t *testing.T) { subscriptionFailed := make(chan error, 10) subscription := WithResubscription( backoffMax, - resubscribeFn, + subscribeFn, alertThreshold, func(elapsed time.Duration) { thresholdViolated <- elapsed }, func(err error) { subscriptionFailed <- err }, @@ -68,7 +68,7 @@ func TestResubscribeAboveThreshold(t *testing.T) { elapsedBetweenFailures := 150 * time.Millisecond resubscribeFnCalls := 0 - resubscribeFn := func(ctx context.Context) (event.Subscription, error) { + subscribeFn := func(ctx context.Context) (event.Subscription, error) { resubscribeFnCalls++ time.Sleep(elapsedBetweenFailures) // 150ms > 100ms, above alert threshold if resubscribeFnCalls <= plannedSubscriptionFailures { @@ -87,7 +87,7 @@ func TestResubscribeAboveThreshold(t *testing.T) { subscriptionFailed := make(chan error, 10) subscription := WithResubscription( backoffMax, - resubscribeFn, + subscribeFn, alertThreshold, func(elapsed time.Duration) { thresholdViolated <- elapsed }, func(err error) { subscriptionFailed <- err }, @@ -137,7 +137,7 @@ func TestResubscribeBelowThreshold(t *testing.T) { elapsedBetweenFailures := 50 * time.Millisecond resubscribeFnCalls := 0 - resubscribeFn := func(ctx context.Context) (event.Subscription, error) { + subscribeFn := func(ctx context.Context) (event.Subscription, error) { resubscribeFnCalls++ time.Sleep(elapsedBetweenFailures) // 50ms < 100ms, below alert threshold if resubscribeFnCalls <= plannedSubscriptionFailures { @@ -156,7 +156,7 @@ func TestResubscribeBelowThreshold(t *testing.T) { subscriptionFailed := make(chan error, 10) subscription := WithResubscription( backoffMax, - resubscribeFn, + subscribeFn, alertThreshold, func(elapsed time.Duration) { thresholdViolated <- elapsed }, func(err error) { subscriptionFailed <- err }, @@ -233,7 +233,7 @@ func TestDoNotBlockOnChannelWrites(t *testing.T) { elapsedBetweenFailures := 10 * time.Millisecond resubscribeFnCalls := 0 - resubscribeFn := func(ctx context.Context) (event.Subscription, error) { + subscribeFn := func(ctx context.Context) (event.Subscription, error) { resubscribeFnCalls++ time.Sleep(elapsedBetweenFailures) // 10ms < 100ms, below alert threshold if resubscribeFnCalls <= plannedSubscriptionFailures { @@ -254,7 +254,7 @@ func TestDoNotBlockOnChannelWrites(t *testing.T) { subscription := WithResubscription( backoffMax, - resubscribeFn, + subscribeFn, alertThreshold, func(elapsed time.Duration) { select { From 7a5fcf12f04fefbab6137b71a21e483050482e70 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Wed, 27 Jan 2021 11:15:22 +0100 Subject: [PATCH 11/16] Renamed cancel to cancelCtx Just to make it clear what we are cancelling. --- tools/generators/ethereum/contract_events.go.tmpl | 4 ++-- tools/generators/ethereum/contract_events_template_content.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index d1fbc4d..78b8046 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -41,7 +41,7 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( ) (subscription.EventSubscription) { eventOccurred := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancelCtx := context.WithCancel(context.Background()) // TODO: Watch* function will soon accept channel as a parameter instead // of the callback. This loop will be eliminated then. @@ -94,7 +94,7 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( return subscription.NewEventSubscription(func() { sub.Unsubscribe() - cancel() + cancelCtx() }) } diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index ab4d5ee..249da65 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -44,7 +44,7 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( ) (subscription.EventSubscription) { eventOccurred := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancelCtx := context.WithCancel(context.Background()) // TODO: Watch* function will soon accept channel as a parameter instead // of the callback. This loop will be eliminated then. @@ -97,7 +97,7 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( return subscription.NewEventSubscription(func() { sub.Unsubscribe() - cancel() + cancelCtx() }) } From eb2cc4af228eca465b33f4ad92ce6c1fb295368d Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Wed, 27 Jan 2021 11:34:25 +0100 Subject: [PATCH 12/16] Define expected error message in test in only one place --- pkg/chain/ethereum/ethutil/resubscribe_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/chain/ethereum/ethutil/resubscribe_test.go b/pkg/chain/ethereum/ethutil/resubscribe_test.go index 95b5654..b7650f2 100644 --- a/pkg/chain/ethereum/ethutil/resubscribe_test.go +++ b/pkg/chain/ethereum/ethutil/resubscribe_test.go @@ -14,10 +14,11 @@ func TestEmitOriginalError(t *testing.T) { alertThreshold := 100 * time.Millisecond failedOnce := false + expectedFailMessage := "wherever I go, he goes" subscribeFn := func(ctx context.Context) (event.Subscription, error) { if !failedOnce { failedOnce = true - return nil, fmt.Errorf("wherever I go, he goes") + return nil, fmt.Errorf(expectedFailMessage) } delegate := event.NewSubscription(func(unsubscribed <-chan struct{}) error { return nil @@ -49,7 +50,6 @@ func TestEmitOriginalError(t *testing.T) { } // That failure should refer the original error. - expectedFailMessage := "wherever I go, he goes" err := <-subscriptionFailed if err.Error() != expectedFailMessage { t.Fatalf( From 4841e64672d7215b060ca21758d9576d44f292e5 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Wed, 27 Jan 2021 11:41:15 +0100 Subject: [PATCH 13/16] User Errorf instead of Fatalf where applicable We can report all failed assertions instead of failing on the first one failed. The only exception is TestEmitOriginalError where we want to fail the test in case the first assertion fails so that we do not hang on <-subscriptionFailed next line. --- .../ethereum/ethutil/resubscribe_test.go | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/chain/ethereum/ethutil/resubscribe_test.go b/pkg/chain/ethereum/ethutil/resubscribe_test.go index b7650f2..41b7f78 100644 --- a/pkg/chain/ethereum/ethutil/resubscribe_test.go +++ b/pkg/chain/ethereum/ethutil/resubscribe_test.go @@ -52,7 +52,7 @@ func TestEmitOriginalError(t *testing.T) { // That failure should refer the original error. err := <-subscriptionFailed if err.Error() != expectedFailMessage { - t.Fatalf( + t.Errorf( "unexpected subscription error message\nexpected: [%v]\nactual: [%v]", expectedFailMessage, err.Error(), @@ -99,7 +99,7 @@ func TestResubscribeAboveThreshold(t *testing.T) { // in a time shorter than 150ms one after another. violationCount := len(thresholdViolated) if violationCount != 0 { - t.Fatalf( + t.Errorf( "threshold violation reported [%v] times, expected none", violationCount, ) @@ -111,7 +111,7 @@ func TestResubscribeAboveThreshold(t *testing.T) { // successful and had not to be retried. expectedResubscriptionCalls := plannedSubscriptionFailures + 1 if resubscribeFnCalls != expectedResubscriptionCalls { - t.Fatalf( + t.Errorf( "resubscription called [%v] times, expected [%v]", resubscribeFnCalls, expectedResubscriptionCalls, @@ -121,7 +121,7 @@ func TestResubscribeAboveThreshold(t *testing.T) { // Expect all subscription failures to be reported. subscriptionFailCount := len(subscriptionFailed) if subscriptionFailCount != plannedSubscriptionFailures { - t.Fatalf( + t.Errorf( "subscription failure reported [%v] times, expected [%v]", subscriptionFailCount, plannedSubscriptionFailures, @@ -171,7 +171,7 @@ func TestResubscribeBelowThreshold(t *testing.T) { // resubscription attempts. violationCount := len(thresholdViolated) if violationCount != plannedSubscriptionFailures { - t.Fatalf( + t.Errorf( "threshold violation reported [%v] times, expected [%v]", violationCount, plannedSubscriptionFailures, @@ -184,7 +184,7 @@ func TestResubscribeBelowThreshold(t *testing.T) { for i := 0; i < violationCount; i++ { violation := <-thresholdViolated if violation < elapsedBetweenFailures { - t.Fatalf( + t.Errorf( "violation reported should be longer than the time elapsed "+ "between failures; is: [%v] and should be longer than [%v]", violation, @@ -192,7 +192,7 @@ func TestResubscribeBelowThreshold(t *testing.T) { ) } if violation > alertThreshold { - t.Fatalf( + t.Errorf( "violation reported should be shorter than the alert threshold; "+ "; is: [%v] and should be shorter than [%v]", violation, @@ -207,7 +207,7 @@ func TestResubscribeBelowThreshold(t *testing.T) { // successful and had not to be retried. expectedResubscriptionCalls := plannedSubscriptionFailures + 1 if resubscribeFnCalls != expectedResubscriptionCalls { - t.Fatalf( + t.Errorf( "resubscription called [%v] times, expected [%v]", resubscribeFnCalls, expectedResubscriptionCalls, @@ -217,7 +217,7 @@ func TestResubscribeBelowThreshold(t *testing.T) { // Expect all subscription failures to be reported. subscriptionFailCount := len(subscriptionFailed) if subscriptionFailCount != plannedSubscriptionFailures { - t.Fatalf( + t.Errorf( "subscription failure reported [%v] times, expected [%v]", subscriptionFailCount, plannedSubscriptionFailures, @@ -280,7 +280,7 @@ func TestDoNotBlockOnChannelWrites(t *testing.T) { // blocked by the lack of channel receivers on non-buffered channels. expectedResubscriptionCalls := plannedSubscriptionFailures + 1 if resubscribeFnCalls != expectedResubscriptionCalls { - t.Fatalf( + t.Errorf( "resubscription called [%v] times, expected [%v]", resubscribeFnCalls, expectedResubscriptionCalls, From c777eab88a4a02e21b166c54e85a48055f2a74f9 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Wed, 27 Jan 2021 13:04:17 +0100 Subject: [PATCH 14/16] Simplified error message in subscription duration threshold violated fn --- tools/generators/ethereum/contract_events.go.tmpl | 2 +- tools/generators/ethereum/contract_events_template_content.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index 78b8046..1a820fd 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -70,7 +70,7 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( {{$logger}}.Errorf( "subscription to event {{$event.CapsName}} had to be "+ "retried [%v] since the last attempt; please inspect "+ - "Ethereum client connectivity", + "Ethereum connectivity", elapsed, ) } diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index 249da65..4013f0c 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -73,7 +73,7 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( {{$logger}}.Errorf( "subscription to event {{$event.CapsName}} had to be "+ "retried [%v] since the last attempt; please inspect "+ - "Ethereum client connectivity", + "Ethereum connectivity", elapsed, ) } From 64cab832ccbd7ae02b6e87da48ff65bf92fd8c1f Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Wed, 27 Jan 2021 13:08:05 +0100 Subject: [PATCH 15/16] Use %s for printing duration in thresholdViolatedFn %v works fine as well but doing it with %s for a nicer code. --- tools/generators/ethereum/contract_events.go.tmpl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index 1a820fd..c27494c 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -69,7 +69,7 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( thresholdViolatedFn := func(elapsed time.Duration) { {{$logger}}.Errorf( "subscription to event {{$event.CapsName}} had to be "+ - "retried [%v] since the last attempt; please inspect "+ + "retried [%s] since the last attempt; please inspect "+ "Ethereum connectivity", elapsed, ) From c6adefdc082b4ee80aa6da0075e0f4baa8531f70 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Wed, 27 Jan 2021 13:10:21 +0100 Subject: [PATCH 16/16] Regenerated template content for contract bindings Forgot to regenerate the template for the previous commit with a minor change about formatting string. --- tools/generators/ethereum/contract_events_template_content.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index 4013f0c..1c21c27 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -72,7 +72,7 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( thresholdViolatedFn := func(elapsed time.Duration) { {{$logger}}.Errorf( "subscription to event {{$event.CapsName}} had to be "+ - "retried [%v] since the last attempt; please inspect "+ + "retried [%s] since the last attempt; please inspect "+ "Ethereum connectivity", elapsed, )