Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bq sub for pubsub #12216

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/6346.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
pubsub: add `bigquery_config` to `google_pubsub_subscription`
```
146 changes: 146 additions & 0 deletions google/resource_pubsub_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,41 @@ for the call to the push endpoint.
If the subscriber never acknowledges the message, the Pub/Sub system
will eventually redeliver the message.`,
},
"bigquery_config": {
Type: schema.TypeList,
Optional: true,
Description: `If delivery to BigQuery is used with this subscription, this field is used to configure it.
Either pushConfig or bigQueryConfig can be set, but not both.
If both are empty, then the subscriber will pull and ack messages using API methods.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"table": {
Type: schema.TypeString,
Required: true,
Description: `The name of the table to which to write data, of the form {projectId}.{datasetId}.{tableId}`,
},
"drop_unknown_fields": {
Type: schema.TypeBool,
Optional: true,
Description: `When true and useTopicSchema is true, any fields that are a part of the topic schema that are not part of the BigQuery table schema are dropped when writing to BigQuery.
Otherwise, the schemas must be kept in sync and any messages with extra fields are not written and remain in the subscription's backlog.`,
},
"use_topic_schema": {
Type: schema.TypeBool,
Optional: true,
Description: `When true, use the topic's schema as the columns to write to in BigQuery, if it exists.`,
},
"write_metadata": {
Type: schema.TypeBool,
Optional: true,
Description: `When true, write the subscription name, messageId, publishTime, attributes, and orderingKey to additional columns in the table.
The subscription name, messageId, and publishTime fields are put in their own columns while all other message properties (other than data) are written to a JSON object in the attributes column.`,
},
},
},
ConflictsWith: []string{"push_config"},
},
"dead_letter_policy": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -293,6 +328,7 @@ Note: if not specified, the Push endpoint URL will be used.`,
},
},
},
ConflictsWith: []string{"bigquery_config"},
},
"retain_acked_messages": {
Type: schema.TypeBool,
Expand Down Expand Up @@ -368,6 +404,12 @@ func resourcePubsubSubscriptionCreate(d *schema.ResourceData, meta interface{})
} else if v, ok := d.GetOkExists("labels"); !isEmptyValue(reflect.ValueOf(labelsProp)) && (ok || !reflect.DeepEqual(v, labelsProp)) {
obj["labels"] = labelsProp
}
bigqueryConfigProp, err := expandPubsubSubscriptionBigqueryConfig(d.Get("bigquery_config"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("bigquery_config"); !isEmptyValue(reflect.ValueOf(bigqueryConfigProp)) && (ok || !reflect.DeepEqual(v, bigqueryConfigProp)) {
obj["bigqueryConfig"] = bigqueryConfigProp
}
pushConfigProp, err := expandPubsubSubscriptionPushConfig(d.Get("push_config"), d, config)
if err != nil {
return err
Expand Down Expand Up @@ -553,6 +595,9 @@ func resourcePubsubSubscriptionRead(d *schema.ResourceData, meta interface{}) er
if err := d.Set("labels", flattenPubsubSubscriptionLabels(res["labels"], d, config)); err != nil {
return fmt.Errorf("Error reading Subscription: %s", err)
}
if err := d.Set("bigquery_config", flattenPubsubSubscriptionBigqueryConfig(res["bigqueryConfig"], d, config)); err != nil {
return fmt.Errorf("Error reading Subscription: %s", err)
}
if err := d.Set("push_config", flattenPubsubSubscriptionPushConfig(res["pushConfig"], d, config)); err != nil {
return fmt.Errorf("Error reading Subscription: %s", err)
}
Expand Down Expand Up @@ -609,6 +654,12 @@ func resourcePubsubSubscriptionUpdate(d *schema.ResourceData, meta interface{})
} else if v, ok := d.GetOkExists("labels"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, labelsProp)) {
obj["labels"] = labelsProp
}
bigqueryConfigProp, err := expandPubsubSubscriptionBigqueryConfig(d.Get("bigquery_config"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("bigquery_config"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, bigqueryConfigProp)) {
obj["bigqueryConfig"] = bigqueryConfigProp
}
pushConfigProp, err := expandPubsubSubscriptionPushConfig(d.Get("push_config"), d, config)
if err != nil {
return err
Expand Down Expand Up @@ -669,6 +720,10 @@ func resourcePubsubSubscriptionUpdate(d *schema.ResourceData, meta interface{})
updateMask = append(updateMask, "labels")
}

if d.HasChange("bigquery_config") {
updateMask = append(updateMask, "bigqueryConfig")
}

if d.HasChange("push_config") {
updateMask = append(updateMask, "pushConfig")
}
Expand Down Expand Up @@ -794,6 +849,41 @@ func flattenPubsubSubscriptionLabels(v interface{}, d *schema.ResourceData, conf
return v
}

func flattenPubsubSubscriptionBigqueryConfig(v interface{}, d *schema.ResourceData, config *Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["table"] =
flattenPubsubSubscriptionBigqueryConfigTable(original["table"], d, config)
transformed["use_topic_schema"] =
flattenPubsubSubscriptionBigqueryConfigUseTopicSchema(original["useTopicSchema"], d, config)
transformed["write_metadata"] =
flattenPubsubSubscriptionBigqueryConfigWriteMetadata(original["writeMetadata"], d, config)
transformed["drop_unknown_fields"] =
flattenPubsubSubscriptionBigqueryConfigDropUnknownFields(original["dropUnknownFields"], d, config)
return []interface{}{transformed}
}
func flattenPubsubSubscriptionBigqueryConfigTable(v interface{}, d *schema.ResourceData, config *Config) interface{} {
return v
}

func flattenPubsubSubscriptionBigqueryConfigUseTopicSchema(v interface{}, d *schema.ResourceData, config *Config) interface{} {
return v
}

func flattenPubsubSubscriptionBigqueryConfigWriteMetadata(v interface{}, d *schema.ResourceData, config *Config) interface{} {
return v
}

func flattenPubsubSubscriptionBigqueryConfigDropUnknownFields(v interface{}, d *schema.ResourceData, config *Config) interface{} {
return v
}

func flattenPubsubSubscriptionPushConfig(v interface{}, d *schema.ResourceData, config *Config) interface{} {
if v == nil {
return nil
Expand Down Expand Up @@ -989,6 +1079,62 @@ func expandPubsubSubscriptionLabels(v interface{}, d TerraformResourceData, conf
return m, nil
}

func expandPubsubSubscriptionBigqueryConfig(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedTable, err := expandPubsubSubscriptionBigqueryConfigTable(original["table"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedTable); val.IsValid() && !isEmptyValue(val) {
transformed["table"] = transformedTable
}

transformedUseTopicSchema, err := expandPubsubSubscriptionBigqueryConfigUseTopicSchema(original["use_topic_schema"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedUseTopicSchema); val.IsValid() && !isEmptyValue(val) {
transformed["useTopicSchema"] = transformedUseTopicSchema
}

transformedWriteMetadata, err := expandPubsubSubscriptionBigqueryConfigWriteMetadata(original["write_metadata"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedWriteMetadata); val.IsValid() && !isEmptyValue(val) {
transformed["writeMetadata"] = transformedWriteMetadata
}

transformedDropUnknownFields, err := expandPubsubSubscriptionBigqueryConfigDropUnknownFields(original["drop_unknown_fields"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedDropUnknownFields); val.IsValid() && !isEmptyValue(val) {
transformed["dropUnknownFields"] = transformedDropUnknownFields
}

return transformed, nil
}

func expandPubsubSubscriptionBigqueryConfigTable(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}

func expandPubsubSubscriptionBigqueryConfigUseTopicSchema(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}

func expandPubsubSubscriptionBigqueryConfigWriteMetadata(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}

func expandPubsubSubscriptionBigqueryConfigDropUnknownFields(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}

func expandPubsubSubscriptionPushConfig(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
Expand Down
80 changes: 80 additions & 0 deletions google/resource_pubsub_subscription_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,86 @@ resource "google_pubsub_subscription" "example" {
`, context)
}

