Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle Client.Close on cloned clients #893

Merged
merged 6 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,11 @@ type (
WorkflowService() workflowservice.WorkflowServiceClient

// Close client and clean up underlying resources.
//
// If this client was created via NewClientFromExisting or this client has
// been used in that call, Close() on may not necessarily close the
// underlying connection. Only the final close of all existing clients will
// close the underlying connection.
Close()
}

Expand Down Expand Up @@ -474,6 +479,11 @@ func NewClient(options Options) (Client, error) {
// this package and cannot be wrapped. Currently, this always attempts an eager
// connection even if the existing client was created with NewLazyClient and has
// not made any calls yet.
//
// Close() on the resulting client may not necessarily close the underlying
// connection if there are any other clients using the connection. All clients
// associated with the existing client must call Close() and only the last one
// actually performs the connection close.
func NewClientFromExisting(existingClient Client, options Options) (Client, error) {
return internal.NewClientFromExisting(existingClient, options)
}
Expand Down
15 changes: 11 additions & 4 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"
"crypto/tls"
"fmt"
"sync/atomic"
"time"

commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -689,12 +690,18 @@ func newClient(options ClientOptions, existing *WorkflowClient) (Client, error)
if client.capabilities, err = existing.loadCapabilities(); err != nil {
return nil, err
}
} else if !options.ConnectionOptions.disableEagerConnection {
if _, err := client.loadCapabilities(); err != nil {
client.Close()
return nil, err
client.unclosedClients = existing.unclosedClients
} else {
if !options.ConnectionOptions.disableEagerConnection {
if _, err := client.loadCapabilities(); err != nil {
client.Close()
return nil, err
}
}
var unclosedClients int32
client.unclosedClients = &unclosedClients
}
atomic.AddInt32(client.unclosedClients, 1)

return client, nil
}
Expand Down
28 changes: 24 additions & 4 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ import (
"errors"
"fmt"
"io"
"math"
"reflect"
"sync"
"sync/atomic"
"time"

"github.com/pborman/uuid"
Expand Down Expand Up @@ -83,6 +85,10 @@ type (
excludeInternalFromRetry *uberatomic.Bool
capabilities *workflowservice.GetSystemInfoResponse_Capabilities
capabilitiesLock sync.RWMutex

// The pointer value is shared across multiple clients. If non-nil, only
// access/mutate atomically.
unclosedClients *int32
}

// namespaceClient is the client for managing namespaces.
Expand Down Expand Up @@ -942,11 +948,25 @@ func (wc *WorkflowClient) ensureInitialized() error {

// Close client and clean up underlying resources.
func (wc *WorkflowClient) Close() {
if wc.conn == nil {
return
// If there's a set of unclosed clients, we have to decrement it and then
// set it to a new pointer of max to prevent decrementing on repeated Close
// calls to this client. If the count has not reached zero, this close call is
// ignored.
if wc.unclosedClients != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this shouldn't ever be nil, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot trust all places that create a client will set this. That includes tests and other code paths.

remainingUnclosedClients := atomic.AddInt32(wc.unclosedClients, -1)
// Set the unclosed clients to max value so we never try this again
var maxUnclosedClients int32 = math.MaxInt32
wc.unclosedClients = &maxUnclosedClients
// If there are any remaining, do not close
if remainingUnclosedClients > 0 {
return
}
}
if err := wc.conn.Close(); err != nil {
wc.logger.Warn("unable to close connection", tagError, err)

if wc.conn != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when is this nil?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test cases w/ mocks. This logic has not changed, I just moved the condition from the top of the function to down here.

if err := wc.conn.Close(); err != nil {
wc.logger.Warn("unable to close connection", tagError, err)
}
}
}

Expand Down
45 changes: 45 additions & 0 deletions internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,19 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"
"testing"
"time"

workflowpb "go.temporal.io/api/workflow/v1"
uberatomic "go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"

ilog "go.temporal.io/sdk/internal/log"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -1402,3 +1405,45 @@ func serializeEvents(events []*historypb.HistoryEvent) *commonpb.DataBlob {
Data: blob.Data,
}
}

func TestClientCloseCount(t *testing.T) {
// Create primary client
server, err := startTestGRPCServer()
require.NoError(t, err)
defer server.Stop()
client, err := DialClient(ClientOptions{HostPort: server.addr})
require.NoError(t, err)
workflowClient := client.(*WorkflowClient)

// Confirm there is 1 unclosed client
require.EqualValues(t, 1, atomic.LoadInt32(workflowClient.unclosedClients))

// Create two more and confirm counts
client2, err := NewClientFromExisting(client, ClientOptions{})
require.NoError(t, err)
require.EqualValues(t, 2, atomic.LoadInt32(workflowClient.unclosedClients))
require.Same(t, workflowClient.unclosedClients, client2.(*WorkflowClient).unclosedClients)
client3, err := NewClientFromExisting(client, ClientOptions{})
require.NoError(t, err)
require.EqualValues(t, 3, atomic.LoadInt32(workflowClient.unclosedClients))
require.Same(t, workflowClient.unclosedClients, client3.(*WorkflowClient).unclosedClients)

// Close the third one 3 times and confirm counts and that connection not
// closed
client3.Close()
client3.Close()
client3.Close()
require.EqualValues(t, 2, atomic.LoadInt32(workflowClient.unclosedClients))
require.NotSame(t, workflowClient.unclosedClients, client3.(*WorkflowClient).unclosedClients)
require.Less(t, workflowClient.conn.GetState(), connectivity.Shutdown)

// Close the primary one and confirm not closed
client.Close()
require.EqualValues(t, 1, atomic.LoadInt32(client2.(*WorkflowClient).unclosedClients))
require.NotSame(t, workflowClient.unclosedClients, client2.(*WorkflowClient).unclosedClients)
require.Less(t, workflowClient.conn.GetState(), connectivity.Shutdown)

// Now close the last one (the second) and confirm it actually gets closed
client2.Close()
require.Equal(t, connectivity.Shutdown, workflowClient.conn.GetState())
}
1 change: 1 addition & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2474,6 +2474,7 @@ func (ts *IntegrationTestSuite) TestMultiNamespaceClient() {
// Make a new client with a different namespace and run again
newClient, err := client.NewClientFromExisting(ts.client, client.Options{Namespace: "some-other-namespace"})
ts.NoError(err)
defer newClient.Close()
_, _ = newClient.DescribeWorkflowExecution(ctx, "id-that-does-not-exist", "")

// Confirm there was no count change to other namespace but there is now a
Expand Down