diff --git a/pkg/googlecloud/publisher.go b/pkg/googlecloud/publisher.go index 3a1fde5..acb39fa 100644 --- a/pkg/googlecloud/publisher.go +++ b/pkg/googlecloud/publisher.go @@ -18,6 +18,8 @@ var ( ErrPublisherClosed = errors.New("publisher is closed") // ErrTopicDoesNotExist happens when trying to publish or subscribe to a topic that doesn't exist. ErrTopicDoesNotExist = errors.New("topic does not exist") + // ErrConnectTimeout happens when the Google Cloud PubSub connection context times out. + ErrConnectTimeout = errors.New("connect timeout") ) type Publisher struct { @@ -81,15 +83,48 @@ func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publ ctx, cancel := context.WithTimeout(context.Background(), config.ConnectTimeout) defer cancel() - var err error - pub.client, err = pubsub.NewClient(ctx, config.ProjectID, config.ClientOptions...) + cc, errc, err := connect(ctx, config) if err != nil { return nil, err } + select { + case <-ctx.Done(): + return nil, ErrConnectTimeout + case pub.client = <-cc: + case err = <-errc: + return nil, err + } + return pub, nil } +func connect(ctx context.Context, config PublisherConfig) (<-chan *pubsub.Client, <-chan error, error) { + out := make(chan *pubsub.Client) + errc := make(chan error, 1) + + go func() { + defer close(out) + defer close(errc) + + // blocking + c, err := pubsub.NewClient(context.Background(), config.ProjectID, config.ClientOptions...) + if err != nil { + errc <- err + return + } + select { + case out <- c: + // ok, carry on + case <-ctx.Done(): + return + } + + }() + + return out, errc, nil +} + // Publish publishes a set of messages on a Google Cloud Pub/Sub topic. // It blocks until all the messages are successfully published or an error occurred. //