Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription dance: backoff, logging, and test coverage #62

Merged
merged 16 commits into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions pkg/chain/ethereum/ethutil/resubscribe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package ethutil

import (
"context"
"time"

"github.com/ethereum/go-ethereum/event"
)

// WithResubscription wraps the subscribe function to call it repeatedly
// to keep a subscription alive. When a subscription is established, it is
// monitored and in the case of a failure, resubscribe is attempted by
// calling the subscribe function again.
//
// The mechanism applies backoff between resubscription attempts.
// The time between calls is adapted based on the error rate, but will never
// exceed backoffMax.
//
// The mechanism monitors the time elapsed between resubscription attempts and
// if it is shorter than the specificed alertThreshold, it calls
// thresholdViolatedFn passing the time elapsed between resubscription attempts.
// This function alarms about potential problems with the stability of the
// subscription.
//
// In case of an error returned by the wrapped subscription function,
// subscriptionFailedFn is called with the underlying error.
//
// thresholdViolatedFn and subscriptionFailedFn calls are executed in a separate
// goroutine and thus are non-blocking.
func WithResubscription(
backoffMax time.Duration,
resubscribeFn event.ResubscribeFunc,
nkuba marked this conversation as resolved.
Show resolved Hide resolved
alertThreshold time.Duration,
thresholdViolatedFn func(time.Duration),
subscriptionFailedFn func(error),
nkuba marked this conversation as resolved.
Show resolved Hide resolved
lukasz-zimnoch marked this conversation as resolved.
Show resolved Hide resolved
) event.Subscription {
lastAttempt := time.Time{}
nkuba marked this conversation as resolved.
Show resolved Hide resolved
wrappedResubscribeFn := func(ctx context.Context) (event.Subscription, error) {
now := time.Now()
elapsed := now.Sub(lastAttempt)
if elapsed < alertThreshold {
nkuba marked this conversation as resolved.
Show resolved Hide resolved
go thresholdViolatedFn(elapsed)
}

lastAttempt = now

sub, err := resubscribeFn(ctx)
if err != nil {
go subscriptionFailedFn(err)
}
return sub, err
}

return event.Resubscribe(backoffMax, wrappedResubscribeFn)
}
280 changes: 280 additions & 0 deletions pkg/chain/ethereum/ethutil/resubscribe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
package ethutil

import (
"context"
"fmt"
"testing"
"time"

"github.com/ethereum/go-ethereum/event"
)

func TestEmitOriginalError(t *testing.T) {
backoffMax := 100 * time.Millisecond
alertThreshold := 100 * time.Millisecond

failedOnce := false
resubscribeFn := func(ctx context.Context) (event.Subscription, error) {
if !failedOnce {
failedOnce = true
return nil, fmt.Errorf("wherever I go, he goes")
}
delegate := event.NewSubscription(func(unsubscribed <-chan struct{}) error {
return nil
})
return delegate, nil
}

thresholdViolated := make(chan time.Duration, 10)
subscriptionFailed := make(chan error, 10)
lukasz-zimnoch marked this conversation as resolved.
Show resolved Hide resolved
subscription := WithResubscription(
backoffMax,
resubscribeFn,
alertThreshold,
func(elapsed time.Duration) { thresholdViolated <- elapsed },
func(err error) { subscriptionFailed <- err },
)
<-subscription.Err()
nkuba marked this conversation as resolved.
Show resolved Hide resolved

// Subscription failed one time so there should be one error in the channel.
subscriptionFailCount := len(subscriptionFailed)
if subscriptionFailCount != 1 {
t.Fatalf(
nkuba marked this conversation as resolved.
Show resolved Hide resolved
"subscription failure reported [%v] times, expected [1]",
subscriptionFailCount,
)
lukasz-zimnoch marked this conversation as resolved.
Show resolved Hide resolved
}

// That failure should refer the original error.
expectedFailMessage := "wherever I go, he goes"
nkuba marked this conversation as resolved.
Show resolved Hide resolved
err := <-subscriptionFailed
if err.Error() != expectedFailMessage {
t.Fatalf(
"unexpected subscription error message\nexpected: [%v]\nactual: [%v]",
expectedFailMessage,
err.Error(),
)
}
}

