Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync -- client cleanup #1680

Merged
merged 20 commits into from
Jul 7, 2023
84 changes: 54 additions & 30 deletions x/sync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sync/atomic"
"time"

Expand All @@ -23,7 +24,9 @@ import (
)

const (
failedRequestSleepInterval = 10 * time.Millisecond
initialRetryWait = 10 * time.Millisecond
maxRetryWait = time.Second
retryWaitFactor = 1.5 // Larger --> timeout grows more quickly

epsilon = 1e-6 // small amount to add to time to avoid division by 0
)
Expand Down Expand Up @@ -177,25 +180,24 @@ func (c *client) GetRangeProof(ctx context.Context, req *pb.SyncGetRangeProofReq
return getAndParse(ctx, c, reqBytes, parseFn)
}

// getAndParse uses [client] to send [request] to an arbitrary peer. If the peer responds,
// [parseFn] is called with the raw response. If [parseFn] returns an error or the request
// times out, this function will retry the request to a different peer until [ctx] expires.
// If [parseFn] returns a nil error, the result is returned from getAndParse.
func getAndParse[T any](ctx context.Context, client *client, request []byte, parseFn func(context.Context, []byte) (*T, error)) (*T, error) {
// getAndParse uses [client] to send [request] to an arbitrary peer.
// Returns the response to the request.
// [parseFn] parses the raw response.
// If the request is unsuccessful or the response can't be parsed,
// retries the request to a different peer until [ctx] expires.
func getAndParse[T any](
ctx context.Context,
client *client,
request []byte,
parseFn func(context.Context, []byte) (*T, error),
) (*T, error) {
var (
lastErr error
response *T
)
// Loop until the context is cancelled or we get a valid response.
for attempt := 0; ; attempt++ {
// If the context has finished, return the context error early.
if err := ctx.Err(); err != nil {
if lastErr != nil {
return nil, fmt.Errorf("request failed after %d attempts with last error %w and ctx error %s", attempt, lastErr, err)
}
return nil, err
}
responseBytes, nodeID, err := client.get(ctx, request)
for attempt := 1; ; attempt++ {
nodeID, responseBytes, err := client.get(ctx, request)
if err == nil {
if response, err = parseFn(ctx, responseBytes); err == nil {
return response, nil
Expand All @@ -205,44 +207,66 @@ func getAndParse[T any](ctx context.Context, client *client, request []byte, par
client.log.Debug("request failed, retrying",
zap.Stringer("nodeID", nodeID),
zap.Int("attempt", attempt),
zap.Error(err))

zap.Error(err),
)
// if [err] is being propagated from [ctx], avoid overwriting [lastErr].
if err != ctx.Err() {
// if [err] is being propagated from [ctx], avoid overwriting [lastErr].
lastErr = err
time.Sleep(failedRequestSleepInterval)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed to avoid sleeping. It's not the end of the world to wait 10 ms to wake up in the event of context cancellation, but still seems like good practice to not sleep.

}

retryWait := initialRetryWait * time.Duration(math.Pow(retryWaitFactor, float64(attempt)))
if retryWait > maxRetryWait || retryWait < 0 { // Handle overflows with negative check.
retryWait = maxRetryWait
}

select {
case <-ctx.Done():
if lastErr != nil {
// prefer reporting [lastErr] if it's not nil.
return nil, fmt.Errorf(
"request failed after %d attempts with last error %w and ctx error %s",
attempt, lastErr, ctx.Err(),
)
}
return nil, ctx.Err()
case <-time.After(retryWait):
}
}
}

// get sends [request] to an arbitrary peer and blocks until the node receives a response
// or [ctx] expires. Returns the raw response from the peer, the peer's NodeID, and an
// error if the request timed out. Thread safe.
func (c *client) get(ctx context.Context, requestBytes []byte) ([]byte, ids.NodeID, error) {
c.metrics.RequestMade()
// get sends [request] to an arbitrary peer and blocks
// until the node receives a response, failure notification
// or [ctx] is canceled.
// Returns the peer's NodeID and response.
// It's safe to call this method multiple times concurrently.
func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other places like this we return the node ID first so I did that here too

var (
response []byte
nodeID ids.NodeID
err error
startTime = time.Now()
)

c.metrics.RequestMade()

if len(c.stateSyncNodes) == 0 {
response, nodeID, err = c.networkClient.RequestAny(ctx, c.stateSyncMinVersion, requestBytes)
nodeID, response, err = c.networkClient.RequestAny(ctx, c.stateSyncMinVersion, request)
} else {
// get the next nodeID using the nodeIdx offset. If we're out of nodes, loop back to 0
// we do this every attempt to ensure we get a different node each time if possible.
// Get the next nodeID to query using the [nodeIdx] offset.
// If we're out of nodes, loop back to 0.
// We do this try to query a different node each time if possible.
nodeIdx := atomic.AddUint32(&c.stateSyncNodeIdx, 1)
nodeID = c.stateSyncNodes[nodeIdx%uint32(len(c.stateSyncNodes))]
response, err = c.networkClient.Request(ctx, nodeID, requestBytes)
response, err = c.networkClient.Request(ctx, nodeID, request)
}
if err != nil {
c.metrics.RequestFailed()
c.networkClient.TrackBandwidth(nodeID, 0)
return response, nodeID, err
return nodeID, response, err
}

bandwidth := float64(len(response)) / (time.Since(startTime).Seconds() + epsilon)
c.networkClient.TrackBandwidth(nodeID, bandwidth)
c.metrics.RequestSucceeded()
return response, nodeID, nil
return nodeID, response, nil
}
Loading