Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Pub/Sub Subscription support for specifying a service account #18444

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/10967.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
pubsub: added `bigquery_config.service_account_email` field to `google_pubsub_subscription` resource
```
48 changes: 48 additions & 0 deletions google/services/pubsub/resource_pubsub_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ If all three are empty, then the subscriber will pull and ack messages using API
Description: `When true and use_topic_schema or use_table_schema is true, any fields that are a part of the topic schema or message schema that
are not part of the BigQuery table schema are dropped when writing to BigQuery. Otherwise, the schemas must be kept in sync
and any messages with extra fields are not written and remain in the subscription's backlog.`,
},
"service_account_email": {
Type: schema.TypeString,
Optional: true,
Description: `The service account to use to write to BigQuery. If not specified, the Pub/Sub
[service agent](https://cloud.google.com/iam/docs/service-agents),
service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com, is used.`,
},
"use_table_schema": {
Type: schema.TypeBool,
Expand Down Expand Up @@ -209,6 +216,13 @@ May not exceed the subscription's acknowledgement deadline.
A duration in seconds with up to nine fractional digits, ending with 's'. Example: "3.5s".`,
Default: "300s",
},
"service_account_email": {
Type: schema.TypeString,
Optional: true,
Description: `The service account to use to write to Cloud Storage. If not specified, the Pub/Sub
[service agent](https://cloud.google.com/iam/docs/service-agents),
service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com, is used.`,
},
"state": {
Type: schema.TypeString,
Computed: true,
Expand Down Expand Up @@ -1090,6 +1104,8 @@ func flattenPubsubSubscriptionBigqueryConfig(v interface{}, d *schema.ResourceDa
flattenPubsubSubscriptionBigqueryConfigWriteMetadata(original["writeMetadata"], d, config)
transformed["drop_unknown_fields"] =
flattenPubsubSubscriptionBigqueryConfigDropUnknownFields(original["dropUnknownFields"], d, config)
transformed["service_account_email"] =
flattenPubsubSubscriptionBigqueryConfigServiceAccountEmail(original["serviceAccountEmail"], d, config)
return []interface{}{transformed}
}
func flattenPubsubSubscriptionBigqueryConfigTable(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
Expand All @@ -1112,6 +1128,10 @@ func flattenPubsubSubscriptionBigqueryConfigDropUnknownFields(v interface{}, d *
return v
}

func flattenPubsubSubscriptionBigqueryConfigServiceAccountEmail(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubSubscriptionCloudStorageConfig(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return nil
Expand All @@ -1137,6 +1157,8 @@ func flattenPubsubSubscriptionCloudStorageConfig(v interface{}, d *schema.Resour
flattenPubsubSubscriptionCloudStorageConfigState(original["state"], d, config)
transformed["avro_config"] =
flattenPubsubSubscriptionCloudStorageConfigAvroConfig(original["avroConfig"], d, config)
transformed["service_account_email"] =
flattenPubsubSubscriptionCloudStorageConfigServiceAccountEmail(original["serviceAccountEmail"], d, config)
return []interface{}{transformed}
}
func flattenPubsubSubscriptionCloudStorageConfigBucket(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
Expand Down Expand Up @@ -1197,6 +1219,10 @@ func flattenPubsubSubscriptionCloudStorageConfigAvroConfigWriteMetadata(v interf
return v
}

func flattenPubsubSubscriptionCloudStorageConfigServiceAccountEmail(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubSubscriptionPushConfig(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return nil
Expand Down Expand Up @@ -1463,6 +1489,13 @@ func expandPubsubSubscriptionBigqueryConfig(v interface{}, d tpgresource.Terrafo
transformed["dropUnknownFields"] = transformedDropUnknownFields
}

transformedServiceAccountEmail, err := expandPubsubSubscriptionBigqueryConfigServiceAccountEmail(original["service_account_email"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedServiceAccountEmail); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["serviceAccountEmail"] = transformedServiceAccountEmail
}

return transformed, nil
}

Expand All @@ -1486,6 +1519,10 @@ func expandPubsubSubscriptionBigqueryConfigDropUnknownFields(v interface{}, d tp
return v, nil
}

func expandPubsubSubscriptionBigqueryConfigServiceAccountEmail(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubSubscriptionCloudStorageConfig(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
Expand Down Expand Up @@ -1551,6 +1588,13 @@ func expandPubsubSubscriptionCloudStorageConfig(v interface{}, d tpgresource.Ter
transformed["avroConfig"] = transformedAvroConfig
}

transformedServiceAccountEmail, err := expandPubsubSubscriptionCloudStorageConfigServiceAccountEmail(original["service_account_email"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedServiceAccountEmail); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["serviceAccountEmail"] = transformedServiceAccountEmail
}

return transformed, nil
}

Expand Down Expand Up @@ -1605,6 +1649,10 @@ func expandPubsubSubscriptionCloudStorageConfigAvroConfigWriteMetadata(v interfa
return v, nil
}

func expandPubsubSubscriptionCloudStorageConfigServiceAccountEmail(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubSubscriptionPushConfig(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
Expand Down
162 changes: 162 additions & 0 deletions google/services/pubsub/resource_pubsub_subscription_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,92 @@ EOF
`, context)
}

