Skip to content

Commit

Permalink
Adds pubsub.ClientConfig to the configuration (#35)
Browse files Browse the repository at this point in the history
The pubsub client has more configuration options that what was being
exposed by the watermill configuration structs.

This adds the missing pubsub.ClientConfig struct to the watermill
configuration structs and changes the client initialization to
NewClientWithConfig instead of NewClient.
  • Loading branch information
rjfonseca authored Oct 16, 2024
1 parent 59d9adc commit 63b382e
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
3 changes: 2 additions & 1 deletion pkg/googlecloud/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type PublisherConfig struct {
// Settings for cloud.google.com/go/pubsub client library.
PublishSettings *pubsub.PublishSettings
ClientOptions []option.ClientOption
ClientConfig *pubsub.ClientConfig

Marshaler Marshaler
}
Expand Down Expand Up @@ -113,7 +114,7 @@ func connect(ctx context.Context, config PublisherConfig) (<-chan *pubsub.Client
defer close(errc)

// blocking
c, err := pubsub.NewClient(context.Background(), config.ProjectID, config.ClientOptions...)
c, err := pubsub.NewClientWithConfig(context.Background(), config.ProjectID, config.ClientConfig, config.ClientOptions...)
if err != nil {
errc <- err
return
Expand Down
3 changes: 2 additions & 1 deletion pkg/googlecloud/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type SubscriberConfig struct {
ReceiveSettings pubsub.ReceiveSettings
SubscriptionConfig pubsub.SubscriptionConfig
ClientOptions []option.ClientOption
ClientConfig *pubsub.ClientConfig

// Unmarshaler transforms the client library format into watermill/message.Message.
// Use a custom unmarshaler if needed, otherwise the default Unmarshaler should cover most use cases.
Expand Down Expand Up @@ -419,7 +420,7 @@ func (s *Subscriber) subscription(ctx context.Context, subscriptionName, topicNa
}

func (s *Subscriber) newClient(ctx context.Context) (*pubsub.Client, error) {
client, err := pubsub.NewClient(ctx, s.config.ProjectID, s.config.ClientOptions...)
client, err := pubsub.NewClientWithConfig(ctx, s.config.ProjectID, s.config.ClientConfig, s.config.ClientOptions...)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 63b382e

Please sign in to comment.