Skip to content

Commit

Permalink
feat(pubsub): add support for cloud storage ingestion topics (#10959)
Browse files Browse the repository at this point in the history
* feat(pubsub): add support for cloud storage ingestion topics

* use getters for go protos

* make interface private, add export strings

* fix comment on pubsub avro format
  • Loading branch information
hongalex authored Oct 8, 2024
1 parent 3377a3c commit 1a11675
Show file tree
Hide file tree
Showing 2 changed files with 215 additions and 0 deletions.
156 changes: 156 additions & 0 deletions pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
fmpb "google.golang.org/protobuf/types/known/fieldmaskpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
Expand Down Expand Up @@ -350,6 +351,9 @@ type TopicConfigToUpdate struct {
// IngestionDataSourceSettings are settings for ingestion from a
// data source into this topic.
//
// When changing this value, the entire data source settings object must be applied,
// rather than just the differences.
//
// Use the zero value &IngestionDataSourceSettings{} to remove the ingestion settings from the topic.
IngestionDataSourceSettings *IngestionDataSourceSettings
}
Expand Down Expand Up @@ -495,6 +499,97 @@ func (i *IngestionDataSourceAWSKinesis) isIngestionDataSource() bool {
return true
}

// CloudStorageIngestionState denotes the possible states for ingestion from Cloud Storage.
type CloudStorageIngestionState int

const (
// CloudStorageIngestionStateUnspecified is the default value. This value is unused.
CloudStorageIngestionStateUnspecified = iota

// CloudStorageIngestionStateActive means ingestion is active.
CloudStorageIngestionStateActive

// CloudStorageIngestionPermissionDenied means encountering an error while calling the Cloud Storage API.
// This can happen if the Pub/Sub SA has not been granted the
// [appropriate permissions](https://cloud.google.com/storage/docs/access-control/iam-permissions):
// - storage.objects.list: to list the objects in a bucket.
// - storage.objects.get: to read the objects in a bucket.
// - storage.buckets.get: to verify the bucket exists.
CloudStorageIngestionPermissionDenied

// CloudStorageIngestionPublishPermissionDenied means encountering an error when publishing to the topic.
// This can happen if the Pub/Sub SA has not been granted the [appropriate publish
// permissions](https://cloud.google.com/pubsub/docs/access-control#pubsub.publisher)
CloudStorageIngestionPublishPermissionDenied

// CloudStorageIngestionBucketNotFound means the provided bucket doesn't exist.
CloudStorageIngestionBucketNotFound

// CloudStorageIngestionTooManyObjects means the bucket has too many objects, ingestion will be paused.
CloudStorageIngestionTooManyObjects
)

// IngestionDataSourceCloudStorage are ingestion settings for Cloud Storage.
type IngestionDataSourceCloudStorage struct {
// State is an output-only field indicating the state of the Cloud storage ingestion source.
State CloudStorageIngestionState

// Bucket is the Cloud Storage bucket. The bucket name must be without any
// prefix like "gs://". See the bucket naming requirements (https://cloud.google.com/storage/docs/buckets#naming).
Bucket string

// InputFormat is the format of objects in Cloud Storage.
// Defaults to TextFormat.
InputFormat ingestionDataSourceCloudStorageInputFormat

// MinimumObjectCreateTime means objects with a larger or equal creation timestamp will be
// ingested.
MinimumObjectCreateTime time.Time

// MatchGlob is the pattern used to match objects that will be ingested. If
// empty, all objects will be ingested. See the [supported
// patterns](https://cloud.google.com/storage/docs/json_api/v1/objects/list#list-objects-and-prefixes-using-glob).
MatchGlob string
}

var _ IngestionDataSource = (*IngestionDataSourceCloudStorage)(nil)

func (i *IngestionDataSourceCloudStorage) isIngestionDataSource() bool {
return true
}

type ingestionDataSourceCloudStorageInputFormat interface {
isCloudStorageIngestionInputFormat() bool
}

var _ ingestionDataSourceCloudStorageInputFormat = (*IngestionDataSourceCloudStorageTextFormat)(nil)
var _ ingestionDataSourceCloudStorageInputFormat = (*IngestionDataSourceCloudStorageAvroFormat)(nil)
var _ ingestionDataSourceCloudStorageInputFormat = (*IngestionDataSourceCloudStoragePubSubAvroFormat)(nil)

// IngestionDataSourceCloudStorageTextFormat means Cloud Storage data will be interpreted as text.
type IngestionDataSourceCloudStorageTextFormat struct {
Delimiter string
}

func (i *IngestionDataSourceCloudStorageTextFormat) isCloudStorageIngestionInputFormat() bool {
return true
}

// IngestionDataSourceCloudStorageAvroFormat means Cloud Storage data will be interpreted in Avro format.
type IngestionDataSourceCloudStorageAvroFormat struct{}

func (i *IngestionDataSourceCloudStorageAvroFormat) isCloudStorageIngestionInputFormat() bool {
return true
}

// IngestionDataSourceCloudStoragePubSubAvroFormat is used assuming the data was written using Cloud
// Storage subscriptions https://cloud.google.com/pubsub/docs/cloudstorage.
type IngestionDataSourceCloudStoragePubSubAvroFormat struct{}

func (i *IngestionDataSourceCloudStoragePubSubAvroFormat) isCloudStorageIngestionInputFormat() bool {
return true
}

func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *IngestionDataSourceSettings {
if pbs == nil {
return nil
Expand All @@ -509,6 +604,25 @@ func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *In
AWSRoleARN: k.GetAwsRoleArn(),
GCPServiceAccount: k.GetGcpServiceAccount(),
}
} else if cs := pbs.GetCloudStorage(); cs != nil {
var format ingestionDataSourceCloudStorageInputFormat
switch t := cs.InputFormat.(type) {
case *pb.IngestionDataSourceSettings_CloudStorage_TextFormat_:
format = &IngestionDataSourceCloudStorageTextFormat{
Delimiter: *t.TextFormat.Delimiter,
}
case *pb.IngestionDataSourceSettings_CloudStorage_AvroFormat_:
format = &IngestionDataSourceCloudStorageAvroFormat{}
case *pb.IngestionDataSourceSettings_CloudStorage_PubsubAvroFormat:
format = &IngestionDataSourceCloudStoragePubSubAvroFormat{}
}
s.Source = &IngestionDataSourceCloudStorage{
State: CloudStorageIngestionState(cs.GetState()),
Bucket: cs.GetBucket(),
InputFormat: format,
MinimumObjectCreateTime: cs.GetMinimumObjectCreateTime().AsTime(),
MatchGlob: cs.GetMatchGlob(),
}
}
return s
}
Expand All @@ -534,6 +648,48 @@ func (i *IngestionDataSourceSettings) toProto() *pb.IngestionDataSourceSettings
},
}
}
if cs, ok := out.(*IngestionDataSourceCloudStorage); ok {
switch format := cs.InputFormat.(type) {
case *IngestionDataSourceCloudStorageTextFormat:
pbs.Source = &pb.IngestionDataSourceSettings_CloudStorage_{
CloudStorage: &pb.IngestionDataSourceSettings_CloudStorage{
State: pb.IngestionDataSourceSettings_CloudStorage_State(cs.State),
Bucket: cs.Bucket,
InputFormat: &pb.IngestionDataSourceSettings_CloudStorage_TextFormat_{
TextFormat: &pb.IngestionDataSourceSettings_CloudStorage_TextFormat{
Delimiter: &format.Delimiter,
},
},
MinimumObjectCreateTime: timestamppb.New(cs.MinimumObjectCreateTime),
MatchGlob: cs.MatchGlob,
},
}
case *IngestionDataSourceCloudStorageAvroFormat:
pbs.Source = &pb.IngestionDataSourceSettings_CloudStorage_{
CloudStorage: &pb.IngestionDataSourceSettings_CloudStorage{
State: pb.IngestionDataSourceSettings_CloudStorage_State(cs.State),
Bucket: cs.Bucket,
InputFormat: &pb.IngestionDataSourceSettings_CloudStorage_AvroFormat_{
AvroFormat: &pb.IngestionDataSourceSettings_CloudStorage_AvroFormat{},
},
MinimumObjectCreateTime: timestamppb.New(cs.MinimumObjectCreateTime),
MatchGlob: cs.MatchGlob,
},
}
case *IngestionDataSourceCloudStoragePubSubAvroFormat:
pbs.Source = &pb.IngestionDataSourceSettings_CloudStorage_{
CloudStorage: &pb.IngestionDataSourceSettings_CloudStorage{
State: pb.IngestionDataSourceSettings_CloudStorage_State(cs.State),
Bucket: cs.Bucket,
InputFormat: &pb.IngestionDataSourceSettings_CloudStorage_PubsubAvroFormat{
PubsubAvroFormat: &pb.IngestionDataSourceSettings_CloudStorage_PubSubAvroFormat{},
},
MinimumObjectCreateTime: timestamppb.New(cs.MinimumObjectCreateTime),
MatchGlob: cs.MatchGlob,
},
}
}
}
}
return pbs
}
Expand Down
59 changes: 59 additions & 0 deletions pubsub/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,65 @@ func TestTopic_IngestionKinesis(t *testing.T) {
}
}

