Skip to content

Commit

Permalink
Make client Dial context controlable by callers (#1416)
Browse files Browse the repository at this point in the history
  • Loading branch information
lminaudier authored Mar 12, 2024
1 parent 3b9c572 commit 9c42221
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 64 deletions.
28 changes: 25 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,14 @@ var MetricsNopHandler = metrics.NopHandler
// to the server eagerly and will return an error if the server is not
// available.
func Dial(options Options) (Client, error) {
return internal.DialClient(options)
return DialContext(context.Background(), options)
}

// DialContext creates an instance of a workflow client. This will attempt to connect
// to the server eagerly and will return an error if the server is not
// available. Connection will respect provided context deadlines and cancellations.
func DialContext(ctx context.Context, options Options) (Client, error) {
return internal.DialClient(ctx, options)
}

// NewLazyClient creates an instance of a workflow client. Unlike Dial, this
Expand All @@ -687,7 +694,7 @@ func NewLazyClient(options Options) (Client, error) {
//
// Deprecated: Use Dial or NewLazyClient instead.
func NewClient(options Options) (Client, error) {
return internal.NewClient(options)
return internal.NewClient(context.Background(), options)
}

// NewClientFromExisting creates a new client using the same connection as the
Expand All @@ -702,7 +709,22 @@ func NewClient(options Options) (Client, error) {
// associated with the existing client must call Close() and only the last one
// actually performs the connection close.
func NewClientFromExisting(existingClient Client, options Options) (Client, error) {
return internal.NewClientFromExisting(existingClient, options)
return NewClientFromExistingWithContext(context.Background(), existingClient, options)
}

// NewClientFromExistingWithContext creates a new client using the same connection as the
// existing client. This means all options.ConnectionOptions are ignored and
// options.HostPort is ignored. The existing client must have been created from
// this package and cannot be wrapped. Currently, this always attempts an eager
// connection even if the existing client was created with NewLazyClient and has
// not made any calls yet.
//
// Close() on the resulting client may not necessarily close the underlying
// connection if there are any other clients using the connection. All clients
// associated with the existing client must call Close() and only the last one
// actually performs the connection close.
func NewClientFromExistingWithContext(ctx context.Context, existingClient Client, options Options) (Client, error) {
return internal.NewClientFromExisting(ctx, existingClient, options)
}

// NewNamespaceClient creates an instance of a namespace client, to manage
Expand Down
24 changes: 14 additions & 10 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,10 @@ type (
// default: 15s
KeepAliveTimeout time.Duration

// GetSystemInfoTimeout is the timeout for the RPC made by the
// client to fetch server capabilities.
GetSystemInfoTimeout time.Duration

// if true, when there are no active RPCs, Time and Timeout will be ignored and no
// keepalive pings will be sent.
// If false, client sends keepalive pings even with no active RPCs
Expand Down Expand Up @@ -715,35 +719,35 @@ type Credentials interface {
}

// DialClient creates a client and attempts to connect to the server.
func DialClient(options ClientOptions) (Client, error) {
func DialClient(ctx context.Context, options ClientOptions) (Client, error) {
options.ConnectionOptions.disableEagerConnection = false
return NewClient(options)
return NewClient(ctx, options)
}

// NewLazyClient creates a client and does not attempt to connect to the server.
func NewLazyClient(options ClientOptions) (Client, error) {
options.ConnectionOptions.disableEagerConnection = true
return NewClient(options)
return NewClient(context.Background(), options)
}

// NewClient creates an instance of a workflow client
//
// Deprecated: Use DialClient or NewLazyClient instead.
func NewClient(options ClientOptions) (Client, error) {
return newClient(options, nil)
func NewClient(ctx context.Context, options ClientOptions) (Client, error) {
return newClient(ctx, options, nil)
}

// NewClientFromExisting creates a new client using the same connection as the
// existing client.
func NewClientFromExisting(existingClient Client, options ClientOptions) (Client, error) {
func NewClientFromExisting(ctx context.Context, existingClient Client, options ClientOptions) (Client, error) {
existing, _ := existingClient.(*WorkflowClient)
if existing == nil {
return nil, fmt.Errorf("existing client must have been created directly from a client package call")
}
return newClient(options, existing)
return newClient(ctx, options, existing)
}

func newClient(options ClientOptions, existing *WorkflowClient) (Client, error) {
func newClient(ctx context.Context, options ClientOptions, existing *WorkflowClient) (Client, error) {
if options.Namespace == "" {
options.Namespace = DefaultNamespace
}
Expand Down Expand Up @@ -788,13 +792,13 @@ func newClient(options ClientOptions, existing *WorkflowClient) (Client, error)
// the new connection. Otherwise, only load server capabilities eagerly if not
// disabled.
if existing != nil {
if client.capabilities, err = existing.loadCapabilities(); err != nil {
if client.capabilities, err = existing.loadCapabilities(ctx, options.ConnectionOptions.GetSystemInfoTimeout); err != nil {
return nil, err
}
client.unclosedClients = existing.unclosedClients
} else {
if !options.ConnectionOptions.disableEagerConnection {
if _, err := client.loadCapabilities(); err != nil {
if _, err := client.loadCapabilities(ctx, options.ConnectionOptions.GetSystemInfoTimeout); err != nil {
client.Close()
return nil, err
}
Expand Down
22 changes: 11 additions & 11 deletions internal/grpc_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestMissingGetServerInfo(t *testing.T) {
require.NoError(t, lastErr)

// Create a new client and confirm client has empty capabilities set
client, err := DialClient(ClientOptions{HostPort: l.Addr().String()})
client, err := DialClient(context.Background(), ClientOptions{HostPort: l.Addr().String()})
require.NoError(t, err)
workflowClient := client.(*WorkflowClient)
require.True(t, proto.Equal(&workflowservice.GetSystemInfoResponse_Capabilities{}, workflowClient.capabilities))
Expand All @@ -192,7 +192,7 @@ func TestInternalErrorRetry(t *testing.T) {
srv.signalWorkflowExecutionResponseError = status.Error(codes.Internal, "oh no, an internal error")

// Create client and make call
client, err := DialClient(ClientOptions{HostPort: srv.addr})
client, err := DialClient(context.Background(), ClientOptions{HostPort: srv.addr})
require.NoError(t, err)
defer client.Close()
_, err = client.WorkflowService().SignalWorkflowExecution(ctx, &workflowservice.SignalWorkflowExecutionRequest{})
Expand All @@ -213,7 +213,7 @@ func TestInternalErrorRetry(t *testing.T) {
srv.signalWorkflowExecutionResponseError = status.Error(codes.Internal, "oh no, an internal error")

// Create client and make call
client, err = DialClient(ClientOptions{HostPort: srv.addr})
client, err = DialClient(context.Background(), ClientOptions{HostPort: srv.addr})
require.NoError(t, err)
defer client.Close()
_, err = client.WorkflowService().SignalWorkflowExecution(ctx, &workflowservice.SignalWorkflowExecutionRequest{})
Expand All @@ -231,7 +231,7 @@ func TestEagerAndLazyClient(t *testing.T) {
srv.getSystemInfoResponseError = fmt.Errorf("some server failure")

// Confirm eager dial fails
_, err = DialClient(ClientOptions{HostPort: srv.addr})
_, err = DialClient(context.Background(), ClientOptions{HostPort: srv.addr})
require.EqualError(t, err, "failed reaching server: some server failure")

// Confirm lazy dial succeeds but fails signal workflow
Expand All @@ -247,7 +247,7 @@ func TestEagerAndLazyClient(t *testing.T) {
require.NoError(t, err)

// Now that there's no sys info response error, eager should succeed
c, err = DialClient(ClientOptions{HostPort: srv.addr})
c, err = DialClient(context.Background(), ClientOptions{HostPort: srv.addr})
require.NoError(t, err)
defer c.Close()

Expand Down Expand Up @@ -316,7 +316,7 @@ func TestDialOptions(t *testing.T) {
return invoker(ctx, method, req, reply, cc, opts...)
}
}
client, err := DialClient(ClientOptions{
client, err := DialClient(context.Background(), ClientOptions{
HostPort: srv.addr,
ConnectionOptions: ConnectionOptions{
DialOptions: []grpc.DialOption{
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestCustomResolver(t *testing.T) {
builder := manual.NewBuilderWithScheme(scheme)
builder.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: s1.addr}, {Addr: s2.addr}}})
resolver.Register(builder)
client, err := DialClient(ClientOptions{HostPort: scheme + ":///whatever"})
client, err := DialClient(context.Background(), ClientOptions{HostPort: scheme + ":///whatever"})
require.NoError(t, err)
defer client.Close()

Expand Down Expand Up @@ -417,12 +417,12 @@ func TestResourceExhaustedCause(t *testing.T) {
Cause: enums.RESOURCE_EXHAUSTED_CAUSE_CONCURRENT_LIMIT,
})
srv.getSystemInfoResponseError = s.Err()
_, err = DialClient(ClientOptions{HostPort: srv.addr, MetricsHandler: handler})
_, err = DialClient(context.Background(), ClientOptions{HostPort: srv.addr, MetricsHandler: handler})
require.Error(t, err)

// Attempt dial with a cause-less resource exhausted
srv.getSystemInfoResponseError = status.New(codes.ResourceExhausted, "some resource exhausted").Err()
_, err = DialClient(ClientOptions{HostPort: srv.addr, MetricsHandler: handler})
_, err = DialClient(context.Background(), ClientOptions{HostPort: srv.addr, MetricsHandler: handler})
require.Error(t, err)

// Make sure we have 1 metric with cause and 1 without
Expand All @@ -445,7 +445,7 @@ func TestCredentialsAPIKey(t *testing.T) {
defer srv.Stop()

// Fixed string
client, err := DialClient(ClientOptions{
client, err := DialClient(context.Background(), ClientOptions{
HostPort: srv.addr,
Credentials: NewAPIKeyStaticCredentials("my-api-key"),
})
Expand All @@ -470,7 +470,7 @@ func TestCredentialsAPIKey(t *testing.T) {
)

// Callback
client, err = DialClient(ClientOptions{
client, err = DialClient(context.Background(), ClientOptions{
HostPort: srv.addr,
Credentials: NewAPIKeyDynamicCredentials(func(ctx context.Context) (string, error) {
return "my-callback-api-key", nil
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (w *workflowClientInterceptor) CreateSchedule(ctx context.Context, in *Sche
}

func (sc *scheduleClient) Create(ctx context.Context, options ScheduleOptions) (ScheduleHandle, error) {
if err := sc.workflowClient.ensureInitialized(); err != nil {
if err := sc.workflowClient.ensureInitialized(ctx); err != nil {
return nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,11 +943,11 @@ func (aw *AggregatedWorker) Start() error {
aw.assertNotStopped()
if err := initBinaryChecksum(); err != nil {
return fmt.Errorf("failed to get executable checksum: %v", err)
} else if err = aw.client.ensureInitialized(); err != nil {
} else if err = aw.client.ensureInitialized(context.Background()); err != nil {
return err
}
// Populate the capabilities. This should be the only time it is written too.
capabilities, err := aw.client.loadCapabilities()
capabilities, err := aw.client.loadCapabilities(context.Background(), defaultGetSystemInfoTimeout)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 9c42221

Please sign in to comment.