Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix begin on refresh #1365

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/networkservice/common/begin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (b *beginClient) Request(ctx context.Context, request *networkservice.Netwo
conn, err = b.Request(ctx, request, opts...)
return
}
eventFactoryClient.updateContext(ctx)

ctx = withEventFactory(ctx, eventFactoryClient)
request.Connection = mergeConnection(eventFactoryClient.returnedConnection, request.GetConnection(), eventFactoryClient.request.GetConnection())
Expand Down
28 changes: 18 additions & 10 deletions pkg/networkservice/common/begin/event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,7 @@ func newEventFactoryClient(ctx context.Context, afterClose func(), opts ...grpc.
client: next.Client(ctx),
opts: opts,
}
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
f.updateContext(ctx)

f.afterCloseFunc = func() {
f.state = closed
Expand All @@ -75,6 +71,14 @@ func newEventFactoryClient(ctx context.Context, afterClose func(), opts ...grpc.
return f
}

func (f *eventFactoryClient) updateContext(ctx context.Context) {
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
}

func (f *eventFactoryClient) Request(opts ...Option) <-chan error {
o := &option{
cancelCtx: context.Background(),
Expand Down Expand Up @@ -155,11 +159,7 @@ func newEventFactoryServer(ctx context.Context, afterClose func()) *eventFactory
f := &eventFactoryServer{
server: next.Server(ctx),
}
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
f.updateContext(ctx)

f.afterCloseFunc = func() {
f.state = closed
Expand All @@ -168,6 +168,14 @@ func newEventFactoryServer(ctx context.Context, afterClose func()) *eventFactory
return f
}

func (f *eventFactoryServer) updateContext(ctx context.Context) {
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
}

func (f *eventFactoryServer) Request(opts ...Option) <-chan error {
o := &option{
cancelCtx: context.Background(),
Expand Down
42 changes: 42 additions & 0 deletions pkg/networkservice/common/begin/event_factory_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,48 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

// This test reproduces the situation when refresh changes the eventFactory context
// nolint:dupl
func TestRefresh_Client(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

syncChan := make(chan struct{})
checkCtxCl := &checkContextClient{t: t}
eventFactoryCl := &eventFactoryClient{ch: syncChan}
client := chain.NewNetworkServiceClient(
begin.NewClient(),
checkCtxCl,
eventFactoryCl,
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Set any value to context
ctx = context.WithValue(ctx, contextKey{}, "value_1")
checkCtxCl.setExpectedValue("value_1")

// Do Request with this context
request := testRequest("1")
conn, err := client.Request(ctx, request.Clone())
assert.NotNil(t, t, conn)
assert.NoError(t, err)

// Change context value before refresh Request
ctx = context.WithValue(ctx, contextKey{}, "value_2")
checkCtxCl.setExpectedValue("value_2")
request.Connection = conn.Clone()

// Call refresh
conn, err = client.Request(ctx, request.Clone())
assert.NotNil(t, t, conn)
assert.NoError(t, err)

// Call refresh from eventFactory. We are expecting updated value in the context
eventFactoryCl.callRefresh()
<-syncChan
}

// This test reproduces the situation when Close and Request were called at the same time
// nolint:dupl
func TestRefreshDuringClose_Client(t *testing.T) {
Expand Down
42 changes: 42 additions & 0 deletions pkg/networkservice/common/begin/event_factory_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,48 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

// This test reproduces the situation when refresh changes the eventFactory context
// nolint:dupl
func TestRefresh_Server(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

syncChan := make(chan struct{})
checkCtxServ := &checkContextServer{t: t}
eventFactoryServ := &eventFactoryServer{ch: syncChan}
server := chain.NewNetworkServiceServer(
begin.NewServer(),
checkCtxServ,
eventFactoryServ,
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Set any value to context
ctx = context.WithValue(ctx, contextKey{}, "value_1")
checkCtxServ.setExpectedValue("value_1")

// Do Request with this context
request := testRequest("1")
conn, err := server.Request(ctx, request.Clone())
assert.NotNil(t, t, conn)
assert.NoError(t, err)

// Change context value before refresh Request
ctx = context.WithValue(ctx, contextKey{}, "value_2")
checkCtxServ.setExpectedValue("value_2")
request.Connection = conn.Clone()

// Call refresh
conn, err = server.Request(ctx, request.Clone())
assert.NotNil(t, t, conn)
assert.NoError(t, err)

// Call refresh from eventFactory. We are expecting updated value in the context
eventFactoryServ.callRefresh()
<-syncChan
}

// This test reproduces the situation when Close and Request were called at the same time
// nolint:dupl
func TestRefreshDuringClose_Server(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/networkservice/common/begin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func (b *beginServer) Request(ctx context.Context, request *networkservice.Netwo
conn, err = b.Request(ctx, request)
return
}
eventFactoryServer.updateContext(ctx)

ctx = withEventFactory(ctx, eventFactoryServer)
conn, err = next.Server(ctx).Request(ctx, request)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/registry/common/begin/ns_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (b *beginNSClient) Register(ctx context.Context, in *registry.NetworkServic
resp, err = b.Register(ctx, in, opts...)
return
}
eventFactoryClient.updateContext(ctx)

ctx = withEventFactory(ctx, eventFactoryClient)
resp, err = next.NetworkServiceRegistryClient(ctx).Register(ctx, in, opts...)
Expand Down
28 changes: 18 additions & 10 deletions pkg/registry/common/begin/ns_event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,7 @@ func newEventNSFactoryClient(ctx context.Context, afterClose func(), opts ...grp
client: next.NetworkServiceRegistryClient(ctx),
opts: opts,
}
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
f.updateContext(ctx)

f.afterCloseFunc = func() {
f.state = closed
Expand All @@ -58,6 +54,14 @@ func newEventNSFactoryClient(ctx context.Context, afterClose func(), opts ...grp
return f
}

func (f *eventNSFactoryClient) updateContext(ctx context.Context) {
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
}

func (f *eventNSFactoryClient) Register(opts ...Option) <-chan error {
o := &option{
cancelCtx: context.Background(),
Expand Down Expand Up @@ -129,11 +133,7 @@ func newNSEventFactoryServer(ctx context.Context, afterClose func()) *eventNSFac
f := &eventNSFactoryServer{
server: next.NetworkServiceRegistryServer(ctx),
}
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
f.updateContext(ctx)

f.afterCloseFunc = func() {
f.state = closed
Expand All @@ -142,6 +142,14 @@ func newNSEventFactoryServer(ctx context.Context, afterClose func()) *eventNSFac
return f
}

func (f *eventNSFactoryServer) updateContext(ctx context.Context) {
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
}

func (f *eventNSFactoryServer) Register(opts ...Option) <-chan error {
o := &option{
cancelCtx: context.Background(),
Expand Down
2 changes: 2 additions & 0 deletions pkg/registry/common/begin/ns_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func (b *beginNSServer) Register(ctx context.Context, in *registry.NetworkServic
resp, err = b.Register(ctx, in)
return
}
eventFactoryServer.updateContext(ctx)

ctx = withEventFactory(ctx, eventFactoryServer)
resp, err = next.NetworkServiceRegistryServer(ctx).Register(ctx, in)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/registry/common/begin/nse_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (b *beginNSEClient) Register(ctx context.Context, in *registry.NetworkServi
resp, err = b.Register(ctx, in, opts...)
return
}
eventFactoryClient.updateContext(ctx)

ctx = withEventFactory(ctx, eventFactoryClient)
resp, err = next.NetworkServiceEndpointRegistryClient(ctx).Register(ctx, in, opts...)
Expand Down
28 changes: 18 additions & 10 deletions pkg/registry/common/begin/nse_event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,7 @@ func newEventNSEFactoryClient(ctx context.Context, afterClose func(), opts ...gr
client: next.NetworkServiceEndpointRegistryClient(ctx),
opts: opts,
}
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
f.updateContext(ctx)

f.afterCloseFunc = func() {
f.state = closed
Expand All @@ -58,6 +54,14 @@ func newEventNSEFactoryClient(ctx context.Context, afterClose func(), opts ...gr
return f
}

func (f *eventNSEFactoryClient) updateContext(ctx context.Context) {
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
}

func (f *eventNSEFactoryClient) Register(opts ...Option) <-chan error {
o := &option{
cancelCtx: context.Background(),
Expand Down Expand Up @@ -129,11 +133,7 @@ func newNSEEventFactoryServer(ctx context.Context, afterClose func()) *eventNSEF
f := &eventNSEFactoryServer{
server: next.NetworkServiceEndpointRegistryServer(ctx),
}
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
f.updateContext(ctx)

f.afterCloseFunc = func() {
f.state = closed
Expand All @@ -142,6 +142,14 @@ func newNSEEventFactoryServer(ctx context.Context, afterClose func()) *eventNSEF
return f
}

func (f *eventNSEFactoryServer) updateContext(ctx context.Context) {
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
}

func (f *eventNSEFactoryServer) Register(opts ...Option) <-chan error {
o := &option{
cancelCtx: context.Background(),
Expand Down
42 changes: 42 additions & 0 deletions pkg/registry/common/begin/nse_event_factory_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,48 @@ import (
"google.golang.org/grpc"
)

// This test reproduces the situation when refresh changes the eventFactory context
func TestRefresh_Client(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

syncChan := make(chan struct{})
checkCtxCl := &checkContextClient{t: t}
eventFactoryCl := &eventFactoryClient{ch: syncChan}
client := chain.NewNetworkServiceEndpointRegistryClient(
begin.NewNetworkServiceEndpointRegistryClient(),
checkCtxCl,
eventFactoryCl,
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Set any value to context
ctx = context.WithValue(ctx, contextKey{}, "value_1")
checkCtxCl.setExpectedValue("value_1")

// Do Register with this context
nse := &registry.NetworkServiceEndpoint{
Name: "1",
}
conn, err := client.Register(ctx, nse.Clone())
assert.NotNil(t, t, conn)
assert.NoError(t, err)

// Change context value before refresh
ctx = context.WithValue(ctx, contextKey{}, "value_2")
checkCtxCl.setExpectedValue("value_2")

// Call refresh
conn, err = client.Register(ctx, nse.Clone())
assert.NotNil(t, t, conn)
assert.NoError(t, err)

// Call refresh from eventFactory. We are expecting updated value in the context
eventFactoryCl.callRefresh()
<-syncChan
}

// This test reproduces the situation when Unregister and Register were called at the same time
func TestRefreshDuringUnregister_Client(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })
Expand Down
Loading