Skip to content

Commit

Permalink
bq sub for pubsub (#6346) (#12216)
Browse files Browse the repository at this point in the history
Signed-off-by: Modular Magician <magic-modules@google.com>
  • Loading branch information
modular-magician authored Aug 1, 2022
1 parent b3b3f0e commit 377d688
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .changelog/6346.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
pubsub: add `bigquery_config` to `google_pubsub_subscription`
```
146 changes: 146 additions & 0 deletions google/resource_pubsub_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,41 @@ for the call to the push endpoint.
If the subscriber never acknowledges the message, the Pub/Sub system
will eventually redeliver the message.`,
},
"bigquery_config": {
Type: schema.TypeList,
Optional: true,
Description: `If delivery to BigQuery is used with this subscription, this field is used to configure it.
Either pushConfig or bigQueryConfig can be set, but not both.
If both are empty, then the subscriber will pull and ack messages using API methods.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"table": {
Type: schema.TypeString,
Required: true,
Description: `The name of the table to which to write data, of the form {projectId}.{datasetId}.{tableId}`,
},
"drop_unknown_fields": {
Type: schema.TypeBool,
Optional: true,
Description: `When true and useTopicSchema is true, any fields that are a part of the topic schema that are not part of the BigQuery table schema are dropped when writing to BigQuery.
Otherwise, the schemas must be kept in sync and any messages with extra fields are not written and remain in the subscription's backlog.`,
},
"use_topic_schema": {
Type: schema.TypeBool,
Optional: true,
Description: `When true, use the topic's schema as the columns to write to in BigQuery, if it exists.`,
},
"write_metadata": {
Type: schema.TypeBool,
Optional: true,
Description: `When true, write the subscription name, messageId, publishTime, attributes, and orderingKey to additional columns in the table.
The subscription name, messageId, and publishTime fields are put in their own columns while all other message properties (other than data) are written to a JSON object in the attributes column.`,
},
},
},
ConflictsWith: []string{"push_config"},
},
"dead_letter_policy": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -293,6 +328,7 @@ Note: if not specified, the Push endpoint URL will be used.`,
},
},
},
ConflictsWith: []string{"bigquery_config"},
},
"retain_acked_messages": {
Type: schema.TypeBool,
Expand Down Expand Up @@ -368,6 +404,12 @@ func resourcePubsubSubscriptionCreate(d *schema.ResourceData, meta interface{})
} else if v, ok := d.GetOkExists("labels"); !isEmptyValue(reflect.ValueOf(labelsProp)) && (ok || !reflect.DeepEqual(v, labelsProp)) {
obj["labels"] = labelsProp
}
bigqueryConfigProp, err := expandPubsubSubscriptionBigqueryConfig(d.Get("bigquery_config"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("bigquery_config"); !isEmptyValue(reflect.ValueOf(bigqueryConfigProp)) && (ok || !reflect.DeepEqual(v, bigqueryConfigProp)) {
obj["bigqueryConfig"] = bigqueryConfigProp
}
pushConfigProp, err := expandPubsubSubscriptionPushConfig(d.Get("push_config"), d, config)
if err != nil {
return err
Expand Down Expand Up @@ -553,6 +595,9 @@ func resourcePubsubSubscriptionRead(d *schema.ResourceData, meta interface{}) er
if err := d.Set("labels", flattenPubsubSubscriptionLabels(res["labels"], d, config)); err != nil {
return fmt.Errorf("Error reading Subscription: %s", err)
}
if err := d.Set("bigquery_config", flattenPubsubSubscriptionBigqueryConfig(res["bigqueryConfig"], d, config)); err != nil {
return fmt.Errorf("Error reading Subscription: %s", err)
}
if err := d.Set("push_config", flattenPubsubSubscriptionPushConfig(res["pushConfig"], d, config)); err != nil {
return fmt.Errorf("Error reading Subscription: %s", err)
}
Expand Down Expand Up @@ -609,6 +654,12 @@ func resourcePubsubSubscriptionUpdate(d *schema.ResourceData, meta interface{})
} else if v, ok := d.GetOkExists("labels"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, labelsProp)) {
obj["labels"] = labelsProp
}
bigqueryConfigProp, err := expandPubsubSubscriptionBigqueryConfig(d.Get("bigquery_config"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("bigquery_config"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, bigqueryConfigProp)) {
obj["bigqueryConfig"] = bigqueryConfigProp
}
pushConfigProp, err := expandPubsubSubscriptionPushConfig(d.Get("push_config"), d, config)
if err != nil {
return err
Expand Down Expand Up @@ -669,6 +720,10 @@ func resourcePubsubSubscriptionUpdate(d *schema.ResourceData, meta interface{})
updateMask = append(updateMask, "labels")
}

if d.HasChange("bigquery_config") {
updateMask = append(updateMask, "bigqueryConfig")
}

if d.HasChange("push_config") {
updateMask = append(updateMask, "pushConfig")
}
Expand Down Expand Up @@ -794,6 +849,41 @@ func flattenPubsubSubscriptionLabels(v interface{}, d *schema.ResourceData, conf
return v
}

func flattenPubsubSubscriptionBigqueryConfig(v interface{}, d *schema.ResourceData, config *Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["table"] =
flattenPubsubSubscriptionBigqueryConfigTable(original["table"], d, config)
transformed["use_topic_schema"] =
flattenPubsubSubscriptionBigqueryConfigUseTopicSchema(original["useTopicSchema"], d, config)
transformed["write_metadata"] =
flattenPubsubSubscriptionBigqueryConfigWriteMetadata(original["writeMetadata"], d, config)
transformed["drop_unknown_fields"] =
flattenPubsubSubscriptionBigqueryConfigDropUnknownFields(original["dropUnknownFields"], d, config)
return []interface{}{transformed}
}
func flattenPubsubSubscriptionBigqueryConfigTable(v interface{}, d *schema.ResourceData, config *Config) interface{} {
return v
}

func flattenPubsubSubscriptionBigqueryConfigUseTopicSchema(v interface{}, d *schema.ResourceData, config *Config) interface{} {
return v
}

func flattenPubsubSubscriptionBigqueryConfigWriteMetadata(v interface{}, d *schema.ResourceData, config *Config) interface{} {
return v
}

func flattenPubsubSubscriptionBigqueryConfigDropUnknownFields(v interface{}, d *schema.ResourceData, config *Config) interface{} {
return v
}

func flattenPubsubSubscriptionPushConfig(v interface{}, d *schema.ResourceData, config *Config) interface{} {
if v == nil {
return nil
Expand Down Expand Up @@ -989,6 +1079,62 @@ func expandPubsubSubscriptionLabels(v interface{}, d TerraformResourceData, conf
return m, nil
}

func expandPubsubSubscriptionBigqueryConfig(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedTable, err := expandPubsubSubscriptionBigqueryConfigTable(original["table"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedTable); val.IsValid() && !isEmptyValue(val) {
transformed["table"] = transformedTable
}

transformedUseTopicSchema, err := expandPubsubSubscriptionBigqueryConfigUseTopicSchema(original["use_topic_schema"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedUseTopicSchema); val.IsValid() && !isEmptyValue(val) {
transformed["useTopicSchema"] = transformedUseTopicSchema
}

transformedWriteMetadata, err := expandPubsubSubscriptionBigqueryConfigWriteMetadata(original["write_metadata"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedWriteMetadata); val.IsValid() && !isEmptyValue(val) {
transformed["writeMetadata"] = transformedWriteMetadata
}

transformedDropUnknownFields, err := expandPubsubSubscriptionBigqueryConfigDropUnknownFields(original["drop_unknown_fields"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedDropUnknownFields); val.IsValid() && !isEmptyValue(val) {
transformed["dropUnknownFields"] = transformedDropUnknownFields
}

return transformed, nil
}

func expandPubsubSubscriptionBigqueryConfigTable(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}

func expandPubsubSubscriptionBigqueryConfigUseTopicSchema(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}

func expandPubsubSubscriptionBigqueryConfigWriteMetadata(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}

func expandPubsubSubscriptionBigqueryConfigDropUnknownFields(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}

func expandPubsubSubscriptionPushConfig(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
Expand Down
80 changes: 80 additions & 0 deletions google/resource_pubsub_subscription_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,86 @@ resource "google_pubsub_subscription" "example" {
`, context)
}

