Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ingestion settings to google_pubsub_topic #17604

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/9985.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
pubsub: added `ingestion_data_source_settings` field to `google_pubsub_topic` resource
```
189 changes: 189 additions & 0 deletions google/services/pubsub/resource_pubsub_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,53 @@ func ResourcePubsubTopic() *schema.Resource {
DiffSuppressFunc: tpgresource.CompareSelfLinkOrResourceName,
Description: `Name of the topic.`,
},
"ingestion_data_source_settings": {
Type: schema.TypeList,
Optional: true,
Description: `Settings for ingestion from a data source into this topic.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"aws_kinesis": {
Type: schema.TypeList,
Optional: true,
Description: `Settings for ingestion from Amazon Kinesis Data Streams.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"aws_role_arn": {
Type: schema.TypeString,
Required: true,
Description: `AWS role ARN to be used for Federated Identity authentication with
Kinesis. Check the Pub/Sub docs for how to set up this role and the
required permissions that need to be attached to it.`,
},
"consumer_arn": {
Type: schema.TypeString,
Required: true,
Description: `The Kinesis consumer ARN to used for ingestion in
Enhanced Fan-Out mode. The consumer must be already
created and ready to be used.`,
},
"gcp_service_account": {
Type: schema.TypeString,
Required: true,
Description: `The GCP service account to be used for Federated Identity authentication
with Kinesis (via a 'AssumeRoleWithWebIdentity' call for the provided
role). The 'awsRoleArn' must be set up with 'accounts.google.com:sub'
equals to this service account number.`,
},
"stream_arn": {
Type: schema.TypeString,
Required: true,
Description: `The Kinesis stream ARN to ingest data from.`,
},
},
},
},
},
},
},
"kms_key_name": {
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -207,6 +254,12 @@ func resourcePubsubTopicCreate(d *schema.ResourceData, meta interface{}) error {
} else if v, ok := d.GetOkExists("message_retention_duration"); !tpgresource.IsEmptyValue(reflect.ValueOf(messageRetentionDurationProp)) && (ok || !reflect.DeepEqual(v, messageRetentionDurationProp)) {
obj["messageRetentionDuration"] = messageRetentionDurationProp
}
ingestionDataSourceSettingsProp, err := expandPubsubTopicIngestionDataSourceSettings(d.Get("ingestion_data_source_settings"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("ingestion_data_source_settings"); !tpgresource.IsEmptyValue(reflect.ValueOf(ingestionDataSourceSettingsProp)) && (ok || !reflect.DeepEqual(v, ingestionDataSourceSettingsProp)) {
obj["ingestionDataSourceSettings"] = ingestionDataSourceSettingsProp
}
labelsProp, err := expandPubsubTopicEffectiveLabels(d.Get("effective_labels"), d, config)
if err != nil {
return err
Expand Down Expand Up @@ -371,6 +424,9 @@ func resourcePubsubTopicRead(d *schema.ResourceData, meta interface{}) error {
if err := d.Set("message_retention_duration", flattenPubsubTopicMessageRetentionDuration(res["messageRetentionDuration"], d, config)); err != nil {
return fmt.Errorf("Error reading Topic: %s", err)
}
if err := d.Set("ingestion_data_source_settings", flattenPubsubTopicIngestionDataSourceSettings(res["ingestionDataSourceSettings"], d, config)); err != nil {
return fmt.Errorf("Error reading Topic: %s", err)
}
if err := d.Set("terraform_labels", flattenPubsubTopicTerraformLabels(res["labels"], d, config)); err != nil {
return fmt.Errorf("Error reading Topic: %s", err)
}
Expand Down Expand Up @@ -421,6 +477,12 @@ func resourcePubsubTopicUpdate(d *schema.ResourceData, meta interface{}) error {
} else if v, ok := d.GetOkExists("message_retention_duration"); !tpgresource.IsEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, messageRetentionDurationProp)) {
obj["messageRetentionDuration"] = messageRetentionDurationProp
}
ingestionDataSourceSettingsProp, err := expandPubsubTopicIngestionDataSourceSettings(d.Get("ingestion_data_source_settings"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("ingestion_data_source_settings"); !tpgresource.IsEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, ingestionDataSourceSettingsProp)) {
obj["ingestionDataSourceSettings"] = ingestionDataSourceSettingsProp
}
labelsProp, err := expandPubsubTopicEffectiveLabels(d.Get("effective_labels"), d, config)
if err != nil {
return err
Expand Down Expand Up @@ -457,6 +519,10 @@ func resourcePubsubTopicUpdate(d *schema.ResourceData, meta interface{}) error {
updateMask = append(updateMask, "messageRetentionDuration")
}

if d.HasChange("ingestion_data_source_settings") {
updateMask = append(updateMask, "ingestionDataSourceSettings")
}

if d.HasChange("effective_labels") {
updateMask = append(updateMask, "labels")
}
Expand Down Expand Up @@ -632,6 +698,54 @@ func flattenPubsubTopicMessageRetentionDuration(v interface{}, d *schema.Resourc
return v
}

func flattenPubsubTopicIngestionDataSourceSettings(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["aws_kinesis"] =
flattenPubsubTopicIngestionDataSourceSettingsAwsKinesis(original["awsKinesis"], d, config)
return []interface{}{transformed}
}
func flattenPubsubTopicIngestionDataSourceSettingsAwsKinesis(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["stream_arn"] =
flattenPubsubTopicIngestionDataSourceSettingsAwsKinesisStreamArn(original["streamArn"], d, config)
transformed["consumer_arn"] =
flattenPubsubTopicIngestionDataSourceSettingsAwsKinesisConsumerArn(original["consumerArn"], d, config)
transformed["aws_role_arn"] =
flattenPubsubTopicIngestionDataSourceSettingsAwsKinesisAwsRoleArn(original["awsRoleArn"], d, config)
transformed["gcp_service_account"] =
flattenPubsubTopicIngestionDataSourceSettingsAwsKinesisGcpServiceAccount(original["gcpServiceAccount"], d, config)
return []interface{}{transformed}
}
func flattenPubsubTopicIngestionDataSourceSettingsAwsKinesisStreamArn(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicIngestionDataSourceSettingsAwsKinesisConsumerArn(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicIngestionDataSourceSettingsAwsKinesisAwsRoleArn(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicIngestionDataSourceSettingsAwsKinesisGcpServiceAccount(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicTerraformLabels(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return v
Expand Down Expand Up @@ -720,6 +834,81 @@ func expandPubsubTopicMessageRetentionDuration(v interface{}, d tpgresource.Terr
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettings(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedAwsKinesis, err := expandPubsubTopicIngestionDataSourceSettingsAwsKinesis(original["aws_kinesis"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedAwsKinesis); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["awsKinesis"] = transformedAwsKinesis
}

return transformed, nil
}

func expandPubsubTopicIngestionDataSourceSettingsAwsKinesis(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedStreamArn, err := expandPubsubTopicIngestionDataSourceSettingsAwsKinesisStreamArn(original["stream_arn"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedStreamArn); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["streamArn"] = transformedStreamArn
}

transformedConsumerArn, err := expandPubsubTopicIngestionDataSourceSettingsAwsKinesisConsumerArn(original["consumer_arn"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedConsumerArn); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["consumerArn"] = transformedConsumerArn
}

transformedAwsRoleArn, err := expandPubsubTopicIngestionDataSourceSettingsAwsKinesisAwsRoleArn(original["aws_role_arn"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedAwsRoleArn); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["awsRoleArn"] = transformedAwsRoleArn
}

transformedGcpServiceAccount, err := expandPubsubTopicIngestionDataSourceSettingsAwsKinesisGcpServiceAccount(original["gcp_service_account"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedGcpServiceAccount); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["gcpServiceAccount"] = transformedGcpServiceAccount
}

return transformed, nil
}

func expandPubsubTopicIngestionDataSourceSettingsAwsKinesisStreamArn(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettingsAwsKinesisConsumerArn(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettingsAwsKinesisAwsRoleArn(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettingsAwsKinesisGcpServiceAccount(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicEffectiveLabels(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (map[string]string, error) {
if v == nil {
return map[string]string{}, nil
Expand Down
43 changes: 43 additions & 0 deletions google/services/pubsub/resource_pubsub_topic_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,49 @@ resource "google_pubsub_topic" "example" {
`, context)
}

