diff --git a/.changelog/6346.txt b/.changelog/6346.txt new file mode 100644 index 00000000000..8080dcbed48 --- /dev/null +++ b/.changelog/6346.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +pubsub: add `bigquery_config` to `google_pubsub_subscription` +``` diff --git a/google/resource_pubsub_subscription.go b/google/resource_pubsub_subscription.go index 9e096c4f09d..af594eda8eb 100644 --- a/google/resource_pubsub_subscription.go +++ b/google/resource_pubsub_subscription.go @@ -91,6 +91,41 @@ for the call to the push endpoint. If the subscriber never acknowledges the message, the Pub/Sub system will eventually redeliver the message.`, }, + "bigquery_config": { + Type: schema.TypeList, + Optional: true, + Description: `If delivery to BigQuery is used with this subscription, this field is used to configure it. +Either pushConfig or bigQueryConfig can be set, but not both. +If both are empty, then the subscriber will pull and ack messages using API methods.`, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "table": { + Type: schema.TypeString, + Required: true, + Description: `The name of the table to which to write data, of the form {projectId}.{datasetId}.{tableId}`, + }, + "drop_unknown_fields": { + Type: schema.TypeBool, + Optional: true, + Description: `When true and useTopicSchema is true, any fields that are a part of the topic schema that are not part of the BigQuery table schema are dropped when writing to BigQuery. +Otherwise, the schemas must be kept in sync and any messages with extra fields are not written and remain in the subscription's backlog.`, + }, + "use_topic_schema": { + Type: schema.TypeBool, + Optional: true, + Description: `When true, use the topic's schema as the columns to write to in BigQuery, if it exists.`, + }, + "write_metadata": { + Type: schema.TypeBool, + Optional: true, + Description: `When true, write the subscription name, messageId, publishTime, attributes, and orderingKey to additional columns in the table. +The subscription name, messageId, and publishTime fields are put in their own columns while all other message properties (other than data) are written to a JSON object in the attributes column.`, + }, + }, + }, + ConflictsWith: []string{"push_config"}, + }, "dead_letter_policy": { Type: schema.TypeList, Optional: true, @@ -293,6 +328,7 @@ Note: if not specified, the Push endpoint URL will be used.`, }, }, }, + ConflictsWith: []string{"bigquery_config"}, }, "retain_acked_messages": { Type: schema.TypeBool, @@ -368,6 +404,12 @@ func resourcePubsubSubscriptionCreate(d *schema.ResourceData, meta interface{}) } else if v, ok := d.GetOkExists("labels"); !isEmptyValue(reflect.ValueOf(labelsProp)) && (ok || !reflect.DeepEqual(v, labelsProp)) { obj["labels"] = labelsProp } + bigqueryConfigProp, err := expandPubsubSubscriptionBigqueryConfig(d.Get("bigquery_config"), d, config) + if err != nil { + return err + } else if v, ok := d.GetOkExists("bigquery_config"); !isEmptyValue(reflect.ValueOf(bigqueryConfigProp)) && (ok || !reflect.DeepEqual(v, bigqueryConfigProp)) { + obj["bigqueryConfig"] = bigqueryConfigProp + } pushConfigProp, err := expandPubsubSubscriptionPushConfig(d.Get("push_config"), d, config) if err != nil { return err @@ -553,6 +595,9 @@ func resourcePubsubSubscriptionRead(d *schema.ResourceData, meta interface{}) er if err := d.Set("labels", flattenPubsubSubscriptionLabels(res["labels"], d, config)); err != nil { return fmt.Errorf("Error reading Subscription: %s", err) } + if err := d.Set("bigquery_config", flattenPubsubSubscriptionBigqueryConfig(res["bigqueryConfig"], d, config)); err != nil { + return fmt.Errorf("Error reading Subscription: %s", err) + } if err := d.Set("push_config", flattenPubsubSubscriptionPushConfig(res["pushConfig"], d, config)); err != nil { return fmt.Errorf("Error reading Subscription: %s", err) } @@ -609,6 +654,12 @@ func resourcePubsubSubscriptionUpdate(d *schema.ResourceData, meta interface{}) } else if v, ok := d.GetOkExists("labels"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, labelsProp)) { obj["labels"] = labelsProp } + bigqueryConfigProp, err := expandPubsubSubscriptionBigqueryConfig(d.Get("bigquery_config"), d, config) + if err != nil { + return err + } else if v, ok := d.GetOkExists("bigquery_config"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, bigqueryConfigProp)) { + obj["bigqueryConfig"] = bigqueryConfigProp + } pushConfigProp, err := expandPubsubSubscriptionPushConfig(d.Get("push_config"), d, config) if err != nil { return err @@ -669,6 +720,10 @@ func resourcePubsubSubscriptionUpdate(d *schema.ResourceData, meta interface{}) updateMask = append(updateMask, "labels") } + if d.HasChange("bigquery_config") { + updateMask = append(updateMask, "bigqueryConfig") + } + if d.HasChange("push_config") { updateMask = append(updateMask, "pushConfig") } @@ -794,6 +849,41 @@ func flattenPubsubSubscriptionLabels(v interface{}, d *schema.ResourceData, conf return v } +func flattenPubsubSubscriptionBigqueryConfig(v interface{}, d *schema.ResourceData, config *Config) interface{} { + if v == nil { + return nil + } + original := v.(map[string]interface{}) + if len(original) == 0 { + return nil + } + transformed := make(map[string]interface{}) + transformed["table"] = + flattenPubsubSubscriptionBigqueryConfigTable(original["table"], d, config) + transformed["use_topic_schema"] = + flattenPubsubSubscriptionBigqueryConfigUseTopicSchema(original["useTopicSchema"], d, config) + transformed["write_metadata"] = + flattenPubsubSubscriptionBigqueryConfigWriteMetadata(original["writeMetadata"], d, config) + transformed["drop_unknown_fields"] = + flattenPubsubSubscriptionBigqueryConfigDropUnknownFields(original["dropUnknownFields"], d, config) + return []interface{}{transformed} +} +func flattenPubsubSubscriptionBigqueryConfigTable(v interface{}, d *schema.ResourceData, config *Config) interface{} { + return v +} + +func flattenPubsubSubscriptionBigqueryConfigUseTopicSchema(v interface{}, d *schema.ResourceData, config *Config) interface{} { + return v +} + +func flattenPubsubSubscriptionBigqueryConfigWriteMetadata(v interface{}, d *schema.ResourceData, config *Config) interface{} { + return v +} + +func flattenPubsubSubscriptionBigqueryConfigDropUnknownFields(v interface{}, d *schema.ResourceData, config *Config) interface{} { + return v +} + func flattenPubsubSubscriptionPushConfig(v interface{}, d *schema.ResourceData, config *Config) interface{} { if v == nil { return nil @@ -989,6 +1079,62 @@ func expandPubsubSubscriptionLabels(v interface{}, d TerraformResourceData, conf return m, nil } +func expandPubsubSubscriptionBigqueryConfig(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) { + l := v.([]interface{}) + if len(l) == 0 || l[0] == nil { + return nil, nil + } + raw := l[0] + original := raw.(map[string]interface{}) + transformed := make(map[string]interface{}) + + transformedTable, err := expandPubsubSubscriptionBigqueryConfigTable(original["table"], d, config) + if err != nil { + return nil, err + } else if val := reflect.ValueOf(transformedTable); val.IsValid() && !isEmptyValue(val) { + transformed["table"] = transformedTable + } + + transformedUseTopicSchema, err := expandPubsubSubscriptionBigqueryConfigUseTopicSchema(original["use_topic_schema"], d, config) + if err != nil { + return nil, err + } else if val := reflect.ValueOf(transformedUseTopicSchema); val.IsValid() && !isEmptyValue(val) { + transformed["useTopicSchema"] = transformedUseTopicSchema + } + + transformedWriteMetadata, err := expandPubsubSubscriptionBigqueryConfigWriteMetadata(original["write_metadata"], d, config) + if err != nil { + return nil, err + } else if val := reflect.ValueOf(transformedWriteMetadata); val.IsValid() && !isEmptyValue(val) { + transformed["writeMetadata"] = transformedWriteMetadata + } + + transformedDropUnknownFields, err := expandPubsubSubscriptionBigqueryConfigDropUnknownFields(original["drop_unknown_fields"], d, config) + if err != nil { + return nil, err + } else if val := reflect.ValueOf(transformedDropUnknownFields); val.IsValid() && !isEmptyValue(val) { + transformed["dropUnknownFields"] = transformedDropUnknownFields + } + + return transformed, nil +} + +func expandPubsubSubscriptionBigqueryConfigTable(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) { + return v, nil +} + +func expandPubsubSubscriptionBigqueryConfigUseTopicSchema(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) { + return v, nil +} + +func expandPubsubSubscriptionBigqueryConfigWriteMetadata(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) { + return v, nil +} + +func expandPubsubSubscriptionBigqueryConfigDropUnknownFields(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) { + return v, nil +} + func expandPubsubSubscriptionPushConfig(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) { l := v.([]interface{}) if len(l) == 0 || l[0] == nil { diff --git a/google/resource_pubsub_subscription_generated_test.go b/google/resource_pubsub_subscription_generated_test.go index 7d56755ab26..527c817507d 100644 --- a/google/resource_pubsub_subscription_generated_test.go +++ b/google/resource_pubsub_subscription_generated_test.go @@ -179,6 +179,86 @@ resource "google_pubsub_subscription" "example" { `, context) } +func TestAccPubsubSubscription_pubsubSubscriptionPushBqExample(t *testing.T) { + t.Parallel() + + context := map[string]interface{}{ + "random_suffix": randString(t, 10), + } + + vcrTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t), + Steps: []resource.TestStep{ + { + Config: testAccPubsubSubscription_pubsubSubscriptionPushBqExample(context), + }, + { + ResourceName: "google_pubsub_subscription.example", + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{"topic"}, + }, + }, + }) +} + +func testAccPubsubSubscription_pubsubSubscriptionPushBqExample(context map[string]interface{}) string { + return Nprintf(` +resource "google_pubsub_topic" "example" { + name = "tf-test-example-topic%{random_suffix}" +} + +resource "google_pubsub_subscription" "example" { + name = "tf-test-example-subscription%{random_suffix}" + topic = google_pubsub_topic.example.name + + bigquery_config { + table = "${google_bigquery_table.test.project}.${google_bigquery_table.test.dataset_id}.${google_bigquery_table.test.table_id}" + } + + depends_on = [google_project_iam_member.viewer, google_project_iam_member.editor] +} + +data "google_project" "project" { +} + +resource "google_project_iam_member" "viewer" { + project = data.google_project.project.project_id + role = "roles/bigquery.metadataViewer" + member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-pubsub.iam.gserviceaccount.com" +} + +resource "google_project_iam_member" "editor" { + project = data.google_project.project.project_id + role = "roles/bigquery.dataEditor" + member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-pubsub.iam.gserviceaccount.com" +} + +resource "google_bigquery_dataset" "test" { + dataset_id = "tf_test_example_dataset%{random_suffix}" +} + +resource "google_bigquery_table" "test" { + deletion_protection = false + table_id = "tf_test_example_table%{random_suffix}" + dataset_id = google_bigquery_dataset.test.dataset_id + + schema = < + + Open in Cloud Shell + + +## Example Usage - Pubsub Subscription Push Bq + + +```hcl +resource "google_pubsub_topic" "example" { + name = "example-topic" +} + +resource "google_pubsub_subscription" "example" { + name = "example-subscription" + topic = google_pubsub_topic.example.name + + bigquery_config { + table = "${google_bigquery_table.test.project}.${google_bigquery_table.test.dataset_id}.${google_bigquery_table.test.table_id}" + } + + depends_on = [google_project_iam_member.viewer, google_project_iam_member.editor] +} + +data "google_project" "project" { +} + +resource "google_project_iam_member" "viewer" { + project = data.google_project.project.project_id + role = "roles/bigquery.metadataViewer" + member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-pubsub.iam.gserviceaccount.com" +} + +resource "google_project_iam_member" "editor" { + project = data.google_project.project.project_id + role = "roles/bigquery.dataEditor" + member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-pubsub.iam.gserviceaccount.com" +} + +resource "google_bigquery_dataset" "test" { + dataset_id = "example_dataset" +} + +resource "google_bigquery_table" "test" { + deletion_protection = false + table_id = "example_table" + dataset_id = google_bigquery_dataset.test.dataset_id + + schema = <The `bigquery_config` block supports: + +* `table` - + (Required) + The name of the table to which to write data, of the form {projectId}.{datasetId}.{tableId} + +* `use_topic_schema` - + (Optional) + When true, use the topic's schema as the columns to write to in BigQuery, if it exists. + +* `write_metadata` - + (Optional) + When true, write the subscription name, messageId, publishTime, attributes, and orderingKey to additional columns in the table. + The subscription name, messageId, and publishTime fields are put in their own columns while all other message properties (other than data) are written to a JSON object in the attributes column. + +* `drop_unknown_fields` - + (Optional) + When true and useTopicSchema is true, any fields that are a part of the topic schema that are not part of the BigQuery table schema are dropped when writing to BigQuery. + Otherwise, the schemas must be kept in sync and any messages with extra fields are not written and remain in the subscription's backlog. + The `push_config` block supports: * `oidc_token` -