func TestAccPubsubSubscription_pubsubSubscriptionPushBqExample(t *testing.T) {
t.Parallel()

context := map[string]interface{}{
"random_suffix": randString(t, 10),
}

vcrTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccPubsubSubscription_pubsubSubscriptionPushBqExample(context),
},
{
ResourceName: "google_pubsub_subscription.example",
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"topic"},
},
},
})
}

func testAccPubsubSubscription_pubsubSubscriptionPushBqExample(context map[string]interface{}) string {
return Nprintf(`
resource "google_pubsub_topic" "example" {
name = "tf-test-example-topic%{random_suffix}"
}
resource "google_pubsub_subscription" "example" {
name = "tf-test-example-subscription%{random_suffix}"
topic = google_pubsub_topic.example.name
bigquery_config {
table = "${google_bigquery_table.test.project}.${google_bigquery_table.test.dataset_id}.${google_bigquery_table.test.table_id}"
}
depends_on = [google_project_iam_member.viewer, google_project_iam_member.editor]
}
data "google_project" "project" {
}
resource "google_project_iam_member" "viewer" {
project = data.google_project.project.project_id
role = "roles/bigquery.metadataViewer"
member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-pubsub.iam.gserviceaccount.com"
}
resource "google_project_iam_member" "editor" {
project = data.google_project.project.project_id
role = "roles/bigquery.dataEditor"
member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-pubsub.iam.gserviceaccount.com"
}
resource "google_bigquery_dataset" "test" {
dataset_id = "tf_test_example_dataset%{random_suffix}"
}
resource "google_bigquery_table" "test" {
deletion_protection = false
table_id = "tf_test_example_table%{random_suffix}"
dataset_id = google_bigquery_dataset.test.dataset_id
schema = <<EOF
[
{
"name": "data",
"type": "STRING",
"mode": "NULLABLE",
"description": "The data"
}
]
EOF
}
`, context)
}

func testAccCheckPubsubSubscriptionDestroyProducer(t *testing.T) func(s *terraform.State) error {
return func(s *terraform.State) error {
for name, rs := range s.RootModule().Resources {
Expand Down
Loading

0 comments on commit 377d688

Please sign in to comment.