func TestAccPubsubSubscription_pubsubSubscriptionPushBqServiceAccountExample(t *testing.T) {
t.Parallel()

context := map[string]interface{}{
"random_suffix": acctest.RandString(t, 10),
}

acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccPubsubSubscription_pubsubSubscriptionPushBqServiceAccountExample(context),
},
{
ResourceName: "google_pubsub_subscription.example",
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"labels", "terraform_labels", "topic"},
},
},
})
}

func testAccPubsubSubscription_pubsubSubscriptionPushBqServiceAccountExample(context map[string]interface{}) string {
return acctest.Nprintf(`
resource "google_pubsub_topic" "example" {
name = "tf-test-example-topic%{random_suffix}"
}

resource "google_pubsub_subscription" "example" {
name = "tf-test-example-subscription%{random_suffix}"
topic = google_pubsub_topic.example.id

bigquery_config {
table = "${google_bigquery_table.test.project}.${google_bigquery_table.test.dataset_id}.${google_bigquery_table.test.table_id}"
service_account_email = google_service_account.bq_write_service_account.email
}

depends_on = [google_service_account.bq_write_service_account, google_project_iam_member.viewer, google_project_iam_member.editor]
}

data "google_project" "project" {
}

resource "google_service_account" "bq_write_service_account" {
account_id = "tf-test-example-bqw%{random_suffix}"
display_name = "BQ Write Service Account"
}

resource "google_project_iam_member" "viewer" {
project = data.google_project.project.project_id
role = "roles/bigquery.metadataViewer"
member = "serviceAccount:${google_service_account.bq_write_service_account.email}"
}

resource "google_project_iam_member" "editor" {
project = data.google_project.project.project_id
role = "roles/bigquery.dataEditor"
member = "serviceAccount:${google_service_account.bq_write_service_account.email}"
}

resource "google_bigquery_dataset" "test" {
dataset_id = "tf_test_example_dataset%{random_suffix}"
}

resource "google_bigquery_table" "test" {
deletion_protection = false
table_id = "tf_test_example_table%{random_suffix}"
dataset_id = google_bigquery_dataset.test.dataset_id

schema = <<EOF
[
{
"name": "data",
"type": "STRING",
"mode": "NULLABLE",
"description": "The data"
}
]
EOF
}
`, context)
}

func TestAccPubsubSubscription_pubsubSubscriptionPushCloudstorageExample(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -487,6 +573,82 @@ resource "google_storage_bucket_iam_member" "admin" {
`, context)
}

func TestAccPubsubSubscription_pubsubSubscriptionPushCloudstorageServiceAccountExample(t *testing.T) {
t.Parallel()

context := map[string]interface{}{
"random_suffix": acctest.RandString(t, 10),
}

acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccPubsubSubscription_pubsubSubscriptionPushCloudstorageServiceAccountExample(context),
},
{
ResourceName: "google_pubsub_subscription.example",
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"labels", "terraform_labels", "topic"},
},
},
})
}

func testAccPubsubSubscription_pubsubSubscriptionPushCloudstorageServiceAccountExample(context map[string]interface{}) string {
return acctest.Nprintf(`
resource "google_storage_bucket" "example" {
name = "tf-test-example-bucket%{random_suffix}"
location = "US"
uniform_bucket_level_access = true
}

resource "google_pubsub_topic" "example" {
name = "tf-test-example-topic%{random_suffix}"
}

resource "google_pubsub_subscription" "example" {
name = "tf-test-example-subscription%{random_suffix}"
topic = google_pubsub_topic.example.id

cloud_storage_config {
bucket = google_storage_bucket.example.name

filename_prefix = "pre-"
filename_suffix = "-%{random_suffix}"
filename_datetime_format = "YYYY-MM-DD/hh_mm_ssZ"

max_bytes = 1000
max_duration = "300s"

service_account_email = google_service_account.storage_write_service_account.email
}
depends_on = [
google_service_account.storage_write_service_account,
google_storage_bucket.example,
google_storage_bucket_iam_member.admin,
]
}

data "google_project" "project" {
}

resource "google_service_account" "storage_write_service_account" {
account_id = "tf-test-example-stw%{random_suffix}"
display_name = "Storage Write Service Account"
}

resource "google_storage_bucket_iam_member" "admin" {
bucket = google_storage_bucket.example.name
role = "roles/storage.admin"
member = "serviceAccount:${google_service_account.storage_write_service_account.email}"
}
`, context)
}

func testAccCheckPubsubSubscriptionDestroyProducer(t *testing.T) func(s *terraform.State) error {
return func(s *terraform.State) error {
for name, rs := range s.RootModule().Resources {
Expand Down
Loading