Skip to content

Commit

Permalink
Add refresh retries on failure
Browse files: browse the repository at this point in the history
Signed-off-by: Vladimir Popov <vladimir.popov@xored.com>
  • Loading branch information
Vladimir Popov committed Sep 20, 2021
1 parent 07ced33 commit 41ae4ca
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 16 deletions.
45 changes: 31 additions & 14 deletions pkg/networkservice/common/refresh/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ import (
"github.com/pkg/errors"
"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
"github.com/networkservicemesh/sdk/pkg/tools/log"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

type refreshClient struct {
Expand All @@ -52,6 +51,8 @@ func NewClient(ctx context.Context) networkservice.NetworkServiceClient {
}

func (t *refreshClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
logger := log.FromContext(ctx).WithField("refreshClient", "Request")

conn, err := next.Client(ctx).Request(ctx, request, opts...)
if err != nil {
return nil, err
Expand All @@ -74,17 +75,25 @@ func (t *refreshClient) Request(ctx context.Context, request *networkservice.Net
store(ctx, metadata.IsClient(t), cancel)

eventFactory := begin.FromContext(ctx)
timeClock := clock.FromContext(ctx)
clockTime := clock.FromContext(ctx)
// Create the afterCh *outside* the go routine. This must be done to avoid picking up a later 'now'
// from mockClock in testing
afterCh := timeClock.After(refreshAfter)
go func(cancelCtx context.Context, afterCh <-chan time.Time) {
select {
case <-cancelCtx.Done():
case <-afterCh:
eventFactory.Request(begin.CancelContext(cancelCtx))
afterTicker := clockTime.Ticker(refreshAfter)
go func() {
defer afterTicker.Stop()
for {
select {
case <-cancelCtx.Done():
return
case <-afterTicker.C():
if err := <-eventFactory.Request(begin.CancelContext(cancelCtx)); err != nil {
logger.Warnf("refresh failed: %s", err.Error())
continue
}
return
}
}
}(cancelCtx, afterCh)
}()

return conn, nil
}
Expand All @@ -98,11 +107,18 @@ func (t *refreshClient) Close(ctx context.Context, conn *networkservice.Connecti

func after(ctx context.Context, conn *networkservice.Connection) (time.Duration, error) {
clockTime := clock.FromContext(ctx)

expireTime, err := ptypes.Timestamp(conn.GetCurrentPathSegment().GetExpires())
if err != nil {
return 0, errors.WithStack(err)
}
log.FromContext(ctx).Infof("expireTime %s now %s", expireTime, clockTime.Now().UTC())

timeout := clockTime.Until(expireTime)
log.FromContext(ctx).Infof("expiration after %s at %s", timeout.String(), expireTime.UTC())

if timeout <= 0 {
return 1, nil
}

// A heuristic to reduce the number of redundant requests in a chain
// made of refreshing clients with the same expiration time: let outer
Expand All @@ -113,6 +129,7 @@ func after(ctx context.Context, conn *networkservice.Connection) (time.Duration,
if len(path.PathSegments) > 1 {
scale = 0.2 + 0.2*float64(path.Index)/float64(len(path.PathSegments))
}
duration := time.Duration(float64(clockTime.Until(expireTime)) * scale)
duration := time.Duration(float64(timeout) * scale)

return duration, nil
}
4 changes: 2 additions & 2 deletions pkg/networkservice/common/refresh/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func TestRefreshClient_NoRefreshOnFailure(t *testing.T) {
require.Never(t, cloneClient.validator(2), testWait, testTick)
}

func TestRefreshClient_NoRefreshOnRefreshFailure(t *testing.T) {
func TestRefreshClient_RefreshOnRefreshFailure(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -330,5 +330,5 @@ func TestRefreshClient_NoRefreshOnRefreshFailure(t *testing.T) {

clockMock.Add(expireTimeout)

require.Never(t, cloneClient.validator(3), testWait, testTick)
require.Eventually(t, cloneClient.validator(3), testWait, testTick)
}

0 comments on commit 41ae4ca

Please sign in to comment.