Skip to content

Commit

Permalink
pubsub: implement on top of generated client
Browse files Browse the repository at this point in the history
The pubsub client uses the generated client, and therefore gRPC.

This is not a breaking change, but it may have performance implications
if you create many pubsub.Clients. The current version of gRPC does not
reuse connections, so each Client manages its own TCP connection. You
should create clients sparingly and call the Close method on them when
done (although closing on program termination is not necessary).

The biggest change was in the iterators. I made the smallest changes
that kept everything working. There will be more changes soon as
the generated code adopts the latest iterator design, and that
trickles into this client.

Change-Id: I6c4c1f4b50b52ff8f1d1ff403e2979cd81c6c7ab
Reviewed-on: https://code-review.googlesource.com/7911
Reviewed-by: Ross Light <light@google.com>
  • Loading branch information
jba committed Oct 13, 2016
1 parent fa279f9 commit b9d51d5
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 310 deletions.
2 changes: 1 addition & 1 deletion pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func newMessageIterator(ctx context.Context, s service, subName string, po *pull
Notify: ka.Remove,
}

pull := newPuller(s, subName, ctx, int64(po.maxPrefetch), ka.Add, ka.Remove)
pull := newPuller(s, subName, ctx, po.maxPrefetch, ka.Add, ka.Remove)

ka.Start()
ack.Start()
Expand Down
6 changes: 3 additions & 3 deletions pubsub/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type blockingFetch struct {
service
}

func (s *blockingFetch) fetchMessages(ctx context.Context, subName string, maxMessages int64) ([]*Message, error) {
func (s *blockingFetch) fetchMessages(ctx context.Context, subName string, maxMessages int32) ([]*Message, error) {
<-ctx.Done()
return nil, ctx.Err()
}
Expand All @@ -88,7 +88,7 @@ type justInTimeFetch struct {
service
}

func (s *justInTimeFetch) fetchMessages(ctx context.Context, subName string, maxMessages int64) ([]*Message, error) {
func (s *justInTimeFetch) fetchMessages(ctx context.Context, subName string, maxMessages int32) ([]*Message, error) {
<-ctx.Done()
// The context was cancelled, but let's pretend that this happend just after our RPC returned.

Expand Down Expand Up @@ -154,7 +154,7 @@ func TestAfterAbortReturnsNoMoreThanOneMessage(t *testing.T) {
po := &pullOptions{
ackDeadline: time.Second * 10,
maxExtension: time.Hour,
maxPrefetch: n,
maxPrefetch: int32(n),
}
it := newMessageIterator(ctx, s, "subname", po)
defer it.Stop()
Expand Down
15 changes: 6 additions & 9 deletions pubsub/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package pubsub

import (
"encoding/base64"
"time"

raw "google.golang.org/api/pubsub/v1"
"github.com/golang/protobuf/ptypes"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
)

// Message represents a Pub/Sub message.
Expand Down Expand Up @@ -49,21 +49,18 @@ type Message struct {
it *MessageIterator
}

func toMessage(resp *raw.ReceivedMessage) (*Message, error) {
func toMessage(resp *pb.ReceivedMessage) (*Message, error) {
if resp.Message == nil {
return &Message{ackID: resp.AckId}, nil
}
data, err := base64.StdEncoding.DecodeString(resp.Message.Data)
if err != nil {
return nil, err
}
pubTime, err := time.Parse(time.RFC3339, resp.Message.PublishTime)

pubTime, err := ptypes.Timestamp(resp.Message.PublishTime)
if err != nil {
return nil, err
}
return &Message{
ackID: resp.AckId,
Data: data,
Data: resp.Message.Data,
Attributes: resp.Message.Attributes,
ID: resp.Message.MessageId,
PublishTime: pubTime,
Expand Down
32 changes: 10 additions & 22 deletions pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@ package pubsub // import "cloud.google.com/go/pubsub"

import (
"fmt"
"net/http"
"os"

"google.golang.org/api/iterator"
"google.golang.org/api/option"
raw "google.golang.org/api/pubsub/v1"
"google.golang.org/api/transport"
"google.golang.org/grpc"

"golang.org/x/net/context"
)
Expand All @@ -38,7 +36,7 @@ const (
)

const prodAddr = "https://pubsub.googleapis.com/"
const userAgent = "gcloud-golang-pubsub/20151008"
const userAgent = "gcloud-golang-pubsub/20160927"

// Client is a Google Pub/Sub client scoped to a single project.
//
Expand All @@ -53,26 +51,18 @@ type Client struct {
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) {
var o []option.ClientOption
// Environment variables for gcloud emulator:
// https://option.google.com/sdk/gcloud/reference/beta/emulators/pubsub/
// https://cloud.google.com/sdk/gcloud/reference/beta/emulators/pubsub/
if addr := os.Getenv("PUBSUB_EMULATOR_HOST"); addr != "" {
o = []option.ClientOption{
option.WithEndpoint("http://" + addr + "/"),
option.WithHTTPClient(http.DefaultClient),
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
return nil, fmt.Errorf("grpc.Dial: %v", err)
}
o = []option.ClientOption{option.WithGRPCConn(conn)}
} else {
o = []option.ClientOption{
option.WithEndpoint(prodAddr),
option.WithScopes(raw.PubsubScope, raw.CloudPlatformScope),
option.WithUserAgent(userAgent),
}
o = []option.ClientOption{option.WithUserAgent(userAgent)}
}
o = append(o, opts...)
httpClient, endpoint, err := transport.NewHTTPClient(ctx, o...)
if err != nil {
return nil, fmt.Errorf("dialing: %v", err)
}

s, err := newPubSubService(httpClient, endpoint)
s, err := newPubSubService(ctx, o)
if err != nil {
return nil, fmt.Errorf("constructing pubsub client: %v", err)
}
Expand All @@ -89,9 +79,7 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
//
// Close need not be called at program exit.
func (c *Client) Close() error {
// Nothing now, but will close a connection when
// we move to gRPC.
return nil
return c.s.close()
}

func (c *Client) fullyQualifiedProjectName() string {
Expand Down
2 changes: 1 addition & 1 deletion pubsub/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type puller struct {
// newPuller constructs a new puller.
// batchSize is the maximum number of messages to fetch at once.
// No more than batchSize messages will be outstanding at any time.
func newPuller(s service, subName string, ctx context.Context, batchSize int64, keepAlive, abandon func(ackID string)) *puller {
func newPuller(s service, subName string, ctx context.Context, batchSize int32, keepAlive, abandon func(ackID string)) *puller {
ctx, cancel := context.WithCancel(ctx)
return &puller{
cancel: cancel,
Expand Down
2 changes: 1 addition & 1 deletion pubsub/puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type fetcherService struct {
unexpectedCall bool
}

func (s *fetcherService) fetchMessages(ctx context.Context, subName string, maxMessages int64) ([]*Message, error) {
func (s *fetcherService) fetchMessages(ctx context.Context, subName string, maxMessages int32) ([]*Message, error) {
if len(s.results) == 0 {
s.unexpectedCall = true
return nil, errors.New("bang")
Expand Down
Loading

0 comments on commit b9d51d5

Please sign in to comment.