From cbbb4a0fb26261028af75ac2dce59690170f6987 Mon Sep 17 00:00:00 2001 From: Artem Glazychev Date: Tue, 15 Mar 2022 18:13:45 +0700 Subject: [PATCH] Add cleanup chain element Signed-off-by: Artem Glazychev --- .../common/cleanup/cleanup_test.go | 120 +++++++++++++++++ pkg/networkservice/common/cleanup/client.go | 124 ++++++++++++++++++ pkg/networkservice/common/cleanup/doc.go | 18 +++ pkg/networkservice/common/cleanup/metadata.go | 58 ++++++++ pkg/networkservice/common/cleanup/options.go | 39 ++++++ 5 files changed, 359 insertions(+) create mode 100644 pkg/networkservice/common/cleanup/cleanup_test.go create mode 100644 pkg/networkservice/common/cleanup/client.go create mode 100644 pkg/networkservice/common/cleanup/doc.go create mode 100644 pkg/networkservice/common/cleanup/metadata.go create mode 100644 pkg/networkservice/common/cleanup/options.go diff --git a/pkg/networkservice/common/cleanup/cleanup_test.go b/pkg/networkservice/common/cleanup/cleanup_test.go new file mode 100644 index 000000000..2fa6ce32f --- /dev/null +++ b/pkg/networkservice/common/cleanup/cleanup_test.go @@ -0,0 +1,120 @@ +// Copyright (c) 2022 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cleanup_test + +import ( + "context" + "fmt" + "testing" + "time" + + "go.uber.org/goleak" + + "github.com/stretchr/testify/require" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + + "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/cleanup" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/count" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" +) + +func TestCleanUp_CtxDone(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + chainCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + counter := new(count.Client) + + client := chain.NewNetworkServiceClient( + begin.NewClient(), + metadata.NewClient(), + cleanup.NewClient(chainCtx), + counter, + ) + req := &networkservice.NetworkServiceRequest{ + Connection: &networkservice.Connection{Id: "nsc-1"}, + } + _, err := client.Request(context.Background(), req) + require.NoError(t, err) + require.Equal(t, 1, counter.Requests()) + require.Equal(t, 0, counter.Closes()) + cancel() + + require.Eventually(t, func() bool { + return counter.Closes() == 1 + }, time.Millisecond*100, time.Millisecond*10) +} + +func TestCleanUp_Close(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + chainCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + counter := new(count.Client) + + client := chain.NewNetworkServiceClient( + begin.NewClient(), + metadata.NewClient(), + cleanup.NewClient(chainCtx), + counter, + ) + req := &networkservice.NetworkServiceRequest{ + Connection: &networkservice.Connection{Id: "nsc-1"}, + } + conn, err := client.Request(context.Background(), req) + require.NoError(t, err) + + _, _ = client.Close(context.Background(), conn) + require.Equal(t, 1, counter.Closes()) + cancel() + require.Never(t, func() bool { + return counter.Closes() > 1 + }, time.Millisecond*100, time.Millisecond*10) +} + +func TestCleanUp_Chan(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + chainCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + counter := new(count.Client) + + doneCh := make(chan struct{}) + client := chain.NewNetworkServiceClient( + begin.NewClient(), + metadata.NewClient(), + cleanup.NewClient(chainCtx, cleanup.WithDoneChan(doneCh)), + counter, + ) + + requestsNumber := 500 + for i := 0; i < requestsNumber; i++ { + req := &networkservice.NetworkServiceRequest{ + Connection: &networkservice.Connection{Id: fmt.Sprintf("nsc-%v", i)}, + } + _, err := client.Request(context.Background(), req) + require.NoError(t, err) + } + + cancel() + <-doneCh + + require.Equal(t, counter.Closes(), requestsNumber) +} diff --git a/pkg/networkservice/common/cleanup/client.go b/pkg/networkservice/common/cleanup/client.go new file mode 100644 index 000000000..5a8ed2a7c --- /dev/null +++ b/pkg/networkservice/common/cleanup/client.go @@ -0,0 +1,124 @@ +// Copyright (c) 2022 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cleanup + +import ( + "context" + "sync/atomic" + + "github.com/edwarnicke/serialize" + "github.com/golang/protobuf/ptypes/empty" + "google.golang.org/grpc" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + + "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" +) + +type cleanupClient struct { + chainCtx context.Context + + ccClose bool + doneCh chan struct{} + activeConns int32 + executor serialize.Executor +} + +// NewClient - returns a cleanup client chain element +func NewClient(ctx context.Context, opts ...Option) networkservice.NetworkServiceClient { + o := &options{} + for _, opt := range opts { + opt(o) + } + c := &cleanupClient{ + chainCtx: ctx, + ccClose: o.ccClose, + doneCh: o.doneCh, + } + go func() { + <-c.chainCtx.Done() + if atomic.LoadInt32(&c.activeConns) == 0 && c.doneCh != nil { + c.executor.AsyncExec(func() { + select { + case <-c.doneCh: + default: + close(c.doneCh) + } + }) + } + }() + return c +} + +func (c *cleanupClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { + conn, err := next.Client(ctx).Request(ctx, request, opts...) + if err != nil { + return nil, err + } + // Update active connections counter. Needed for a cleanup done notification. + atomic.AddInt32(&c.activeConns, 1) + if cancel, ok := loadAndDeleteCancel(ctx); ok { + cancel() + } + + cancelCtx, cancel := context.WithCancel(context.Background()) + storeCancel(ctx, cancel) + + factory := begin.FromContext(ctx) + go func() { + select { + case <-c.chainCtx.Done(): + // Add to metadata if we want to delete clientconn + if c.ccClose { + storeCC(ctx) + } + + <-factory.Close(begin.CancelContext(cancelCtx)) + atomic.AddInt32(&c.activeConns, -1) + + if atomic.LoadInt32(&c.activeConns) == 0 && c.doneCh != nil { + c.executor.AsyncExec(func() { + select { + case <-c.doneCh: + default: + close(c.doneCh) + } + }) + } + case <-cancelCtx.Done(): + atomic.AddInt32(&c.activeConns, -1) + } + }() + return conn, err +} + +func (c *cleanupClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { + if cancel, ok := loadAndDeleteCancel(ctx); ok { + if _, ok := loadAndDeleteCC(ctx); ok { + if cc, ok := clientconn.Load(ctx); ok { + if closable, ok := cc.(interface{ Close() error }); ok { + _ = closable.Close() + } + clientconn.Delete(ctx) + } + } + cancel() + } + return next.Client(ctx).Close(ctx, conn, opts...) +} diff --git a/pkg/networkservice/common/cleanup/doc.go b/pkg/networkservice/common/cleanup/doc.go new file mode 100644 index 000000000..e7088bf7b --- /dev/null +++ b/pkg/networkservice/common/cleanup/doc.go @@ -0,0 +1,18 @@ +// Copyright (c) 2022 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package cleanup provides networkservice.NetworkService chain elements to clean up resources before termination +package cleanup diff --git a/pkg/networkservice/common/cleanup/metadata.go b/pkg/networkservice/common/cleanup/metadata.go new file mode 100644 index 000000000..b79e4c3ae --- /dev/null +++ b/pkg/networkservice/common/cleanup/metadata.go @@ -0,0 +1,58 @@ +// Copyright (c) 2022 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cleanup + +import ( + "context" + + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" +) + +type keyCancel struct{} +type keyCC struct{} + +// storeCancel sets the context.CancelFunc stored in per Connection.Id metadata. +func storeCancel(ctx context.Context, cancel context.CancelFunc) { + metadata.Map(ctx, true).Store(keyCancel{}, cancel) +} + +// loadAndDeleteCancel deletes the context.CancelFunc stored in per Connection.Id metadata, +// returning the previous value if any. The loaded result reports whether the key was present. +func loadAndDeleteCancel(ctx context.Context) (value context.CancelFunc, ok bool) { + rawValue, ok := metadata.Map(ctx, true).LoadAndDelete(keyCancel{}) + if !ok { + return + } + value, ok = rawValue.(context.CancelFunc) + return value, ok +} + +// storeCC sets the flag to delete clientconn in per Connection.Id metadata. +func storeCC(ctx context.Context) { + metadata.Map(ctx, true).Store(keyCC{}, struct{}{}) +} + +// loadAndDeleteCC deletes the flag stored in per Connection.Id metadata, +// returning the previous value if any. The loaded result reports whether the key was present. +func loadAndDeleteCC(ctx context.Context) (value struct{}, ok bool) { + rawValue, ok := metadata.Map(ctx, true).LoadAndDelete(keyCC{}) + if !ok { + return + } + value, ok = rawValue.(struct{}) + return value, ok +} diff --git a/pkg/networkservice/common/cleanup/options.go b/pkg/networkservice/common/cleanup/options.go new file mode 100644 index 000000000..9cbb2e15c --- /dev/null +++ b/pkg/networkservice/common/cleanup/options.go @@ -0,0 +1,39 @@ +// Copyright (c) 2022 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cleanup + +type options struct { + ccClose bool + doneCh chan struct{} +} + +// Option - options for the cleanup chain element +type Option func(*options) + +// WithCCClose - closes clientconn to prevent calling requests/closes on other datapath objects +func WithCCClose() Option { + return func(o *options) { + o.ccClose = true + } +} + +// WithDoneChan - receives a channel to notify the end of cleaning +func WithDoneChan(doneCh chan struct{}) Option { + return func(o *options) { + o.doneCh = doneCh + } +}