Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Share connection across different clients #881

Merged
merged 3 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,16 @@ func NewClient(options Options) (Client, error) {
return internal.NewClient(options)
}

// NewClientFromExisting creates a new client using the same connection as the
// existing client. This means all options.ConnectionOptions are ignored and
// options.HostPort is ignored. The existing client must have been created from
// this package and cannot be wrapped. Currently, this always attempts an eager
// connection even if the existing client was created with NewLazyClient and has
// not made any calls yet.
func NewClientFromExisting(existingClient Client, options Options) (Client, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might make sense to make a narrower options type here with just the stuff that won't be ignored.

If that can be re-used in a backwards compat way inside the existing options, that's gravy. If not it might still be worth it just to make this less potentially confusing, but up to you.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory I like splitting the struct into "connection options" and "other options" but I don't think it's worth it for this feature that most people won't ever use

Copy link
Member Author

@cretz cretz Aug 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might make sense to make a narrower options type here with just the stuff that won't be ignored.

This can't be done in a backwards compatible way. I'd have to make a whole new struct. This would entail just copying all but two fields of the existing struct just for this method and hoping I properly keep them and their docs in sync when adding more. I think it's reasonable to reuse these options explaining which are unused.

return internal.NewClientFromExisting(existingClient, options)
}

// NewNamespaceClient creates an instance of a namespace client, to manage
// lifecycle of namespaces. This will not attempt to connect to the server
// eagerly and therefore may not fail for an unreachable server until a call is
Expand Down
40 changes: 34 additions & 6 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package internal
import (
"context"
"crypto/tls"
"fmt"
"time"

commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -633,6 +634,20 @@ func NewLazyClient(options ClientOptions) (Client, error) {
//
// Deprecated: Use DialClient or NewLazyClient instead.
func NewClient(options ClientOptions) (Client, error) {
return newClient(options, nil)
}

// NewClientFromExisting creates a new client using the same connection as the
// existing client.
func NewClientFromExisting(existingClient Client, options ClientOptions) (Client, error) {
existing, _ := existingClient.(*WorkflowClient)
if existing == nil {
return nil, fmt.Errorf("existing client must have been created directly from a client package call")
}
return newClient(options, existing)
}

func newClient(options ClientOptions, existing *WorkflowClient) (Client, error) {
if options.Namespace == "" {
options.Namespace = DefaultNamespace
}
Expand All @@ -652,16 +667,29 @@ func NewClient(options ClientOptions) (Client, error) {
options.Logger.Info("No logger configured for temporal client. Created default one.")
}

options.ConnectionOptions.excludeInternalFromRetry = uberatomic.NewBool(false)
connection, err := dial(newDialParameters(&options, options.ConnectionOptions.excludeInternalFromRetry))
if err != nil {
return nil, err
// Dial or use existing connection
var connection *grpc.ClientConn
var err error
if existing == nil {
options.ConnectionOptions.excludeInternalFromRetry = uberatomic.NewBool(false)
connection, err = dial(newDialParameters(&options, options.ConnectionOptions.excludeInternalFromRetry))
if err != nil {
return nil, err
}
} else {
connection = existing.conn
}

client := NewServiceClient(workflowservice.NewWorkflowServiceClient(connection), connection, options)

// Load server capabilities eagerly if not disabled
if !options.ConnectionOptions.disableEagerConnection {
// If using existing connection, always load its capabilities and use them for
// the new connection. Otherwise, only load server capabilities eagerly if not
// disabled.
if existing != nil {
if client.capabilities, err = existing.loadCapabilities(); err != nil {
return nil, err
}
} else if !options.ConnectionOptions.disableEagerConnection {
if _, err := client.loadCapabilities(); err != nil {
client.Close()
return nil, err
Expand Down
10 changes: 9 additions & 1 deletion internal/common/metrics/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,17 @@ func NewGRPCInterceptor(defaultHandler Handler, suffix string) grpc.UnaryClientI

// Only take method name after the last slash
operation := method[strings.LastIndex(method, "/")+1:]
handler = handler.WithTags(map[string]string{OperationTagName: operation})
tags := map[string]string{OperationTagName: operation}

// Since this interceptor can be used for clients of different name, we
// attempt to extract the namespace out of the request. All namespace-based
// requests have been confirmed to have a top-level namespace field.
Comment on lines +64 to +66
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this deserve a comment in the proto file? I mean I wouldn't do a PR just for that, but bundle it into your next api PR? :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, if I remember on my next one I'll add something like "Some clients expect all namespace-based RPC calls to have a single namespace string field in the request".

if nsReq, _ := req.(interface{ GetNamespace() string }); nsReq != nil {
tags[NamespaceTagName] = nsReq.GetNamespace()
}

// Capture time, record start, run, and record end
handler = handler.WithTags(tags)
start := time.Now()
recordRequestStart(handler, longPoll, suffix)
err := invoker(ctx, method, req, reply, cc, opts...)
Expand Down
30 changes: 30 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2456,6 +2456,36 @@ func (ts *IntegrationTestSuite) TestHistoryLength() {
ts.Equal(expected, actual)
}

func (ts *IntegrationTestSuite) TestMultiNamespaceClient() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Make simple call to describe an execution
_, _ = ts.client.DescribeWorkflowExecution(ctx, "id-that-does-not-exist", "")

// Confirm count on our namespace but not on the other
ts.assertMetricCount(metrics.TemporalRequest, 1,
metrics.OperationTagName, "DescribeWorkflowExecution",
metrics.NamespaceTagName, ts.config.Namespace)
ts.assertMetricCount(metrics.TemporalRequest, 0,
metrics.OperationTagName, "DescribeWorkflowExecution",
metrics.NamespaceTagName, "some-other-namespace")

// Make a new client with a different namespace and run again
newClient, err := client.NewClientFromExisting(ts.client, client.Options{Namespace: "some-other-namespace"})
ts.NoError(err)
_, _ = newClient.DescribeWorkflowExecution(ctx, "id-that-does-not-exist", "")

// Confirm there was no count change to other namespace but there is now a
// request for this one
ts.assertMetricCount(metrics.TemporalRequest, 1,
metrics.OperationTagName, "DescribeWorkflowExecution",
metrics.NamespaceTagName, ts.config.Namespace)
ts.assertMetricCount(metrics.TemporalRequest, 1,
metrics.OperationTagName, "DescribeWorkflowExecution",
metrics.NamespaceTagName, "some-other-namespace")
}

func (ts *IntegrationTestSuite) TestHeartbeatThrottleDisabled() {
// Heartbeat 4 times, 100ms apart
ts.NoError(ts.executeWorkflow("test-heartbeat-throttle-disabled-1", ts.workflows.HeartbeatSpecificCount, nil,
Expand Down