func TestAccPubsubSubscription_pubsubSubscriptionPushBqExample(t *testing.T) {
t.Parallel()

context := map[string]interface{}{
"random_suffix": randString(t, 10),
}

vcrTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccPubsubSubscription_pubsubSubscriptionPushBqExample(context),
},
{
ResourceName: "google_pubsub_subscription.example",
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"topic"},
},
},
})
}

func testAccPubsubSubscription_pubsubSubscriptionPushBqExample(context map[string]interface{}) string {
return Nprintf(`
resource "google_pubsub_topic" "example" {
name = "tf-test-example-topic%{random_suffix}"
}
resource "google_pubsub_subscription" "example" {
name = "tf-test-example-subscription%{random_suffix}"
topic = google_pubsub_topic.example.name
bigquery_config {
table = "${google_bigquery_table.test.project}.${google_bigquery_table.test.dataset_id}.${google_bigquery_table.test.table_id}"
}
depends_on = [google_project_iam_member.viewer, google_project_iam_member.editor]
}
data "google_project" "project" {
}
resource "google_project_iam_member" "viewer" {
project = data.google_project.project.project_id
role = "roles/bigquery.metadataViewer"
member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-pubsub.iam.gserviceaccount.com"
}
resource "google_project_iam_member" "editor" {
project = data.google_project.project.project_id
role = "roles/bigquery.dataEditor"
member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-pubsub.iam.gserviceaccount.com"
}
resource "google_bigquery_dataset" "test" {
dataset_id = "tf_test_example_dataset%{random_suffix}"
}
resource "google_bigquery_table" "test" {
deletion_protection = false
table_id = "tf_test_example_table%{random_suffix}"
dataset_id = google_bigquery_dataset.test.dataset_id
schema = <<EOF
[
{
"name": "data",
"type": "STRING",
"mode": "NULLABLE",
"description": "The data"
}
]
EOF
}
`, context)
}

func testAccCheckPubsubSubscriptionDestroyProducer(t *testing.T) func(s *terraform.State) error {
return func(s *terraform.State) error {
for name, rs := range s.RootModule().Resources {
Expand Down
Loading