Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into pr-heal-squash
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Belov <artem.belov@xored.com>
  • Loading branch information
Artem Belov committed Mar 9, 2021
2 parents a59fd48 + 134592a commit dc384f2
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 116 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/nats-io/nats-streaming-server v0.17.0
github.com/nats-io/stan.go v0.6.0
github.com/networkservicemesh/api v0.0.0-20210218170701-1a72f1cba074
github.com/networkservicemesh/api v0.0.0-20210305165706-bcfdc8d78700
github.com/open-policy-agent/opa v0.16.1
github.com/opentracing/opentracing-go v1.1.0
github.com/pkg/errors v0.9.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nats-io/stan.go v0.6.0 h1:26IJPeykh88d8KVLT4jJCIxCyUBOC5/IQup8oWD/QYY=
github.com/nats-io/stan.go v0.6.0/go.mod h1:eIcD5bi3pqbHT/xIIvXMwvzXYElgouBvaVRftaE+eac=
github.com/networkservicemesh/api v0.0.0-20210218170701-1a72f1cba074 h1:lMU+bavS8l0vKZKtCYutUFtTaU5jzTEA7bD/s843XYU=
github.com/networkservicemesh/api v0.0.0-20210218170701-1a72f1cba074/go.mod h1:qvxdY1Zt4QTtiG+uH1XmjpegeHjlt5Jj4A8iK55iJPI=
github.com/networkservicemesh/api v0.0.0-20210305165706-bcfdc8d78700 h1:c4M5DLI0L3IMx56Gqnt6kQ4SAF0tRCu0thxH2gmTxCE=
github.com/networkservicemesh/api v0.0.0-20210305165706-bcfdc8d78700/go.mod h1:qvxdY1Zt4QTtiG+uH1XmjpegeHjlt5Jj4A8iK55iJPI=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/networkservice/common/timeout/gen.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020 Doc.ai and/or its affiliates.
// Copyright (c) 2020-2021 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -20,6 +20,6 @@ import (
"sync"
)

//go:generate go-syncmap -output timer_map.gen.go -type timerMap<string,*time.Timer>
//go:generate go-syncmap -output close_timer_map.gen.go -type closeTimerMap<string,*closeTimer>

type timerMap sync.Map
type closeTimerMap sync.Map
108 changes: 52 additions & 56 deletions pkg/networkservice/common/timeout/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"context"
"time"

"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"

Expand All @@ -36,7 +35,12 @@ import (

type timeoutServer struct {
ctx context.Context
timers timerMap
timers closeTimerMap
}

type closeTimer struct {
expirationTime time.Time
timer *time.Timer
}

// NewServer - creates a new NetworkServiceServer chain element that implements timeout of expired connections
Expand All @@ -49,84 +53,76 @@ func NewServer(ctx context.Context) networkservice.NetworkServiceServer {
}
}

func (t *timeoutServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
logger := log.FromContext(ctx).WithField("timeoutServer", "request")
func (s *timeoutServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
if err := s.validateRequest(ctx, request); err != nil {
return nil, err
}

connID := request.GetConnection().GetId()

conn, err := next.Server(ctx).Request(ctx, request)
if err != nil {
return nil, err
}
t, loaded := s.timers.Load(connID)
stopped := loaded && t.timer.Stop()

if timer, ok := t.timers.LoadAndDelete(connID); ok {
if !timer.Stop() {
// Even if we failed to stop the timer, we should execute. It does mean that the timeout action
// is waiting on `executor.AsyncExec()` until we will finish.
// Since timer is being deleted under the `executor.AsyncExec()` this can't be a situation when
// the Request is executing after the timeout Close. Such case cannot be distinguished with the
// first-request case.
logger.Warnf("connection has been timed out, re requesting: %v", connID)
}
}
expirationTime := request.GetConnection().GetPrevPathSegment().GetExpires().AsTime().Local()

timer, err := t.createTimer(ctx, conn.Clone())
conn, err := next.Server(ctx).Request(ctx, request)
if err != nil {
if _, closeErr := next.Server(ctx).Close(ctx, conn); closeErr != nil {
err = errors.Wrapf(err, "error attempting to close failed connection %v: %+v", connID, closeErr)
if stopped {
t.timer.Reset(time.Until(t.expirationTime))
}
return nil, err
}

t.timers.Store(connID, timer)
s.timers.Store(connID, s.newTimer(ctx, expirationTime, conn.Clone()))

return conn, nil
}

func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Connection) (*time.Timer, error) {
logger := log.FromContext(ctx).WithField("timeoutServer", "createTimer")

executor := serialize.GetExecutor(ctx)
if executor == nil {
return nil, errors.New("no executor provided")
func (s *timeoutServer) validateRequest(ctx context.Context, request *networkservice.NetworkServiceRequest) error {
if request.GetConnection().GetPrevPathSegment().GetExpires() == nil {
return errors.Errorf("expiration for prev path segment cannot be nil. conn: %+v", request.GetConnection())
}

if conn.GetPrevPathSegment().GetExpires() == nil {
return nil, errors.Errorf("expiration for prev path segment cannot be nil. conn: %+v", conn)
if serialize.GetExecutor(ctx) == nil {
return errors.New("no executor provided")
}
expireTime, err := ptypes.Timestamp(conn.GetPrevPathSegment().GetExpires())
if err != nil {
return nil, err
return nil
}

func (s *timeoutServer) newTimer(ctx context.Context, expirationTime time.Time, conn *networkservice.Connection) *closeTimer {
logger := log.FromContext(ctx).WithField("timeoutServer", "newTimer")

tPtr := new(*closeTimer)
*tPtr = &closeTimer{
expirationTime: expirationTime,
timer: time.AfterFunc(time.Until(expirationTime), func() {
<-serialize.GetExecutor(ctx).AsyncExec(func() {
if t, ok := s.timers.LoadAndDelete(conn.GetId()); !ok || t != *tPtr {
// this timer has been stopped
return
}

closeCtx, cancel := context.WithCancel(s.ctx)
defer cancel()

if _, err := next.Server(ctx).Close(closeCtx, conn); err != nil {
logger.Errorf("failed to close timed out connection: %s %s", conn.GetId(), err.Error())
}
})
}),
}

conn = conn.Clone()

timerPtr := new(*time.Timer)
*timerPtr = time.AfterFunc(time.Until(expireTime), func() {
<-executor.AsyncExec(func() {
if timer, _ := t.timers.Load(conn.GetId()); timer != *timerPtr {
logger.Warnf("timer has been already stopped: %v", conn.GetId())
return
}
t.timers.Delete(conn.GetId())
if _, err := next.Server(ctx).Close(t.ctx, conn); err != nil {
logger.Errorf("failed to close timed out connection: %v %+v", conn.GetId(), err)
}
})
})

return *timerPtr, nil
return *tPtr
}

func (t *timeoutServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
logger := log.FromContext(ctx).WithField("timeoutServer", "close")
func (s *timeoutServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
logger := log.FromContext(ctx).WithField("timeoutServer", "Close")

timer, ok := t.timers.LoadAndDelete(conn.GetId())
t, ok := s.timers.LoadAndDelete(conn.GetId())
if !ok {
logger.Warnf("connection has been already closed: %v", conn.GetId())
logger.Warnf("connection has been already closed: %s", conn.GetId())
return new(empty.Empty), nil
}
timer.Stop()
t.timer.Stop()

return next.Server(ctx).Close(ctx, conn)
}
Loading

0 comments on commit dc384f2

Please sign in to comment.