From 3d211c478735ac7f5fad139983fe529c57254f37 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Mon, 29 Aug 2022 14:43:21 -0500 Subject: [PATCH 1/3] Use counter on close for reused-connection clients Fixes #886 --- client/client.go | 10 ++++++ internal/client.go | 15 +++++--- internal/internal_workflow_client.go | 28 ++++++++++++--- internal/internal_workflow_client_test.go | 44 +++++++++++++++++++++++ 4 files changed, 89 insertions(+), 8 deletions(-) diff --git a/client/client.go b/client/client.go index e6a58b9de..ca442d091 100644 --- a/client/client.go +++ b/client/client.go @@ -386,6 +386,11 @@ type ( WorkflowService() workflowservice.WorkflowServiceClient // Close client and clean up underlying resources. + // + // If this client was created via NewClientFromExisting or this client has + // been used in that call, Close() on may not necessarily close the + // underlying connection. Only the final close of all existing clients will + // close the underlying connection. Close() } @@ -474,6 +479,11 @@ func NewClient(options Options) (Client, error) { // this package and cannot be wrapped. Currently, this always attempts an eager // connection even if the existing client was created with NewLazyClient and has // not made any calls yet. +// +// Close() on the resulting client may not necessarily close the underlying +// connection if there are any other clients using the connection. All clients +// associated with the existing client must call Close() and only the last one +// actually performs the connection close. func NewClientFromExisting(existingClient Client, options Options) (Client, error) { return internal.NewClientFromExisting(existingClient, options) } diff --git a/internal/client.go b/internal/client.go index e23d3c540..059b5eb74 100644 --- a/internal/client.go +++ b/internal/client.go @@ -28,6 +28,7 @@ import ( "context" "crypto/tls" "fmt" + "sync/atomic" "time" commonpb "go.temporal.io/api/common/v1" @@ -689,12 +690,18 @@ func newClient(options ClientOptions, existing *WorkflowClient) (Client, error) if client.capabilities, err = existing.loadCapabilities(); err != nil { return nil, err } - } else if !options.ConnectionOptions.disableEagerConnection { - if _, err := client.loadCapabilities(); err != nil { - client.Close() - return nil, err + client.unclosedClients = existing.unclosedClients + } else { + if !options.ConnectionOptions.disableEagerConnection { + if _, err := client.loadCapabilities(); err != nil { + client.Close() + return nil, err + } } + var unclosedClients int32 + client.unclosedClients = &unclosedClients } + atomic.AddInt32(client.unclosedClients, 1) return client, nil } diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index b4eae7669..955bfe75c 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -29,8 +29,10 @@ import ( "errors" "fmt" "io" + "math" "reflect" "sync" + "sync/atomic" "time" "github.com/pborman/uuid" @@ -83,6 +85,10 @@ type ( excludeInternalFromRetry *uberatomic.Bool capabilities *workflowservice.GetSystemInfoResponse_Capabilities capabilitiesLock sync.RWMutex + + // The pointer value is shared across multiple clients. If non-nil, only + // access/mutate atomically. + unclosedClients *int32 } // namespaceClient is the client for managing namespaces. @@ -942,11 +948,25 @@ func (wc *WorkflowClient) ensureInitialized() error { // Close client and clean up underlying resources. func (wc *WorkflowClient) Close() { - if wc.conn == nil { - return + // If there's a set of unclosed clients, we have to decrement it and then + // set it to a new pointer of max to prevent decrementing on repeated Close + // calls to this client. If the count has not reached zero, this close call is + // ignored. + if wc.unclosedClients != nil { + remainingUnclosedClients := atomic.AddInt32(wc.unclosedClients, -1) + // Set the unclosed clients to max value so we never try this again + var maxUnclosedClients int32 = math.MaxInt32 + wc.unclosedClients = &maxUnclosedClients + // If there are any remaining, do not close + if remainingUnclosedClients > 0 { + return + } } - if err := wc.conn.Close(); err != nil { - wc.logger.Warn("unable to close connection", tagError, err) + + if wc.conn != nil { + if err := wc.conn.Close(); err != nil { + wc.logger.Warn("unable to close connection", tagError, err) + } } } diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index b53629cea..3e7351ae2 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -34,10 +34,12 @@ import ( workflowpb "go.temporal.io/api/workflow/v1" uberatomic "go.uber.org/atomic" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" ilog "go.temporal.io/sdk/internal/log" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" @@ -1402,3 +1404,45 @@ func serializeEvents(events []*historypb.HistoryEvent) *commonpb.DataBlob { Data: blob.Data, } } + +func TestClientCloseCount(t *testing.T) { + // Create primary client + server, err := startTestGRPCServer() + require.NoError(t, err) + defer server.Stop() + client, err := DialClient(ClientOptions{HostPort: server.addr}) + require.NoError(t, err) + workflowClient := client.(*WorkflowClient) + + // Confirm there is 1 unclosed client + require.EqualValues(t, 1, *workflowClient.unclosedClients) + + // Create two more and confirm counts + client2, err := NewClientFromExisting(client, ClientOptions{}) + require.NoError(t, err) + require.EqualValues(t, 2, *workflowClient.unclosedClients) + require.Same(t, workflowClient.unclosedClients, client2.(*WorkflowClient).unclosedClients) + client3, err := NewClientFromExisting(client, ClientOptions{}) + require.NoError(t, err) + require.EqualValues(t, 3, *workflowClient.unclosedClients) + require.Same(t, workflowClient.unclosedClients, client3.(*WorkflowClient).unclosedClients) + + // Close the third one 3 times and confirm counts and that connection not + // closed + client3.Close() + client3.Close() + client3.Close() + require.EqualValues(t, 2, *workflowClient.unclosedClients) + require.NotSame(t, workflowClient.unclosedClients, client3.(*WorkflowClient).unclosedClients) + require.Less(t, workflowClient.conn.GetState(), connectivity.Shutdown) + + // Close the primary one and confirm not closed + client.Close() + require.EqualValues(t, 1, *client2.(*WorkflowClient).unclosedClients) + require.NotSame(t, workflowClient.unclosedClients, client2.(*WorkflowClient).unclosedClients) + require.Less(t, workflowClient.conn.GetState(), connectivity.Shutdown) + + // Now close the last one (the second) and confirm it actually gets closed + client2.Close() + require.Equal(t, connectivity.Shutdown, workflowClient.conn.GetState()) +} From 82071cd3e4881d1827920e0fbeac6eff19c39f7d Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Tue, 30 Aug 2022 09:24:57 -0500 Subject: [PATCH 2/3] Properly close client when done with it --- test/integration_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/test/integration_test.go b/test/integration_test.go index 912f6e1ec..d4d19c545 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -2474,6 +2474,7 @@ func (ts *IntegrationTestSuite) TestMultiNamespaceClient() { // Make a new client with a different namespace and run again newClient, err := client.NewClientFromExisting(ts.client, client.Options{Namespace: "some-other-namespace"}) ts.NoError(err) + defer newClient.Close() _, _ = newClient.DescribeWorkflowExecution(ctx, "id-that-does-not-exist", "") // Confirm there was no count change to other namespace but there is now a From 2098f45b589a6bab4b9f7985ef9773ac51363281 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Thu, 1 Sep 2022 12:37:27 -0500 Subject: [PATCH 3/3] Minor PR fix --- internal/internal_workflow_client_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 3e7351ae2..a626bae26 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -28,6 +28,7 @@ import ( "context" "errors" "fmt" + "sync/atomic" "testing" "time" @@ -1415,16 +1416,16 @@ func TestClientCloseCount(t *testing.T) { workflowClient := client.(*WorkflowClient) // Confirm there is 1 unclosed client - require.EqualValues(t, 1, *workflowClient.unclosedClients) + require.EqualValues(t, 1, atomic.LoadInt32(workflowClient.unclosedClients)) // Create two more and confirm counts client2, err := NewClientFromExisting(client, ClientOptions{}) require.NoError(t, err) - require.EqualValues(t, 2, *workflowClient.unclosedClients) + require.EqualValues(t, 2, atomic.LoadInt32(workflowClient.unclosedClients)) require.Same(t, workflowClient.unclosedClients, client2.(*WorkflowClient).unclosedClients) client3, err := NewClientFromExisting(client, ClientOptions{}) require.NoError(t, err) - require.EqualValues(t, 3, *workflowClient.unclosedClients) + require.EqualValues(t, 3, atomic.LoadInt32(workflowClient.unclosedClients)) require.Same(t, workflowClient.unclosedClients, client3.(*WorkflowClient).unclosedClients) // Close the third one 3 times and confirm counts and that connection not @@ -1432,13 +1433,13 @@ func TestClientCloseCount(t *testing.T) { client3.Close() client3.Close() client3.Close() - require.EqualValues(t, 2, *workflowClient.unclosedClients) + require.EqualValues(t, 2, atomic.LoadInt32(workflowClient.unclosedClients)) require.NotSame(t, workflowClient.unclosedClients, client3.(*WorkflowClient).unclosedClients) require.Less(t, workflowClient.conn.GetState(), connectivity.Shutdown) // Close the primary one and confirm not closed client.Close() - require.EqualValues(t, 1, *client2.(*WorkflowClient).unclosedClients) + require.EqualValues(t, 1, atomic.LoadInt32(client2.(*WorkflowClient).unclosedClients)) require.NotSame(t, workflowClient.unclosedClients, client2.(*WorkflowClient).unclosedClients) require.Less(t, workflowClient.conn.GetState(), connectivity.Shutdown)