Skip to content

Commit

Permalink
Add bigquery subscription type support
Browse files Browse the repository at this point in the history
Signed-off-by: Viktor Danyliuk <viktor.danyliuk@databand.ai>
  • Loading branch information
POD666 committed Oct 11, 2022
1 parent d85e3cc commit e03ae44
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 1 deletion.
24 changes: 24 additions & 0 deletions apis/pubsub/v1alpha1/subscription_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ type SubscriptionParameters struct {
// +optional
PushConfig *PushConfig `json:"pushConfig,omitempty"`

// BigQueryConfig is a parameter which configures bigquery delivery.
// +optional
BigQueryConfig *BigQueryConfig `json:"bigQueryConfig,omitempty"`

// RetainAckedMessages is a message which indicates whether to retain acknowledged
// messages. If true, then messages are not expunged from the
// subscription's backlog, even if they are acknowledged, until they
Expand Down Expand Up @@ -143,6 +147,26 @@ type PushConfig struct {
PushEndpoint string `json:"pushEndpoint,omitempty"`
}

// BigQueryConfig contains configuration for a bigquery delivery endpoint.
type BigQueryConfig struct {
// Bigquery table to deliver messages to.
Table string `json:"table,omitempty"`

// When enabled, the topic schema will be used when writing to BigQuery. Else,
// tes the message bytes to a column called data in BigQuery.
UseTopicSchema bool `json:"useTopicSchema,omitempty"`

// When enabled, the metadata of each message is written to additional columns in
// the BigQuery table. Else, the metadata is not written to the BigQuery table.
// https://cloud.google.com/pubsub/docs/bigquery?hl=ru#write-metadata
WriteMetadata bool `json:"writeMetadata,omitempty"`

// When enabled along with the "Use topic schema" option, any field that is present in
// the topic schema but not in the BigQuery schema will be dropped. Else, messages with extra fields are not written
// and remain in the subscription backlog.
DropUnknownFields bool `json:"dropUnknownFields,omitempty"`
}

// OidcToken contains information needed for generating an OpenID Connect token
type OidcToken struct {
// Audience is the "audience" to be used when generating OIDC token.
Expand Down
20 changes: 20 additions & 0 deletions apis/pubsub/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions package/crds/pubsub.gcp.crossplane.io_subscriptions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,31 @@ spec:
value of 10 seconds is used.
format: int64
type: integer
bigQueryConfig:
description: BigQueryConfig is a parameter which configures bigquery
delivery.
properties:
dropUnknownFields:
description: When enabled along with the "Use topic schema"
option, any field that is present in the topic schema but
not in the BigQuery schema will be dropped. Else, messages
with extra fields are not written and remain in the subscription
backlog.
type: boolean
table:
description: Bigquery table to deliver messages to.
type: string
useTopicSchema:
description: When enabled, the topic schema will be used when
writing to BigQuery. Else, tes the message bytes to a column
called data in BigQuery.
type: boolean
writeMetadata:
description: When enabled, the metadata of each message is
written to additional columns in the BigQuery table. Else,
the metadata is not written to the BigQuery table. https://cloud.google.com/pubsub/docs/bigquery?hl=ru#write-metadata
type: boolean
type: object
deadLetterPolicy:
description: DeadLetterPolicy is the policy that specifies the
conditions for dead lettering messages in this subscription.
Expand Down
29 changes: 28 additions & 1 deletion pkg/clients/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func GenerateSubscription(projectID, name string, p v1alpha1.SubscriptionParamet
setDeadLetterPolicy(projectID, p, s)
setExpirationPolicy(p, s)
setPushConfig(p, s)
setBigQueryConfig(p, s)
setRetryPolicy(p, s)

return s
Expand Down Expand Up @@ -85,6 +86,18 @@ func setPushConfig(p v1alpha1.SubscriptionParameters, s *pubsub.Subscription) {
}
}

// setBigQueryConfig sets BigQueryConfig of subscription based on SubscriptionParameters.
func setBigQueryConfig(p v1alpha1.SubscriptionParameters, s *pubsub.Subscription) {
if p.BigQueryConfig != nil {
s.BigqueryConfig = &pubsub.BigQueryConfig{
Table: p.BigQueryConfig.Table,
UseTopicSchema: p.BigQueryConfig.UseTopicSchema,
WriteMetadata: p.BigQueryConfig.WriteMetadata,
DropUnknownFields: p.BigQueryConfig.DropUnknownFields,
}
}
}

// setExpirationPolicy sets ExpirationPolicy of subscription based on SubscriptionParameters.
func setExpirationPolicy(p v1alpha1.SubscriptionParameters, s *pubsub.Subscription) {
if p.ExpirationPolicy != nil {
Expand Down Expand Up @@ -169,6 +182,15 @@ func LateInitialize(p *v1alpha1.SubscriptionParameters, s pubsub.Subscription) {
}
}

if p.BigQueryConfig == nil && s.BigqueryConfig != nil {
p.BigQueryConfig = &v1alpha1.BigQueryConfig{
Table: s.BigqueryConfig.Table,
DropUnknownFields: s.BigqueryConfig.DropUnknownFields,
UseTopicSchema: s.BigqueryConfig.UseTopicSchema,
WriteMetadata: s.BigqueryConfig.WriteMetadata,
}
}

if p.RetryPolicy == nil && s.RetryPolicy != nil {
p.RetryPolicy = &v1alpha1.RetryPolicy{
MaximumBackoff: s.RetryPolicy.MaximumBackoff,
Expand All @@ -195,7 +217,7 @@ func IsUpToDate(projectID string, p v1alpha1.SubscriptionParameters, s pubsub.Su
// GenerateUpdateRequest produces an UpdateSubscriptionRequest with the difference
// between SubscriptionParameters and Subscription.
// enableMessageOrdering, deadLetterPolicy, topic are not mutable
func GenerateUpdateRequest(name string, p v1alpha1.SubscriptionParameters, s pubsub.Subscription) *pubsub.UpdateSubscriptionRequest {
func GenerateUpdateRequest(name string, p v1alpha1.SubscriptionParameters, s pubsub.Subscription) *pubsub.UpdateSubscriptionRequest { // nolint:gocyclo
observed := &v1alpha1.SubscriptionParameters{}
LateInitialize(observed, s)

Expand Down Expand Up @@ -245,6 +267,11 @@ func GenerateUpdateRequest(name string, p v1alpha1.SubscriptionParameters, s pub
setPushConfig(p, us.Subscription)
}

if !cmp.Equal(p.BigQueryConfig, observed.BigQueryConfig) {
mask = append(mask, "bigQueryConfig")
setBigQueryConfig(p, us.Subscription)
}

if !cmp.Equal(p.RetryPolicy, observed.RetryPolicy) {
mask = append(mask, "retryPolicy")
setRetryPolicy(p, us.Subscription)
Expand Down
Loading

0 comments on commit e03ae44

Please sign in to comment.