Skip to content

Commit

Permalink
Merge pull request #1 from jjg-akers/publisher-ctx-cancellation
Browse files Browse the repository at this point in the history
added func to connect to gcloud pubsub with background ctx
  • Loading branch information
jjg-akers authored Jan 20, 2022
2 parents 312214d + e8e46d1 commit c7f7bd2
Showing 1 changed file with 37 additions and 2 deletions.
39 changes: 37 additions & 2 deletions pkg/googlecloud/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ var (
ErrPublisherClosed = errors.New("publisher is closed")
// ErrTopicDoesNotExist happens when trying to publish or subscribe to a topic that doesn't exist.
ErrTopicDoesNotExist = errors.New("topic does not exist")
// ErrConnectTimeout happens when the Google Cloud PubSub connection context times out.
ErrConnectTimeout = errors.New("connect timeout")
)

type Publisher struct {
Expand Down Expand Up @@ -81,15 +83,48 @@ func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publ
ctx, cancel := context.WithTimeout(context.Background(), config.ConnectTimeout)
defer cancel()

var err error
pub.client, err = pubsub.NewClient(ctx, config.ProjectID, config.ClientOptions...)
cc, errc, err := connect(ctx, config)
if err != nil {
return nil, err
}

select {
case <-ctx.Done():
return nil, ErrConnectTimeout
case pub.client = <-cc:
case err = <-errc:
return nil, err
}

return pub, nil
}

func connect(ctx context.Context, config PublisherConfig) (<-chan *pubsub.Client, <-chan error, error) {
out := make(chan *pubsub.Client)
errc := make(chan error, 1)

go func() {
defer close(out)
defer close(errc)

// blocking
c, err := pubsub.NewClient(context.Background(), config.ProjectID, config.ClientOptions...)
if err != nil {
errc <- err
return
}
select {
case out <- c:
// ok, carry on
case <-ctx.Done():
return
}

}()

return out, errc, nil
}

// Publish publishes a set of messages on a Google Cloud Pub/Sub topic.
// It blocks until all the messages are successfully published or an error occurred.
//
Expand Down

0 comments on commit c7f7bd2

Please sign in to comment.