func TestTopic_IngestionCloudStorage(t *testing.T) {
c, srv := newFake(t)
defer c.Close()
defer srv.Close()

id := "test-topic-storage-ingestion"
want := TopicConfig{
IngestionDataSourceSettings: &IngestionDataSourceSettings{
Source: &IngestionDataSourceCloudStorage{
Bucket: "fake-bucket",
InputFormat: &IngestionDataSourceCloudStorageTextFormat{
Delimiter: ",",
},
MinimumObjectCreateTime: time.Now().Add(-time.Hour),
MatchGlob: "**.txt",
},
},
}

topic := mustCreateTopicWithConfig(t, c, id, &want)
got, err := topic.Config(context.Background())
if err != nil {
t.Fatalf("error getting topic config: %v", err)
}
want.State = TopicStateActive
opt := cmpopts.IgnoreUnexported(TopicConfig{})
if !testutil.Equal(got, want, opt) {
t.Errorf("got %v, want %v", got, want)
}

// Update ingestion settings.
ctx := context.Background()
settings := &IngestionDataSourceSettings{
Source: &IngestionDataSourceCloudStorage{
Bucket: "fake-bucket-2",
InputFormat: &IngestionDataSourceCloudStoragePubSubAvroFormat{},
MinimumObjectCreateTime: time.Now().Add(-2 * time.Hour),
MatchGlob: "**.txt",
},
}
config2, err := topic.Update(ctx, TopicConfigToUpdate{IngestionDataSourceSettings: settings})
if err != nil {
t.Fatal(err)
}
if !testutil.Equal(config2.IngestionDataSourceSettings, settings, opt) {
t.Errorf("\ngot %+v\nwant %+v", config2.IngestionDataSourceSettings, settings)
}

// Clear ingestion settings.
settings = &IngestionDataSourceSettings{}
config3, err := topic.Update(ctx, TopicConfigToUpdate{IngestionDataSourceSettings: settings})
if err != nil {
t.Fatal(err)
}
if config3.IngestionDataSourceSettings != nil {
t.Errorf("got: %+v, want nil", config3.IngestionDataSourceSettings)
}
}

func TestListTopics(t *testing.T) {
ctx := context.Background()
c, srv := newFake(t)
Expand Down

0 comments on commit 1a11675

Please sign in to comment.