func TestAccPubsubTopic_pubsubTopicIngestionKinesisExample(t *testing.T) {
t.Parallel()

context := map[string]interface{}{
"random_suffix": acctest.RandString(t, 10),
}

acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckPubsubTopicDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccPubsubTopic_pubsubTopicIngestionKinesisExample(context),
},
{
ResourceName: "google_pubsub_topic.example",
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"labels", "terraform_labels"},
},
},
})
}

func testAccPubsubTopic_pubsubTopicIngestionKinesisExample(context map[string]interface{}) string {
return acctest.Nprintf(`
resource "google_pubsub_topic" "example" {
name = "tf-test-example-topic%{random_suffix}"

# Outside of automated terraform-provider-google CI tests, these values must be of actual AWS resources for the test to pass.
ingestion_data_source_settings {
aws_kinesis {
stream_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name"
consumer_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name/consumer/consumer-1:1111111111"
aws_role_arn = "arn:aws:iam::111111111111:role/fake-role-name"
gcp_service_account = "fake-service-account@fake-gcp-project.iam.gserviceaccount.com"
}
}
}
`, context)
}

func testAccCheckPubsubTopicDestroyProducer(t *testing.T) func(s *terraform.State) error {
return func(s *terraform.State) error {
for name, rs := range s.RootModule().Resources {
Expand Down
68 changes: 68 additions & 0 deletions google/services/pubsub/resource_pubsub_topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,38 @@ func TestAccPubsubTopic_migration(t *testing.T) {
})
}

func TestAccPubsubTopic_kinesisIngestionUpdate(t *testing.T) {
t.Parallel()

topic := fmt.Sprintf("tf-test-topic-%s", acctest.RandString(t, 10))

acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckPubsubTopicDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccPubsubTopic_updateWithKinesisIngestionSettings(topic),
},
{
ResourceName: "google_pubsub_topic.foo",
ImportStateId: topic,
ImportState: true,
ImportStateVerify: true,
},
{
Config: testAccPubsubTopic_updateWithUpdatedKinesisIngestionSettings(topic),
},
{
ResourceName: "google_pubsub_topic.foo",
ImportStateId: topic,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func testAccPubsubTopic_update(topic, key, value string) string {
return fmt.Sprintf(`
resource "google_pubsub_topic" "foo" {
Expand Down Expand Up @@ -214,3 +246,39 @@ resource "google_pubsub_topic" "bar" {
}
`, schema, topic)
}

func testAccPubsubTopic_updateWithKinesisIngestionSettings(topic string) string {
return fmt.Sprintf(`
resource "google_pubsub_topic" "foo" {
name = "%s"

# Outside of automated terraform-provider-google CI tests, these values must be of actual AWS resources for the test to pass.
ingestion_data_source_settings {
aws_kinesis {
stream_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name"
consumer_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name/consumer/consumer-1:1111111111"
aws_role_arn = "arn:aws:iam::111111111111:role/fake-role-name"
gcp_service_account = "fake-service-account@fake-gcp-project.iam.gserviceaccount.com"
}
}
}
`, topic)
}

func testAccPubsubTopic_updateWithUpdatedKinesisIngestionSettings(topic string) string {
return fmt.Sprintf(`
resource "google_pubsub_topic" "foo" {
name = "%s"

# Outside of automated terraform-provider-google CI tests, these values must be of actual AWS resources for the test to pass.
ingestion_data_source_settings {
aws_kinesis {
stream_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/updated-fake-stream-name"
consumer_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/updated-fake-stream-name/consumer/consumer-1:1111111111"
aws_role_arn = "arn:aws:iam::111111111111:role/updated-fake-role-name"
gcp_service_account = "updated-fake-service-account@fake-gcp-project.iam.gserviceaccount.com"
}
}
}
`, topic)
}
Loading
Loading