func TestResubscribeAboveThreshold(t *testing.T) {
backoffMax := 100 * time.Millisecond
alertThreshold := 100 * time.Millisecond

plannedSubscriptionFailures := 3
elapsedBetweenFailures := 150 * time.Millisecond

resubscribeFnCalls := 0
resubscribeFn := func(ctx context.Context) (event.Subscription, error) {
resubscribeFnCalls++
time.Sleep(elapsedBetweenFailures) // 150ms > 100ms, above alert threshold
if resubscribeFnCalls <= plannedSubscriptionFailures {
return nil, fmt.Errorf("this is the way")
}
delegate := event.NewSubscription(func(unsubscribed <-chan struct{}) error {
return nil
})
return delegate, nil
}

thresholdViolated := make(chan time.Duration, 10)
subscriptionFailed := make(chan error, 10)
lukasz-zimnoch marked this conversation as resolved.
Show resolved Hide resolved
subscription := WithResubscription(
backoffMax,
resubscribeFn,
alertThreshold,
func(elapsed time.Duration) { thresholdViolated <- elapsed },
func(err error) { subscriptionFailed <- err },
)
<-subscription.Err()

// Nothing expected in thresholdViolated channel.
// Alert threshold is set to 100ms and there were no resubscription attempts
// in a time shorter than 150ms one after another.
violationCount := len(thresholdViolated)
if violationCount != 0 {
t.Fatalf(
"threshold violation reported [%v] times, expected none",
violationCount,
)
lukasz-zimnoch marked this conversation as resolved.
Show resolved Hide resolved
}

// Subscription failed plannedSubscriptionFailures times and resubscription
// function should be called plannedSubscriptionFailures + 1 times. One time
// for each failure and one time at the end - that subscription was
// successful and had not to be retried.
expectedResubscriptionCalls := plannedSubscriptionFailures + 1
if resubscribeFnCalls != expectedResubscriptionCalls {
t.Fatalf(
"resubscription called [%v] times, expected [%v]",
resubscribeFnCalls,
expectedResubscriptionCalls,
)
lukasz-zimnoch marked this conversation as resolved.
Show resolved Hide resolved
}

// Expect all subscription failures to be reported.
subscriptionFailCount := len(subscriptionFailed)
if subscriptionFailCount != plannedSubscriptionFailures {
t.Fatalf(
"subscription failure reported [%v] times, expected [%v]",
subscriptionFailCount,
plannedSubscriptionFailures,
)
lukasz-zimnoch marked this conversation as resolved.
Show resolved Hide resolved
}
}

