From e65bbb8f3c41ed052f6642f2ff7904d8f22a9db0 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 22 May 2023 22:54:46 +0000 Subject: [PATCH 1/6] feat(pubsub): add support for cloud storage subscriptions --- pubsub/pstest/fake.go | 11 +++ pubsub/pstest/fake_test.go | 16 ++++ pubsub/subscription.go | 176 ++++++++++++++++++++++++++++++++++-- pubsub/subscription_test.go | 54 +++++++++++ 4 files changed, 248 insertions(+), 9 deletions(-) diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index f34b46ddbb64..17790f654811 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -505,6 +505,11 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p } else if ps.BigqueryConfig.Table != "" { ps.BigqueryConfig.State = pb.BigQueryConfig_ACTIVE } + if ps.CloudStorageConfig == nil { + ps.CloudStorageConfig = &pb.CloudStorageConfig{} + } else if ps.CloudStorageConfig.Bucket != "" { + ps.CloudStorageConfig.State = pb.CloudStorageConfig_ACTIVE + } ps.TopicMessageRetentionDuration = top.proto.MessageRetentionDuration var deadLetterTopic *topic if ps.DeadLetterPolicy != nil { @@ -611,6 +616,12 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti sub.proto.GetBigqueryConfig().State = pb.BigQueryConfig_ACTIVE } + case "cloud_storage_config": + sub.proto.CloudStorageConfig = req.GetSubscription().GetCloudStorageConfig() + if sub.proto.GetCloudStorageConfig().GetBucket() != "" { + sub.proto.CloudStorageConfig.State = pb.CloudStorageConfig_ACTIVE + } + case "ack_deadline_seconds": a := req.Subscription.AckDeadlineSeconds if err := checkAckDeadline(a); err != nil { diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index 871dfd0a0625..d4f278f043c0 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -1530,6 +1530,7 @@ func TestStreaming_SubscriptionProperties(t *testing.T) { } } +// Test switching between the various subscription types: push to endpoint, bigquery, cloud storage, and pull. func TestSubscriptionPushPull(t *testing.T) { ctx := context.Background() pclient, sclient, _, cleanup := newFake(ctx, t) @@ -1586,6 +1587,21 @@ func TestSubscriptionPushPull(t *testing.T) { if diff := testutil.Diff(got.BigqueryConfig, new(pb.BigQueryConfig)); diff != "" { t.Errorf("sub.BigqueryConfig should be zero value\n%s", diff) } + + // Update the subscription to write to Cloud Storage. + csc := &pb.CloudStorageConfig{ + Bucket: "fake-bucket", + } + updateSub.CloudStorageConfig = csc + got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{ + Subscription: updateSub, + UpdateMask: &field_mask.FieldMask{Paths: []string{"cloud_storage_config"}}, + }) + want2 := csc + want2.State = pb.CloudStorageConfig_ACTIVE + if diff := testutil.Diff(got.CloudStorageConfig, want2); diff != "" { + t.Errorf("sub.CloudStorageConfig mismatch: %s", diff) + } } func TestSubscriptionMessageOrdering(t *testing.T) { diff --git a/pubsub/subscription.go b/pubsub/subscription.go index b48daab46870..b2b60018d9d7 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -33,6 +33,7 @@ import ( fmpb "google.golang.org/genproto/protobuf/field_mask" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" durpb "google.golang.org/protobuf/types/known/durationpb" vkit "cloud.google.com/go/pubsub/apiv1" @@ -275,6 +276,111 @@ func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig { return pbCfg } +// CloudStorageConfigState denotes the possible states for a Cloud Storage Subscription. +type CloudStorageConfigState int + +const ( + // CloudStorageConfigStateUnspecified is the default value. This value is unused. + CloudStorageConfigStateUnspecified = iota + + // CloudStorageConfigActive means the subscription can actively send messages to Cloud Storage. + CloudStorageConfigActive + + // CloudStorageConfigPermissionDenied means the subscription cannot write to the Cloud storage bucket because of permission denied errors. + CloudStorageConfigPermissionDenied + + // CloudStorageConfigNotFound means the subscription cannot write to the Cloud Storage bucket because it does not exist. + CloudStorageConfigNotFound +) + +// Configuration options for how to write the message data to Cloud Storage. +type isCloudStorageOutputFormat interface { + isCloudStorageOutputFormat() +} + +// CloudStorageOutputFormat_TextConfig is the configuration for writing +// message data in text format. Message paylods will be written to files +// as raw text, separated by a newline. +type CloudStorageOutputFormat_TextConfig struct{} + +// Configuration for writing message data in Avro format. +// Message payloads and metadata will be written to the files as an Avro binary. +type CloudStorageOutputFormat_AvroConfig struct { + // When true, write the subscription name, message_id, publish_time, + // attributes, and ordering_key as additional fields in the output. + WriteMetadata bool +} + +func (*CloudStorageOutputFormat_TextConfig) isCloudStorageOutputFormat() {} + +func (*CloudStorageOutputFormat_AvroConfig) isCloudStorageOutputFormat() {} + +// CloudStorageConfig configures the subscription to deliver to Cloud Storage. +type CloudStorageConfig struct { + // User-provided name for the Cloud Storage bucket. + // The bucket must be created by the user. The bucket name must be without + // any prefix like "gs://". See the [bucket naming + // requirements] (https://cloud.google.com/storage/docs/buckets#naming). + Bucket string + + // User-provided prefix for Cloud Storage filename. See the [object naming + // requirements](https://cloud.google.com/storage/docs/objects#naming). + FilenamePrefix string + + // User-provided suffix for Cloud Storage filename. See the [object naming + // requirements](https://cloud.google.com/storage/docs/objects#naming). + FilenameSuffix string + + // Configuration for how to write message data. Options are + // CloudStorageOutputFormat_TextConfig and CloudStorageOutputFormat_AvroConfig. + // Defaults to text format. + OutputFormat isCloudStorageOutputFormat + + // The maximum duration that can elapse before a new Cloud Storage file is + // created. Min 1 minute, max 10 minutes, default 5 minutes. May not exceed + // the subscription's acknowledgement deadline. + MaxDuration optional.Duration + + // The maximum bytes that can be written to a Cloud Storage file before a new + // file is created. Min 1 KB, max 10 GiB. The max_bytes limit may be exceeded + // in cases where messages are larger than the limit. + MaxBytes int64 + + // Output only. An output-only field that indicates whether or not the + // subscription can receive messages. + State CloudStorageConfigState +} + +func (bc *CloudStorageConfig) toProto() *pb.CloudStorageConfig { + if bc == nil { + return nil + } + var dur *durationpb.Duration + if bc.MaxDuration != nil { + dur = durationpb.New(optional.ToDuration(bc.MaxDuration)) + } + pbCfg := &pb.CloudStorageConfig{ + Bucket: bc.Bucket, + FilenamePrefix: bc.FilenamePrefix, + FilenameSuffix: bc.FilenameSuffix, + MaxDuration: dur, + MaxBytes: bc.MaxBytes, + State: pb.CloudStorageConfig_State(bc.State), + } + if out := bc.OutputFormat; out != nil { + if _, ok := out.(*CloudStorageOutputFormat_TextConfig); ok { + pbCfg.OutputFormat = &pb.CloudStorageConfig_TextConfig_{} + } else if cfg, ok := out.(*CloudStorageOutputFormat_AvroConfig); ok { + pbCfg.OutputFormat = &pb.CloudStorageConfig_AvroConfig_{ + AvroConfig: &pb.CloudStorageConfig_AvroConfig{ + WriteMetadata: cfg.WriteMetadata, + }, + } + } + } + return pbCfg +} + // SubscriptionState denotes the possible states for a Subscription. type SubscriptionState int @@ -291,7 +397,9 @@ const ( SubscriptionStateResourceError ) -// SubscriptionConfig describes the configuration of a subscription. +// SubscriptionConfig describes the configuration of a subscription. If none of +// PushConfig, BigQueryConfig, or CloudStorageConfig is set, then the subscriber will +// pull and ack messages using API methods. At most one of these fields may be set. type SubscriptionConfig struct { // The fully qualified identifier for the subscription, in the format "projects//subscriptions/" name string @@ -300,17 +408,23 @@ type SubscriptionConfig struct { Topic *Topic // If push delivery is used with this subscription, this field is - // used to configure it. Either `PushConfig` or `BigQueryConfig` can be set, - // but not both. If both are empty, then the subscriber will pull and ack - // messages using API methods. + // used to configure it. At most one of `PushConfig`, `BigQueryConfig`, + // and `CloudStorageConfig` can be set. If all are empty, then the + // subscriber will pull and ack messages using API methods. PushConfig PushConfig // If delivery to BigQuery is used with this subscription, this field is - // used to configure it. Either `PushConfig` or `BigQueryConfig` can be set, - // but not both. If both are empty, then the subscriber will pull and ack - // messages using API methods. + // used to configure it. At most one of `PushConfig`, `BigQueryConfig`, + // and `CloudStorageConfig` can be set. If all are empty, then the + // subscriber will pull and ack messages using API methods. BigQueryConfig BigQueryConfig + // If delivery to Cloud Storage is used with this subscription, this field is + // used to configure it. At most one of `PushConfig`, `BigQueryConfig`, + // and `CloudStorageConfig` can be set. If all are empty, then the + // subscriber will pull and ack messages using API methods. + CloudStorageConfig CloudStorageConfig + // The default maximum time after a subscriber receives a message before // the subscriber should acknowledge the message. Note: messages which are // obtained via Subscription.Receive need not be acknowledged within this @@ -437,6 +551,10 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { if cfg.BigQueryConfig.Table != "" { pbBigQueryConfig = cfg.BigQueryConfig.toProto() } + var pbCloudStorageConfig *pb.CloudStorageConfig + if cfg.CloudStorageConfig.Bucket != "" { + pbCloudStorageConfig = cfg.CloudStorageConfig.toProto() + } var retentionDuration *durpb.Duration if cfg.RetentionDuration != 0 { retentionDuration = durpb.New(cfg.RetentionDuration) @@ -454,6 +572,7 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { Topic: cfg.Topic.name, PushConfig: pbPushConfig, BigqueryConfig: pbBigQueryConfig, + CloudStorageConfig: pbCloudStorageConfig, AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), RetainAckedMessages: cfg.RetainAckedMessages, MessageRetentionDuration: retentionDuration, @@ -502,6 +621,9 @@ func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionC if bq := protoToBQConfig(pbSub.GetBigqueryConfig()); bq != nil { subC.BigQueryConfig = *bq } + if cs := protoToStorageConfig(pbSub.GetCloudStorageConfig()); cs != nil { + subC.CloudStorageConfig = *cs + } return subC, nil } @@ -538,6 +660,31 @@ func protoToBQConfig(pbBQ *pb.BigQueryConfig) *BigQueryConfig { return bq } +func protoToStorageConfig(pbCSC *pb.CloudStorageConfig) *CloudStorageConfig { + if pbCSC == nil { + return nil + } + + csc := &CloudStorageConfig{ + Bucket: pbCSC.GetBucket(), + FilenamePrefix: pbCSC.GetFilenamePrefix(), + FilenameSuffix: pbCSC.GetFilenameSuffix(), + MaxBytes: pbCSC.GetMaxBytes(), + State: CloudStorageConfigState(pbCSC.GetState()), + } + if dur := pbCSC.GetMaxDuration().AsDuration(); dur != 0 { + csc.MaxDuration = dur + } + if out := pbCSC.OutputFormat; out != nil { + if _, ok := out.(*pb.CloudStorageConfig_TextConfig_); ok { + csc.OutputFormat = &CloudStorageOutputFormat_TextConfig{} + } else if cfg, ok := out.(*pb.CloudStorageConfig_AvroConfig_); ok { + csc.OutputFormat = &CloudStorageOutputFormat_AvroConfig{WriteMetadata: cfg.AvroConfig.GetWriteMetadata()} + } + } + return csc +} + // DeadLetterPolicy specifies the conditions for dead lettering messages in // a subscription. type DeadLetterPolicy struct { @@ -781,14 +928,21 @@ func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) { // SubscriptionConfigToUpdate describes how to update a subscription. type SubscriptionConfigToUpdate struct { - // If non-nil, the push config is changed. Cannot be set at the same time as BigQueryConfig. + // If non-nil, the push config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig + // can be set. // If currently in push mode, set this value to the zero value to revert to a Pull based subscription. PushConfig *PushConfig - // If non-nil, the bigquery config is changed. Cannot be set at the same time as PushConfig. + // If non-nil, the bigquery config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig + // can be set. // If currently in bigquery mode, set this value to the zero value to revert to a Pull based subscription, BigQueryConfig *BigQueryConfig + // If non-nil, the Cloud Storage config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig + // can be set. + // If currently in CloudStorage mode, set this value to the zero value to revert to a Pull based subscription, + CloudStorageConfig *CloudStorageConfig + // If non-zero, the ack deadline is changed. AckDeadline time.Duration @@ -850,6 +1004,10 @@ func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.Update psub.BigqueryConfig = cfg.BigQueryConfig.toProto() paths = append(paths, "bigquery_config") } + if cfg.CloudStorageConfig != nil { + psub.CloudStorageConfig = cfg.CloudStorageConfig.toProto() + paths = append(paths, "cloud_storage_config") + } if cfg.AckDeadline != 0 { psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds())) paths = append(paths, "ack_deadline_seconds") diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index ef11dcde51c5..c480a89c69e4 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -497,6 +497,60 @@ func TestBigQuerySubscription(t *testing.T) { } } +func TestCloudStorageSubscription(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + client, srv := newFake(t) + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "t") + bucket := "fake-bucket" + csCfg := CloudStorageConfig{ + Bucket: bucket, + FilenamePrefix: "some-prefix", + FilenameSuffix: "some-suffix", + OutputFormat: &CloudStorageOutputFormat_AvroConfig{ + WriteMetadata: true, + }, + MaxDuration: 10 * time.Minute, + MaxBytes: 10e5, + } + + subConfig := SubscriptionConfig{ + Topic: topic, + CloudStorageConfig: csCfg, + } + csSub, err := client.CreateSubscription(ctx, "s", subConfig) + if err != nil { + t.Fatal(err) + } + cfg, err := csSub.Config(ctx) + if err != nil { + t.Fatal(err) + } + + want := csCfg + want.State = CloudStorageConfigActive + if diff := testutil.Diff(cfg.CloudStorageConfig, want); diff != "" { + t.Fatalf("create cloud storage subscription mismatch: \n%s", diff) + } + + // Test resetting to a pull based subscription. + cfg, err = csSub.Update(ctx, SubscriptionConfigToUpdate{ + CloudStorageConfig: &CloudStorageConfig{}, + }) + if err != nil { + t.Fatal(err) + } + got := cfg.CloudStorageConfig + want = CloudStorageConfig{} + if diff := testutil.Diff(got, want); diff != "" { + t.Fatalf("remove cloud storage subscription mismatch: \n%s", diff) + } + +} + func TestExactlyOnceDelivery_AckSuccess(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) From cbe372656fe0a08137942d9d11f6b35882c4e421 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Fri, 26 May 2023 10:57:03 -0700 Subject: [PATCH 2/6] fix exported type style --- pubsub/subscription.go | 25 +++++++++++++------------ pubsub/subscription_test.go | 2 +- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index b2b60018d9d7..7d78cbca1b42 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -298,22 +298,23 @@ type isCloudStorageOutputFormat interface { isCloudStorageOutputFormat() } -// CloudStorageOutputFormat_TextConfig is the configuration for writing -// message data in text format. Message paylods will be written to files +// CloudStorageOutputFormatTextConfig is the configuration for writing +// message data in text format. Message payloads will be written to files // as raw text, separated by a newline. -type CloudStorageOutputFormat_TextConfig struct{} +type CloudStorageOutputFormatTextConfig struct{} -// Configuration for writing message data in Avro format. -// Message payloads and metadata will be written to the files as an Avro binary. -type CloudStorageOutputFormat_AvroConfig struct { +// CloudStorageOutputFormatAvroConfig is the configuration for writing +// message data in Avro format. Message payloads and metadata will be written +// to the files as an Avro binary. +type CloudStorageOutputFormatAvroConfig struct { // When true, write the subscription name, message_id, publish_time, // attributes, and ordering_key as additional fields in the output. WriteMetadata bool } -func (*CloudStorageOutputFormat_TextConfig) isCloudStorageOutputFormat() {} +func (*CloudStorageOutputFormatTextConfig) isCloudStorageOutputFormat() {} -func (*CloudStorageOutputFormat_AvroConfig) isCloudStorageOutputFormat() {} +func (*CloudStorageOutputFormatAvroConfig) isCloudStorageOutputFormat() {} // CloudStorageConfig configures the subscription to deliver to Cloud Storage. type CloudStorageConfig struct { @@ -368,9 +369,9 @@ func (bc *CloudStorageConfig) toProto() *pb.CloudStorageConfig { State: pb.CloudStorageConfig_State(bc.State), } if out := bc.OutputFormat; out != nil { - if _, ok := out.(*CloudStorageOutputFormat_TextConfig); ok { + if _, ok := out.(*CloudStorageOutputFormatTextConfig); ok { pbCfg.OutputFormat = &pb.CloudStorageConfig_TextConfig_{} - } else if cfg, ok := out.(*CloudStorageOutputFormat_AvroConfig); ok { + } else if cfg, ok := out.(*CloudStorageOutputFormatAvroConfig); ok { pbCfg.OutputFormat = &pb.CloudStorageConfig_AvroConfig_{ AvroConfig: &pb.CloudStorageConfig_AvroConfig{ WriteMetadata: cfg.WriteMetadata, @@ -677,9 +678,9 @@ func protoToStorageConfig(pbCSC *pb.CloudStorageConfig) *CloudStorageConfig { } if out := pbCSC.OutputFormat; out != nil { if _, ok := out.(*pb.CloudStorageConfig_TextConfig_); ok { - csc.OutputFormat = &CloudStorageOutputFormat_TextConfig{} + csc.OutputFormat = &CloudStorageOutputFormatTextConfig{} } else if cfg, ok := out.(*pb.CloudStorageConfig_AvroConfig_); ok { - csc.OutputFormat = &CloudStorageOutputFormat_AvroConfig{WriteMetadata: cfg.AvroConfig.GetWriteMetadata()} + csc.OutputFormat = &CloudStorageOutputFormatAvroConfig{WriteMetadata: cfg.AvroConfig.GetWriteMetadata()} } } return csc diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index c480a89c69e4..9180158d00b6 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -510,7 +510,7 @@ func TestCloudStorageSubscription(t *testing.T) { Bucket: bucket, FilenamePrefix: "some-prefix", FilenameSuffix: "some-suffix", - OutputFormat: &CloudStorageOutputFormat_AvroConfig{ + OutputFormat: &CloudStorageOutputFormatAvroConfig{ WriteMetadata: true, }, MaxDuration: 10 * time.Minute, From a6b3e8e11d7de4ec11880aeec2268b582b2daef3 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Thu, 1 Jun 2023 16:42:59 -0700 Subject: [PATCH 3/6] fix bad sentinel value in gcs and bq subs --- pubsub/pstest/fake.go | 16 ++++++++-------- pubsub/subscription.go | 8 +++++++- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index 0e78ccab789c..cbe7632ddd19 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -499,14 +499,10 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p if ps.PushConfig == nil { ps.PushConfig = &pb.PushConfig{} } - if ps.BigqueryConfig == nil { - ps.BigqueryConfig = &pb.BigQueryConfig{} - } else if ps.BigqueryConfig.Table != "" { + if ps.BigqueryConfig != nil && ps.BigqueryConfig.Table != "" { ps.BigqueryConfig.State = pb.BigQueryConfig_ACTIVE } - if ps.CloudStorageConfig == nil { - ps.CloudStorageConfig = &pb.CloudStorageConfig{} - } else if ps.CloudStorageConfig.Bucket != "" { + if ps.CloudStorageConfig != nil && ps.CloudStorageConfig.Bucket != "" { ps.CloudStorageConfig.State = pb.CloudStorageConfig_ACTIVE } ps.TopicMessageRetentionDuration = top.proto.MessageRetentionDuration @@ -611,13 +607,17 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti case "bigquery_config": sub.proto.BigqueryConfig = req.GetSubscription().GetBigqueryConfig() - if sub.proto.GetBigqueryConfig().GetTable() != "" { + // As long as the bq config is not nil, we assume it's valid + // without additional checks. + if sub.proto.GetBigqueryConfig() != nil { sub.proto.GetBigqueryConfig().State = pb.BigQueryConfig_ACTIVE } case "cloud_storage_config": sub.proto.CloudStorageConfig = req.GetSubscription().GetCloudStorageConfig() - if sub.proto.GetCloudStorageConfig().GetBucket() != "" { + // As long as the storage config is not nil, we assume it's valid + // without additional checks. + if sub.proto.GetCloudStorageConfig() != nil { sub.proto.CloudStorageConfig.State = pb.CloudStorageConfig_ACTIVE } diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 7d78cbca1b42..54e6775d49a3 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -553,6 +553,8 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { pbBigQueryConfig = cfg.BigQueryConfig.toProto() } var pbCloudStorageConfig *pb.CloudStorageConfig + // Use the bucket as sentinel value here. If it is blank, + // that's equivalent to clearing the config and reverting to pull. if cfg.CloudStorageConfig.Bucket != "" { pbCloudStorageConfig = cfg.CloudStorageConfig.toProto() } @@ -1006,7 +1008,11 @@ func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.Update paths = append(paths, "bigquery_config") } if cfg.CloudStorageConfig != nil { - psub.CloudStorageConfig = cfg.CloudStorageConfig.toProto() + if cfg.CloudStorageConfig.Bucket == "" { + psub.CloudStorageConfig = nil + } else { + psub.CloudStorageConfig = cfg.CloudStorageConfig.toProto() + } paths = append(paths, "cloud_storage_config") } if cfg.AckDeadline != 0 { From 5d0a7c84f554088c188a5b20d42a150fc55f17c7 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Thu, 15 Jun 2023 13:54:02 -0700 Subject: [PATCH 4/6] address review comments --- pubsub/subscription.go | 43 ++++++++++++++++--------------------- pubsub/subscription_test.go | 17 +++++++++++++-- 2 files changed, 33 insertions(+), 27 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index b289127d0b8a..ab84c7982ece 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -357,23 +357,28 @@ type CloudStorageConfig struct { State CloudStorageConfigState } -func (bc *CloudStorageConfig) toProto() *pb.CloudStorageConfig { - if bc == nil { +func (cs *CloudStorageConfig) toProto() *pb.CloudStorageConfig { + if cs == nil { + return nil + } + // For the purposes of the live service, an empty/zero-valued config + // is treated the same as nil and clearing this setting. + if (CloudStorageConfig{}) == *cs { return nil } var dur *durationpb.Duration - if bc.MaxDuration != nil { - dur = durationpb.New(optional.ToDuration(bc.MaxDuration)) + if cs.MaxDuration != nil { + dur = durationpb.New(optional.ToDuration(cs.MaxDuration)) } pbCfg := &pb.CloudStorageConfig{ - Bucket: bc.Bucket, - FilenamePrefix: bc.FilenamePrefix, - FilenameSuffix: bc.FilenameSuffix, + Bucket: cs.Bucket, + FilenamePrefix: cs.FilenamePrefix, + FilenameSuffix: cs.FilenameSuffix, MaxDuration: dur, - MaxBytes: bc.MaxBytes, - State: pb.CloudStorageConfig_State(bc.State), + MaxBytes: cs.MaxBytes, + State: pb.CloudStorageConfig_State(cs.State), } - if out := bc.OutputFormat; out != nil { + if out := cs.OutputFormat; out != nil { if _, ok := out.(*CloudStorageOutputFormatTextConfig); ok { pbCfg.OutputFormat = &pb.CloudStorageConfig_TextConfig_{} } else if cfg, ok := out.(*CloudStorageOutputFormatAvroConfig); ok { @@ -553,16 +558,8 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil { pbPushConfig = cfg.PushConfig.toProto() } - var pbBigQueryConfig *pb.BigQueryConfig - if cfg.BigQueryConfig.Table != "" { - pbBigQueryConfig = cfg.BigQueryConfig.toProto() - } - var pbCloudStorageConfig *pb.CloudStorageConfig - // Use the bucket as sentinel value here. If it is blank, - // that's equivalent to clearing the config and reverting to pull. - if cfg.CloudStorageConfig.Bucket != "" { - pbCloudStorageConfig = cfg.CloudStorageConfig.toProto() - } + pbBigQueryConfig := cfg.BigQueryConfig.toProto() + pbCloudStorageConfig := cfg.CloudStorageConfig.toProto() var retentionDuration *durpb.Duration if cfg.RetentionDuration != 0 { retentionDuration = durpb.New(cfg.RetentionDuration) @@ -1013,11 +1010,7 @@ func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.Update paths = append(paths, "bigquery_config") } if cfg.CloudStorageConfig != nil { - if cfg.CloudStorageConfig.Bucket == "" { - psub.CloudStorageConfig = nil - } else { - psub.CloudStorageConfig = cfg.CloudStorageConfig.toProto() - } + psub.CloudStorageConfig = cfg.CloudStorageConfig.toProto() paths = append(paths, "cloud_storage_config") } if cfg.AckDeadline != 0 { diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index 9180158d00b6..19d64457d6a5 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -536,6 +536,20 @@ func TestCloudStorageSubscription(t *testing.T) { t.Fatalf("create cloud storage subscription mismatch: \n%s", diff) } + csCfg.OutputFormat = &CloudStorageOutputFormatTextConfig{} + cfg, err = csSub.Update(ctx, SubscriptionConfigToUpdate{ + CloudStorageConfig: &csCfg, + }) + if err != nil { + t.Fatal(err) + } + got := cfg.CloudStorageConfig + want = csCfg + want.State = CloudStorageConfigActive + if diff := testutil.Diff(got, want); diff != "" { + t.Fatalf("update cloud storage subscription mismatch: \n%s", diff) + } + // Test resetting to a pull based subscription. cfg, err = csSub.Update(ctx, SubscriptionConfigToUpdate{ CloudStorageConfig: &CloudStorageConfig{}, @@ -543,12 +557,11 @@ func TestCloudStorageSubscription(t *testing.T) { if err != nil { t.Fatal(err) } - got := cfg.CloudStorageConfig + got = cfg.CloudStorageConfig want = CloudStorageConfig{} if diff := testutil.Diff(got, want); diff != "" { t.Fatalf("remove cloud storage subscription mismatch: \n%s", diff) } - } func TestExactlyOnceDelivery_AckSuccess(t *testing.T) { From 725858e05c31d2c802795cba7bc9a72bf4a239c5 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Thu, 15 Jun 2023 14:01:21 -0700 Subject: [PATCH 5/6] revert change that erroneously deletes bq config check --- pubsub/pstest/fake.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index a424151804c8..f999af01c176 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -611,6 +611,7 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti case "bigquery_config": // If bq config is nil here, it will be cleared. // Otherwise, we'll consider the subscription active if any table is set. + sub.proto.BigqueryConfig = req.GetSubscription().GetBigqueryConfig() if sub.proto.GetBigqueryConfig() != nil { if sub.proto.GetBigqueryConfig().GetTable() != "" { sub.proto.BigqueryConfig.State = pb.BigQueryConfig_ACTIVE From 40655f165c763613553bbf387433027e434253c4 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Fri, 23 Jun 2023 16:28:00 -0700 Subject: [PATCH 6/6] fix wording in one of comment --- pubsub/subscription.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index ab84c7982ece..4a9f09bd83bf 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -420,19 +420,19 @@ type SubscriptionConfig struct { // If push delivery is used with this subscription, this field is // used to configure it. At most one of `PushConfig`, `BigQueryConfig`, - // and `CloudStorageConfig` can be set. If all are empty, then the + // or `CloudStorageConfig` can be set. If all are empty, then the // subscriber will pull and ack messages using API methods. PushConfig PushConfig // If delivery to BigQuery is used with this subscription, this field is // used to configure it. At most one of `PushConfig`, `BigQueryConfig`, - // and `CloudStorageConfig` can be set. If all are empty, then the + // or `CloudStorageConfig` can be set. If all are empty, then the // subscriber will pull and ack messages using API methods. BigQueryConfig BigQueryConfig // If delivery to Cloud Storage is used with this subscription, this field is // used to configure it. At most one of `PushConfig`, `BigQueryConfig`, - // and `CloudStorageConfig` can be set. If all are empty, then the + // or `CloudStorageConfig` can be set. If all are empty, then the // subscriber will pull and ack messages using API methods. CloudStorageConfig CloudStorageConfig