From 54218e963bb5a6d47411a490985b54053825064f Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Fri, 23 Jun 2023 16:48:54 -0700 Subject: [PATCH] feat(pubsub): add support for cloud storage subscriptions (#7977) * feat(pubsub): add support for cloud storage subscriptions * fix exported type style * fix bad sentinel value in gcs and bq subs * address review comments * revert change that erroneously deletes bq config check * fix wording in one of comment --- pubsub/pstest/fake.go | 11 +++ pubsub/pstest/fake_test.go | 16 ++++ pubsub/subscription.go | 184 +++++++++++++++++++++++++++++++++--- pubsub/subscription_test.go | 67 +++++++++++++ 4 files changed, 265 insertions(+), 13 deletions(-) diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index f4af55ca5627..f999af01c176 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -505,6 +505,9 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p if ps.GetBigqueryConfig() != nil && ps.GetBigqueryConfig().GetTable() != "" { ps.BigqueryConfig.State = pb.BigQueryConfig_ACTIVE } + if ps.CloudStorageConfig != nil && ps.CloudStorageConfig.Bucket != "" { + ps.CloudStorageConfig.State = pb.CloudStorageConfig_ACTIVE + } ps.TopicMessageRetentionDuration = top.proto.MessageRetentionDuration var deadLetterTopic *topic if ps.DeadLetterPolicy != nil { @@ -617,6 +620,14 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti } } + case "cloud_storage_config": + sub.proto.CloudStorageConfig = req.GetSubscription().GetCloudStorageConfig() + // As long as the storage config is not nil, we assume it's valid + // without additional checks. 
+ if sub.proto.GetCloudStorageConfig() != nil { + sub.proto.CloudStorageConfig.State = pb.CloudStorageConfig_ACTIVE + } + case "ack_deadline_seconds": a := req.Subscription.AckDeadlineSeconds if err := checkAckDeadline(a); err != nil { diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index ff80c411b5df..1254ca0d14dd 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -1530,6 +1530,7 @@ func TestStreaming_SubscriptionProperties(t *testing.T) { } } +// Test switching between the various subscription types: push to endpoint, bigquery, cloud storage, and pull. func TestSubscriptionPushPull(t *testing.T) { ctx := context.Background() pclient, sclient, _, cleanup := newFake(ctx, t) @@ -1586,6 +1587,21 @@ func TestSubscriptionPushPull(t *testing.T) { if got.BigqueryConfig != nil { t.Errorf("sub.BigqueryConfig should be nil, got %s", got.BigqueryConfig) } + + // Update the subscription to write to Cloud Storage. + csc := &pb.CloudStorageConfig{ + Bucket: "fake-bucket", + } + updateSub.CloudStorageConfig = csc + got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{ + Subscription: updateSub, + UpdateMask: &field_mask.FieldMask{Paths: []string{"cloud_storage_config"}}, + }) + want2 := csc + want2.State = pb.CloudStorageConfig_ACTIVE + if diff := testutil.Diff(got.CloudStorageConfig, want2); diff != "" { + t.Errorf("sub.CloudStorageConfig mismatch: %s", diff) + } } func TestSubscriptionMessageOrdering(t *testing.T) { diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 5a97305ef877..4a9f09bd83bf 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -32,6 +32,7 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" durpb "google.golang.org/protobuf/types/known/durationpb" fmpb "google.golang.org/protobuf/types/known/fieldmaskpb" @@ -280,6 +281,117 @@ func (bc *BigQueryConfig) 
toProto() *pb.BigQueryConfig { return pbCfg } +// CloudStorageConfigState denotes the possible states for a Cloud Storage Subscription. +type CloudStorageConfigState int + +const ( + // CloudStorageConfigStateUnspecified is the default value. This value is unused. + CloudStorageConfigStateUnspecified = iota + + // CloudStorageConfigActive means the subscription can actively send messages to Cloud Storage. + CloudStorageConfigActive + + // CloudStorageConfigPermissionDenied means the subscription cannot write to the Cloud Storage bucket because of permission denied errors. + CloudStorageConfigPermissionDenied + + // CloudStorageConfigNotFound means the subscription cannot write to the Cloud Storage bucket because it does not exist. + CloudStorageConfigNotFound +) + +// isCloudStorageOutputFormat is implemented by the configuration options for how to write the message data to Cloud Storage. +type isCloudStorageOutputFormat interface { + isCloudStorageOutputFormat() +} + +// CloudStorageOutputFormatTextConfig is the configuration for writing +// message data in text format. Message payloads will be written to files +// as raw text, separated by a newline. +type CloudStorageOutputFormatTextConfig struct{} + +// CloudStorageOutputFormatAvroConfig is the configuration for writing +// message data in Avro format. Message payloads and metadata will be written +// to the files as an Avro binary. +type CloudStorageOutputFormatAvroConfig struct { + // When true, write the subscription name, message_id, publish_time, + // attributes, and ordering_key as additional fields in the output. + WriteMetadata bool +} + +func (*CloudStorageOutputFormatTextConfig) isCloudStorageOutputFormat() {} + +func (*CloudStorageOutputFormatAvroConfig) isCloudStorageOutputFormat() {} + +// CloudStorageConfig configures the subscription to deliver to Cloud Storage. +type CloudStorageConfig struct { + // User-provided name for the Cloud Storage bucket. + // The bucket must be created by the user. 
The bucket name must be without + // any prefix like "gs://". See the [bucket naming + // requirements](https://cloud.google.com/storage/docs/buckets#naming). + Bucket string + + // User-provided prefix for Cloud Storage filename. See the [object naming + // requirements](https://cloud.google.com/storage/docs/objects#naming). + FilenamePrefix string + + // User-provided suffix for Cloud Storage filename. See the [object naming + // requirements](https://cloud.google.com/storage/docs/objects#naming). + FilenameSuffix string + + // Configuration for how to write message data. Options are + // CloudStorageOutputFormatTextConfig and CloudStorageOutputFormatAvroConfig. + // Defaults to text format. + OutputFormat isCloudStorageOutputFormat + + // The maximum duration that can elapse before a new Cloud Storage file is + // created. Min 1 minute, max 10 minutes, default 5 minutes. May not exceed + // the subscription's acknowledgement deadline. + MaxDuration optional.Duration + + // The maximum bytes that can be written to a Cloud Storage file before a new + // file is created. Min 1 KB, max 10 GiB. The max_bytes limit may be exceeded + // in cases where messages are larger than the limit. + MaxBytes int64 + + // Output only. An output-only field that indicates whether or not the + // subscription can receive messages. + State CloudStorageConfigState +} + +func (cs *CloudStorageConfig) toProto() *pb.CloudStorageConfig { + if cs == nil { + return nil + } + // For the purposes of the live service, an empty/zero-valued config + // is treated the same as nil, i.e. as clearing this setting. 
+ if (CloudStorageConfig{}) == *cs { + return nil + } + var dur *durationpb.Duration + if cs.MaxDuration != nil { + dur = durationpb.New(optional.ToDuration(cs.MaxDuration)) + } + pbCfg := &pb.CloudStorageConfig{ + Bucket: cs.Bucket, + FilenamePrefix: cs.FilenamePrefix, + FilenameSuffix: cs.FilenameSuffix, + MaxDuration: dur, + MaxBytes: cs.MaxBytes, + State: pb.CloudStorageConfig_State(cs.State), + } + if out := cs.OutputFormat; out != nil { + if _, ok := out.(*CloudStorageOutputFormatTextConfig); ok { + pbCfg.OutputFormat = &pb.CloudStorageConfig_TextConfig_{} + } else if cfg, ok := out.(*CloudStorageOutputFormatAvroConfig); ok { + pbCfg.OutputFormat = &pb.CloudStorageConfig_AvroConfig_{ + AvroConfig: &pb.CloudStorageConfig_AvroConfig{ + WriteMetadata: cfg.WriteMetadata, + }, + } + } + } + return pbCfg +} + // SubscriptionState denotes the possible states for a Subscription. type SubscriptionState int @@ -296,7 +408,9 @@ const ( SubscriptionStateResourceError ) -// SubscriptionConfig describes the configuration of a subscription. +// SubscriptionConfig describes the configuration of a subscription. If none of +// PushConfig, BigQueryConfig, or CloudStorageConfig is set, then the subscriber will +// pull and ack messages using API methods. At most one of these fields may be set. type SubscriptionConfig struct { // The fully qualified identifier for the subscription, in the format "projects//subscriptions/" name string @@ -305,17 +419,23 @@ type SubscriptionConfig struct { Topic *Topic // If push delivery is used with this subscription, this field is - // used to configure it. Either `PushConfig` or `BigQueryConfig` can be set, - // but not both. If both are empty, then the subscriber will pull and ack - // messages using API methods. + // used to configure it. At most one of `PushConfig`, `BigQueryConfig`, + // or `CloudStorageConfig` can be set. If all are empty, then the + // subscriber will pull and ack messages using API methods. 
PushConfig PushConfig // If delivery to BigQuery is used with this subscription, this field is - // used to configure it. Either `PushConfig` or `BigQueryConfig` can be set, - // but not both. If both are empty, then the subscriber will pull and ack - // messages using API methods. + // used to configure it. At most one of `PushConfig`, `BigQueryConfig`, + // or `CloudStorageConfig` can be set. If all are empty, then the + // subscriber will pull and ack messages using API methods. BigQueryConfig BigQueryConfig + // If delivery to Cloud Storage is used with this subscription, this field is + // used to configure it. At most one of `PushConfig`, `BigQueryConfig`, + // or `CloudStorageConfig` can be set. If all are empty, then the + // subscriber will pull and ack messages using API methods. + CloudStorageConfig CloudStorageConfig + // The default maximum time after a subscriber receives a message before // the subscriber should acknowledge the message. Note: messages which are // obtained via Subscription.Receive need not be acknowledged within this @@ -438,10 +558,8 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil { pbPushConfig = cfg.PushConfig.toProto() } - var pbBigQueryConfig *pb.BigQueryConfig - if cfg.BigQueryConfig.Table != "" { - pbBigQueryConfig = cfg.BigQueryConfig.toProto() - } + pbBigQueryConfig := cfg.BigQueryConfig.toProto() + pbCloudStorageConfig := cfg.CloudStorageConfig.toProto() var retentionDuration *durpb.Duration if cfg.RetentionDuration != 0 { retentionDuration = durpb.New(cfg.RetentionDuration) @@ -459,6 +577,7 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { Topic: cfg.Topic.name, PushConfig: pbPushConfig, BigqueryConfig: pbBigQueryConfig, + CloudStorageConfig: pbCloudStorageConfig, AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), RetainAckedMessages: 
cfg.RetainAckedMessages, MessageRetentionDuration: retentionDuration, @@ -507,6 +626,9 @@ func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionC if bq := protoToBQConfig(pbSub.GetBigqueryConfig()); bq != nil { subC.BigQueryConfig = *bq } + if cs := protoToStorageConfig(pbSub.GetCloudStorageConfig()); cs != nil { + subC.CloudStorageConfig = *cs + } return subC, nil } @@ -543,6 +665,31 @@ func protoToBQConfig(pbBQ *pb.BigQueryConfig) *BigQueryConfig { return bq } +func protoToStorageConfig(pbCSC *pb.CloudStorageConfig) *CloudStorageConfig { + if pbCSC == nil { + return nil + } + + csc := &CloudStorageConfig{ + Bucket: pbCSC.GetBucket(), + FilenamePrefix: pbCSC.GetFilenamePrefix(), + FilenameSuffix: pbCSC.GetFilenameSuffix(), + MaxBytes: pbCSC.GetMaxBytes(), + State: CloudStorageConfigState(pbCSC.GetState()), + } + if dur := pbCSC.GetMaxDuration().AsDuration(); dur != 0 { + csc.MaxDuration = dur + } + if out := pbCSC.OutputFormat; out != nil { + if _, ok := out.(*pb.CloudStorageConfig_TextConfig_); ok { + csc.OutputFormat = &CloudStorageOutputFormatTextConfig{} + } else if cfg, ok := out.(*pb.CloudStorageConfig_AvroConfig_); ok { + csc.OutputFormat = &CloudStorageOutputFormatAvroConfig{WriteMetadata: cfg.AvroConfig.GetWriteMetadata()} + } + } + return csc +} + // DeadLetterPolicy specifies the conditions for dead lettering messages in // a subscription. type DeadLetterPolicy struct { @@ -786,14 +933,21 @@ func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) { // SubscriptionConfigToUpdate describes how to update a subscription. type SubscriptionConfigToUpdate struct { - // If non-nil, the push config is changed. Cannot be set at the same time as BigQueryConfig. + // If non-nil, the push config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig + // can be set. // If currently in push mode, set this value to the zero value to revert to a Pull based subscription. 
PushConfig *PushConfig - // If non-nil, the bigquery config is changed. Cannot be set at the same time as PushConfig. + // If non-nil, the bigquery config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig + // can be set. // If currently in bigquery mode, set this value to the zero value to revert to a Pull based subscription, BigQueryConfig *BigQueryConfig + // If non-nil, the Cloud Storage config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig + // can be set. + // If currently in Cloud Storage mode, set this value to the zero value to revert to a Pull based subscription. + CloudStorageConfig *CloudStorageConfig + // If non-zero, the ack deadline is changed. AckDeadline time.Duration @@ -855,6 +1009,10 @@ func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.Update psub.BigqueryConfig = cfg.BigQueryConfig.toProto() paths = append(paths, "bigquery_config") } + if cfg.CloudStorageConfig != nil { + psub.CloudStorageConfig = cfg.CloudStorageConfig.toProto() + paths = append(paths, "cloud_storage_config") + } if cfg.AckDeadline != 0 { psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds())) paths = append(paths, "ack_deadline_seconds") diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index ef11dcde51c5..19d64457d6a5 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -497,6 +497,73 @@ func TestBigQuerySubscription(t *testing.T) { } } +func TestCloudStorageSubscription(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + client, srv := newFake(t) + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "t") + bucket := "fake-bucket" + csCfg := CloudStorageConfig{ + Bucket: bucket, + FilenamePrefix: "some-prefix", + FilenameSuffix: "some-suffix", + OutputFormat: &CloudStorageOutputFormatAvroConfig{ + WriteMetadata: true, + }, + MaxDuration: 10 * time.Minute, + MaxBytes: 
10e5, + } + + subConfig := SubscriptionConfig{ + Topic: topic, + CloudStorageConfig: csCfg, + } + csSub, err := client.CreateSubscription(ctx, "s", subConfig) + if err != nil { + t.Fatal(err) + } + cfg, err := csSub.Config(ctx) + if err != nil { + t.Fatal(err) + } + + want := csCfg + want.State = CloudStorageConfigActive + if diff := testutil.Diff(cfg.CloudStorageConfig, want); diff != "" { + t.Fatalf("create cloud storage subscription mismatch: \n%s", diff) + } + + csCfg.OutputFormat = &CloudStorageOutputFormatTextConfig{} + cfg, err = csSub.Update(ctx, SubscriptionConfigToUpdate{ + CloudStorageConfig: &csCfg, + }) + if err != nil { + t.Fatal(err) + } + got := cfg.CloudStorageConfig + want = csCfg + want.State = CloudStorageConfigActive + if diff := testutil.Diff(got, want); diff != "" { + t.Fatalf("update cloud storage subscription mismatch: \n%s", diff) + } + + // Test resetting to a pull based subscription. + cfg, err = csSub.Update(ctx, SubscriptionConfigToUpdate{ + CloudStorageConfig: &CloudStorageConfig{}, + }) + if err != nil { + t.Fatal(err) + } + got = cfg.CloudStorageConfig + want = CloudStorageConfig{} + if diff := testutil.Diff(got, want); diff != "" { + t.Fatalf("remove cloud storage subscription mismatch: \n%s", diff) + } +} + func TestExactlyOnceDelivery_AckSuccess(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background())