func TestResubscribeBelowThreshold(t *testing.T) {
backoffMax := 50 * time.Millisecond
alertThreshold := 100 * time.Millisecond

plannedSubscriptionFailures := 5
elapsedBetweenFailures := 50 * time.Millisecond

resubscribeFnCalls := 0
resubscribeFn := func(ctx context.Context) (event.Subscription, error) {
resubscribeFnCalls++
time.Sleep(elapsedBetweenFailures) // 50ms < 100ms, below alert threshold
if resubscribeFnCalls <= plannedSubscriptionFailures {
return nil, fmt.Errorf("i have spoken")
}
delegate := event.NewSubscription(func(unsubscribed <-chan struct{}) error {
return nil
})
return delegate, nil
}

thresholdViolated := make(chan time.Duration, 10)
subscriptionFailed := make(chan error, 10)
lukasz-zimnoch marked this conversation as resolved.
Show resolved Hide resolved
subscription := WithResubscription(
backoffMax,
resubscribeFn,
alertThreshold,
func(elapsed time.Duration) { thresholdViolated <- elapsed },
func(err error) { subscriptionFailed <- err },
)
<-subscription.Err()

// Threshold violaton should be reported for each subscription failure if
// the time elapsed since the previous resubscription was shorter than the
// threshold.
// In this test, alert threshold is set to 100ms and delays between failures
// are just 50ms. Thus, we expect the same number of threshold violations as
// resubscription attempts.
violationCount := len(thresholdViolated)
if violationCount != plannedSubscriptionFailures {
t.Fatalf(
"threshold violation reported [%v] times, expected [%v]",
violationCount,
plannedSubscriptionFailures,
)
lukasz-zimnoch marked this conversation as resolved.
Show resolved Hide resolved
}

// All violations reported should have correct values - all of them should
// be longer than the time elapsed between failures and shorter than the
// alert threshold. It is not possible to assert on a precise value.
for i := 0; i < violationCount; i++ {
violation := <-thresholdViolated
if violation < elapsedBetweenFailures {
t.Fatalf(
"violation reported should be longer than the time elapsed "+
"between failures; is: [%v] and should be longer than [%v]",
violation,
elapsedBetweenFailures,
)
lukasz-zimnoch marked this conversation as resolved.
Show resolved Hide resolved
}
if violation > alertThreshold {
t.Fatalf(
"violation reported should be shorter than the alert threshold; "+
"; is: [%v] and should be shorter than [%v]",
violation,
alertThreshold,
)
lukasz-zimnoch marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Subscription failed plannedSubscriptionFailures times and resubscription
// function should be called plannedSubscriptionFailures + 1 times. One time
// for each failure and one time at the end - that subscription was
// successful and had not to be retried.
expectedResubscriptionCalls := plannedSubscriptionFailures + 1
if resubscribeFnCalls != expectedResubscriptionCalls {
t.Fatalf(
"resubscription called [%v] times, expected [%v]",
resubscribeFnCalls,
expectedResubscriptionCalls,
)
lukasz-zimnoch marked this conversation as resolved.
Show resolved Hide resolved
}

// Expect all subscription failures to be reported.
subscriptionFailCount := len(subscriptionFailed)
if subscriptionFailCount != plannedSubscriptionFailures {
t.Fatalf(
"subscription failure reported [%v] times, expected [%v]",
subscriptionFailCount,
plannedSubscriptionFailures,
)
lukasz-zimnoch marked this conversation as resolved.
Show resolved Hide resolved
}
}

func TestDoNotBlockOnChannelWrites(t *testing.T) {
backoffMax := 50 * time.Millisecond
alertThreshold := 100 * time.Millisecond

plannedSubscriptionFailures := 5
elapsedBetweenFailures := 10 * time.Millisecond

resubscribeFnCalls := 0
resubscribeFn := func(ctx context.Context) (event.Subscription, error) {
resubscribeFnCalls++
time.Sleep(elapsedBetweenFailures) // 10ms < 100ms, below alert threshold
if resubscribeFnCalls <= plannedSubscriptionFailures {
return nil, fmt.Errorf("Groku?")
}
delegate := event.NewSubscription(func(unsubscribed <-chan struct{}) error {
return nil
})
return delegate, nil
}

// Non-buffered channels with no receivers, will block on write
thresholdViolated := make(chan time.Duration)
subscriptionFailed := make(chan error)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

subscription := WithResubscription(
backoffMax,
resubscribeFn,
alertThreshold,
func(elapsed time.Duration) {
select {
case thresholdViolated <- elapsed:
case <-ctx.Done():
return
}
},
func(err error) {
select {
case subscriptionFailed <- err:
case <-ctx.Done():
return
}
},
)
<-subscription.Err()

// Subscription failed plannedSubscriptionFailures times and resubscription
// function should be called plannedSubscriptionFailures + 1 times. One time
// for each failure and one time at the end - that subscription was
// successful and had not to be retried. No resubscription attempt should be
// blocked by the lack of channel receivers on non-buffered channels.
expectedResubscriptionCalls := plannedSubscriptionFailures + 1
if resubscribeFnCalls != expectedResubscriptionCalls {
t.Fatalf(
"resubscription called [%v] times, expected [%v]",
resubscribeFnCalls,
expectedResubscriptionCalls,
)
lukasz-zimnoch marked this conversation as resolved.
Show resolved Hide resolved
}
}
nkuba marked this conversation as resolved.
Show resolved Hide resolved
15 changes: 15 additions & 0 deletions tools/generators/ethereum/contract.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,21 @@ import (
// included or excluded from logging at startup by name.
var {{.ShortVar}}Logger = log.Logger("keep-contract-{{.Class}}")

const (
// Maximum backoff time between event resubscription attempts.
{{.ShortVar}}SubscriptionBackoffMax = 2 * time.Minute

// Threshold below which event resubscription emits an error to the logs.
// WS connection can be dropped at any moment and event resubscription will
// follow. However, if WS connection for event subscription is getting
// dropped too often, it may indicate something is wrong with Ethereum
// client. This constant defines the minimum lifetime of an event
// subscription required before the subscription failure happens and
// resubscription follows so that the resubscription does not emit an error
// to the logs alerting about potential problems with Ethereum client.
{{.ShortVar}}SubscriptionAlertThreshold = 15 * time.Minute
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 should this be per-contract?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should not be but I do not have a good place to put it at. 😬 Maybe subscribe_opts.go from ethutil introduced in the next PR #63 is a good candidate?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to merge this PR. Please let's handle this change in #63.

)

type {{.Class}} struct {
contract *abi.{{.AbiClass}}
contractAddress common.Address
Expand Down
Loading