-
Notifications
You must be signed in to change notification settings - Fork 216
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle Client.Close on cloned clients #893
Changes from 5 commits
3d211c4
7ed26b8
82071cd
e109a53
0c29894
2098f45
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,8 +29,10 @@ import ( | |
"errors" | ||
"fmt" | ||
"io" | ||
"math" | ||
"reflect" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/pborman/uuid" | ||
|
@@ -83,6 +85,10 @@ type ( | |
excludeInternalFromRetry *uberatomic.Bool | ||
capabilities *workflowservice.GetSystemInfoResponse_Capabilities | ||
capabilitiesLock sync.RWMutex | ||
|
||
// The pointer value is shared across multiple clients. If non-nil, only | ||
// access/mutate atomically. | ||
unclosedClients *int32 | ||
} | ||
|
||
// namespaceClient is the client for managing namespaces. | ||
|
@@ -942,11 +948,25 @@ func (wc *WorkflowClient) ensureInitialized() error { | |
|
||
// Close client and clean up underlying resources. | ||
func (wc *WorkflowClient) Close() { | ||
if wc.conn == nil { | ||
return | ||
// If there's a set of unclosed clients, we have to decrement it and then | ||
// set it to a new pointer of max to prevent decrementing on repeated Close | ||
// calls to this client. If the count has not reached zero, this close call is | ||
// ignored. | ||
if wc.unclosedClients != nil { | ||
remainingUnclosedClients := atomic.AddInt32(wc.unclosedClients, -1) | ||
// Set the unclosed clients to max value so we never try this again | ||
var maxUnclosedClients int32 = math.MaxInt32 | ||
wc.unclosedClients = &maxUnclosedClients | ||
// If there are any remaining, do not close | ||
if remainingUnclosedClients > 0 { | ||
return | ||
} | ||
} | ||
if err := wc.conn.Close(); err != nil { | ||
wc.logger.Warn("unable to close connection", tagError, err) | ||
|
||
if wc.conn != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. when is this nil? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Test cases w/ mocks. This logic has not changed, I just moved the condition from the top of the function to down here. |
||
if err := wc.conn.Close(); err != nil { | ||
wc.logger.Warn("unable to close connection", tagError, err) | ||
} | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,10 +34,12 @@ import ( | |
workflowpb "go.temporal.io/api/workflow/v1" | ||
uberatomic "go.uber.org/atomic" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/connectivity" | ||
|
||
ilog "go.temporal.io/sdk/internal/log" | ||
|
||
"github.com/golang/mock/gomock" | ||
"github.com/stretchr/testify/require" | ||
"github.com/stretchr/testify/suite" | ||
commonpb "go.temporal.io/api/common/v1" | ||
enumspb "go.temporal.io/api/enums/v1" | ||
|
@@ -1402,3 +1404,45 @@ func serializeEvents(events []*historypb.HistoryEvent) *commonpb.DataBlob { | |
Data: blob.Data, | ||
} | ||
} | ||
|
||
func TestClientCloseCount(t *testing.T) { | ||
// Create primary client | ||
server, err := startTestGRPCServer() | ||
require.NoError(t, err) | ||
defer server.Stop() | ||
client, err := DialClient(ClientOptions{HostPort: server.addr}) | ||
require.NoError(t, err) | ||
workflowClient := client.(*WorkflowClient) | ||
|
||
// Confirm there is 1 unclosed client | ||
require.EqualValues(t, 1, *workflowClient.unclosedClients) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. technically should use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is in a test case. I can trust my lack of race issues in my own test case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would do it just for consistency's sake. It's supposed to be an atomic int, it should be manipulated atomically even if Go lets you not do that. Trivial amount of extra typing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also access the pointers directly in a non-atomic way to compare they point to the same spot. But I will make the change. |
||
|
||
// Create two more and confirm counts | ||
client2, err := NewClientFromExisting(client, ClientOptions{}) | ||
require.NoError(t, err) | ||
require.EqualValues(t, 2, *workflowClient.unclosedClients) | ||
require.Same(t, workflowClient.unclosedClients, client2.(*WorkflowClient).unclosedClients) | ||
client3, err := NewClientFromExisting(client, ClientOptions{}) | ||
require.NoError(t, err) | ||
require.EqualValues(t, 3, *workflowClient.unclosedClients) | ||
require.Same(t, workflowClient.unclosedClients, client3.(*WorkflowClient).unclosedClients) | ||
|
||
// Close the third one 3 times and confirm counts and that connection not | ||
// closed | ||
client3.Close() | ||
client3.Close() | ||
client3.Close() | ||
require.EqualValues(t, 2, *workflowClient.unclosedClients) | ||
require.NotSame(t, workflowClient.unclosedClients, client3.(*WorkflowClient).unclosedClients) | ||
require.Less(t, workflowClient.conn.GetState(), connectivity.Shutdown) | ||
|
||
// Close the primary one and confirm not closed | ||
client.Close() | ||
require.EqualValues(t, 1, *client2.(*WorkflowClient).unclosedClients) | ||
require.NotSame(t, workflowClient.unclosedClients, client2.(*WorkflowClient).unclosedClients) | ||
require.Less(t, workflowClient.conn.GetState(), connectivity.Shutdown) | ||
|
||
// Now close the last one (the second) and confirm it actually gets closed | ||
client2.Close() | ||
require.Equal(t, connectivity.Shutdown, workflowClient.conn.GetState()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this shouldn't ever be nil, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I cannot trust all places that create a client will set this. That includes tests and other code paths.