Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add list configs for topic & sub #4607

Merged
merged 10 commits into from
Sep 8, 2021
22 changes: 21 additions & 1 deletion pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
durpb "google.golang.org/protobuf/types/known/durationpb"

vkit "cloud.google.com/go/pubsub/apiv1"
)

// Subscription is a reference to a PubSub subscription.
Expand Down Expand Up @@ -85,7 +87,8 @@ func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator {
Project: c.fullyQualifiedProjectName(),
})
return &SubscriptionIterator{
c: c,
c: c,
it: it,
next: func() (string, error) {
sub, err := it.Next()
if err != nil {
Expand All @@ -99,6 +102,7 @@ func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator {
// SubscriptionIterator is an iterator that returns a series of subscriptions.
type SubscriptionIterator struct {
c *Client
it *vkit.SubscriptionIterator
next func() (string, error)
}

Expand All @@ -111,6 +115,22 @@ func (subs *SubscriptionIterator) Next() (*Subscription, error) {
return &Subscription{c: subs.c, name: subName}, nil
}

// NextConfig returns the next subscription config. If there are no more subscriptions,
// iterator.Done will be returned.
// This call shares the underlying iterator with calls to `SubscriptionIterator.Next`.
// If you wish to use mix calls, create separate iterator instances for both.
func (subs *SubscriptionIterator) NextConfig() (*SubscriptionConfig, error) {
spb, err := subs.it.Next()
if err != nil {
return nil, err
}
cfg, err := protoToSubscriptionConfig(spb, subs.c)
if err != nil {
return nil, err
}
return &cfg, nil
}

// PushConfig contains configuration for subscriptions that operate in push mode.
type PushConfig struct {
// A URL locating the endpoint to which messages should be pushed.
Expand Down
15 changes: 15 additions & 0 deletions pubsub/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,21 @@ func TestListProjectSubscriptions(t *testing.T) {
if !testutil.Equal(got, want) {
t.Errorf("got %v, want %v", got, want)
}

// Call list again, but check the config this time.
it := c.Subscriptions(ctx)
for {
sub, err := it.NextConfig()
if err == iterator.Done {
break
}
if err != nil {
t.Errorf("SubscriptionIterator.NextConfig() got err: %v", err)
}
if got := sub.Topic.ID(); got != topic.ID() {
t.Errorf("subConfig.Topic mismatch, got: %v, want: %v", got, topic.ID())
}
}
}

func getSubIDs(subs []*Subscription) []string {
Expand Down
18 changes: 17 additions & 1 deletion pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"cloud.google.com/go/iam"
"cloud.google.com/go/internal/optional"
ipubsub "cloud.google.com/go/internal/pubsub"
vkit "cloud.google.com/go/pubsub/apiv1"
"cloud.google.com/go/pubsub/internal/scheduler"
gax "github.com/googleapis/gax-go/v2"
"go.opencensus.io/stats"
Expand Down Expand Up @@ -381,7 +382,8 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
func (c *Client) Topics(ctx context.Context) *TopicIterator {
it := c.pubc.ListTopics(ctx, &pb.ListTopicsRequest{Project: c.fullyQualifiedProjectName()})
return &TopicIterator{
c: c,
c: c,
it: it,
next: func() (string, error) {
topic, err := it.Next()
if err != nil {
Expand All @@ -395,6 +397,7 @@ func (c *Client) Topics(ctx context.Context) *TopicIterator {
// TopicIterator is an iterator that returns a series of topics.
type TopicIterator struct {
c *Client
it *vkit.TopicIterator
next func() (string, error)
}

Expand All @@ -407,6 +410,19 @@ func (tps *TopicIterator) Next() (*Topic, error) {
return newTopic(tps.c, topicName), nil
}

// NextConfig returns the next topic config. If there are no more topics,
// iterator.Done will be returned.
// This call shares the underlying iterator with calls to `TopicIterator.Next`.
// If you wish to use mix calls, create separate iterator instances for both.
func (t *TopicIterator) NextConfig() (*TopicConfig, error) {
tpb, err := t.it.Next()
if err != nil {
return nil, err
}
cfg := protoToTopicConfig(tpb)
return &cfg, nil
}

// ID returns the unique identifier of the topic within its project.
func (t *Topic) ID() string {
slash := strings.LastIndex(t.name, "/")
Expand Down
21 changes: 20 additions & 1 deletion pubsub/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,36 @@ func TestCreateTopicWithConfig(t *testing.T) {
}

func TestListTopics(t *testing.T) {
ctx := context.Background()
c, srv := newFake(t)
defer c.Close()
defer srv.Close()

var ids []string
for i := 1; i <= 4; i++ {
numTopics := 4
for i := 1; i <= numTopics; i++ {
id := fmt.Sprintf("t%d", i)
ids = append(ids, id)
mustCreateTopic(t, c, id)
}
checkTopicListing(t, c, ids)

var tt []*TopicConfig
it := c.Topics(ctx)
for {
topic, err := it.NextConfig()
if err == iterator.Done {
break
}
if err != nil {
t.Errorf("TopicIterator.NextConfig() got err: %v", err)
} else {
tt = append(tt, topic)
}
}
if got := len(tt); got != numTopics {
t.Errorf("c.Topics(ctx) returned %d topics, expected %d", got, numTopics)
}
}

func TestListCompletelyEmptyTopics(t *testing.T) {
Expand Down