Skip to content

Commit

Permalink
feat: Add Pub/Sub Schema support for changing definitions via revisio…
Browse files Browse the repository at this point in the history
…ns (#8079) (#14857)

* Add support for Pub/Sub schema evolution

* Add example for Pub/Sub schema evolution

* Remove changes not to be made in a single PR

* Remove changes not to be made in a single PR

* Remove changes not to be made in a single PR

* Fix spacing

* Test update, import

* Reduce flakiness

Signed-off-by: Modular Magician <magic-modules@google.com>
  • Loading branch information
modular-magician authored Jun 8, 2023
1 parent f86079e commit d898bd8
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 10 deletions.
3 changes: 3 additions & 0 deletions .changelog/8079.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
pubsub: Allowed `definition` of `google_pubsub_schema` to change without deleting and recreating the resource by using schema revisions (https://cloud.google.com/pubsub/docs/schemas#commit-schema-revision)
```
14 changes: 6 additions & 8 deletions google/resource_pubsub_schema_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ func TestAccPubsubSchema_pubsubSchemaBasicExample(t *testing.T) {
Config: testAccPubsubSchema_pubsubSchemaBasicExample(context),
},
{
ResourceName: "google_pubsub_schema.example",
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"definition"},
ResourceName: "google_pubsub_schema.example",
ImportState: true,
ImportStateVerify: true,
},
},
})
Expand Down Expand Up @@ -82,10 +81,9 @@ func TestAccPubsubSchema_pubsubSchemaProtobufExample(t *testing.T) {
Config: testAccPubsubSchema_pubsubSchemaProtobufExample(context),
},
{
ResourceName: "google_pubsub_schema.example",
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"definition"},
ResourceName: "google_pubsub_schema.example",
ImportState: true,
ImportStateVerify: true,
},
},
})
Expand Down
86 changes: 86 additions & 0 deletions google/resource_pubsub_schema_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package google

import (
"fmt"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-provider-google/google/acctest"
)

func TestAccPubsubSchema_update(t *testing.T) {
t.Parallel()

schema := fmt.Sprintf("tf-test-schema-%s", RandString(t, 10))

VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t),
ExternalProviders: map[string]resource.ExternalProvider{
"time": {},
},
Steps: []resource.TestStep{
{
Config: testAccPubsubSchema_basic(schema),
},
{
ResourceName: "google_pubsub_schema.foo",
ImportState: true,
ImportStateVerify: true,
},
{
Config: testAccPubsubSchema_updated(schema),
},
{
ResourceName: "google_pubsub_schema.foo",
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func testAccPubsubSchema_basic(schema string) string {
return fmt.Sprintf(`
resource "google_pubsub_schema" "foo" {
name = "%s"
type = "PROTOCOL_BUFFER"
definition = "syntax = \"proto3\";\nmessage Results {\nstring message_request = 1;\nstring message_response = 2;\n}"
}
# Need to introduce delay for updates in order for tests to complete
# successfully due to caching effects.
resource "time_sleep" "wait_121_seconds" {
create_duration = "121s"
lifecycle {
replace_triggered_by = [
google_pubsub_schema.foo
]
}
}
`, schema)
}

func testAccPubsubSchema_updated(schema string) string {
return fmt.Sprintf(`
resource "google_pubsub_schema" "foo" {
name = "%s"
type = "PROTOCOL_BUFFER"
definition = "syntax = \"proto3\";\nmessage Results {\nstring message_request = 1;\nstring message_response = 2;\nstring timestamp_request = 3;\n}"
}
# Need to introduce delay for updates in order for tests to complete
# successfully due to caching effects.
resource "time_sleep" "wait_121_seconds" {
create_duration = "121s"
lifecycle {
replace_triggered_by = [
google_pubsub_schema.foo
]
}
}
`, schema)
}
90 changes: 88 additions & 2 deletions google/services/pubsub/resource_pubsub_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func ResourcePubsubSchema() *schema.Resource {
return &schema.Resource{
Create: resourcePubsubSchemaCreate,
Read: resourcePubsubSchemaRead,
Update: resourcePubsubSchemaUpdate,
Delete: resourcePubsubSchemaDelete,

Importer: &schema.ResourceImporter{
Expand All @@ -42,6 +43,7 @@ func ResourcePubsubSchema() *schema.Resource {

Timeouts: &schema.ResourceTimeout{
Create: schema.DefaultTimeout(20 * time.Minute),
Update: schema.DefaultTimeout(20 * time.Minute),
Delete: schema.DefaultTimeout(20 * time.Minute),
},

Expand All @@ -56,15 +58,13 @@ func ResourcePubsubSchema() *schema.Resource {
"definition": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Description: `The definition of the schema.
This should contain a string representing the full definition of the schema
that is a valid schema definition of the type specified in type.`,
},
"type": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
ValidateFunc: verify.ValidateEnum([]string{"TYPE_UNSPECIFIED", "PROTOCOL_BUFFER", "AVRO", ""}),
Description: `The type of the schema definition Default value: "TYPE_UNSPECIFIED" Possible values: ["TYPE_UNSPECIFIED", "PROTOCOL_BUFFER", "AVRO"]`,
Default: "TYPE_UNSPECIFIED",
Expand Down Expand Up @@ -235,13 +235,87 @@ func resourcePubsubSchemaRead(d *schema.ResourceData, meta interface{}) error {
if err := d.Set("type", flattenPubsubSchemaType(res["type"], d, config)); err != nil {
return fmt.Errorf("Error reading Schema: %s", err)
}
if err := d.Set("definition", flattenPubsubSchemaDefinition(res["definition"], d, config)); err != nil {
return fmt.Errorf("Error reading Schema: %s", err)
}
if err := d.Set("name", flattenPubsubSchemaName(res["name"], d, config)); err != nil {
return fmt.Errorf("Error reading Schema: %s", err)
}

return nil
}

func resourcePubsubSchemaUpdate(d *schema.ResourceData, meta interface{}) error {
config := meta.(*transport_tpg.Config)
userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent)
if err != nil {
return err
}

billingProject := ""

project, err := tpgresource.GetProject(d, config)
if err != nil {
return fmt.Errorf("Error fetching project for Schema: %s", err)
}
billingProject = project

obj := make(map[string]interface{})
typeProp, err := expandPubsubSchemaType(d.Get("type"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("type"); !tpgresource.IsEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, typeProp)) {
obj["type"] = typeProp
}
definitionProp, err := expandPubsubSchemaDefinition(d.Get("definition"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("definition"); !tpgresource.IsEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, definitionProp)) {
obj["definition"] = definitionProp
}
nameProp, err := expandPubsubSchemaName(d.Get("name"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("name"); !tpgresource.IsEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, nameProp)) {
obj["name"] = nameProp
}

obj, err = resourcePubsubSchemaUpdateEncoder(d, meta, obj)
if err != nil {
return err
}

url, err := tpgresource.ReplaceVars(d, config, "{{PubsubBasePath}}projects/{{project}}/schemas/{{name}}:commit")
if err != nil {
return err
}

log.Printf("[DEBUG] Updating Schema %q: %#v", d.Id(), obj)

// err == nil indicates that the billing_project value was found
if bp, err := tpgresource.GetBillingProject(d, config); err == nil {
billingProject = bp
}

res, err := transport_tpg.SendRequest(transport_tpg.SendRequestOptions{
Config: config,
Method: "POST",
Project: billingProject,
RawURL: url,
UserAgent: userAgent,
Body: obj,
Timeout: d.Timeout(schema.TimeoutUpdate),
})

if err != nil {
return fmt.Errorf("Error updating Schema %q: %s", d.Id(), err)
} else {
log.Printf("[DEBUG] Finished updating Schema %q: %#v", d.Id(), res)
}

return resourcePubsubSchemaRead(d, meta)
}

func resourcePubsubSchemaDelete(d *schema.ResourceData, meta interface{}) error {
config := meta.(*transport_tpg.Config)
userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent)
Expand Down Expand Up @@ -316,6 +390,10 @@ func flattenPubsubSchemaType(v interface{}, d *schema.ResourceData, config *tran
return v
}

func flattenPubsubSchemaDefinition(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubSchemaName(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return v
Expand All @@ -334,3 +412,11 @@ func expandPubsubSchemaDefinition(v interface{}, d tpgresource.TerraformResource
func expandPubsubSchemaName(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return tpgresource.GetResourceNameFromSelfLink(v.(string)), nil
}

func resourcePubsubSchemaUpdateEncoder(d *schema.ResourceData, meta interface{}, obj map[string]interface{}) (map[string]interface{}, error) {
newObj := make(map[string]interface{})
newObj["name"] = d.Id()
obj["name"] = d.Id()
newObj["schema"] = obj
return newObj, nil
}
1 change: 1 addition & 0 deletions website/docs/r/pubsub_schema.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ This resource provides the following
[Timeouts](https://developer.hashicorp.com/terraform/plugin/sdkv2/resources/retries-and-customizable-timeouts) configuration options:

- `create` - Default is 20 minutes.
- `update` - Default is 20 minutes.
- `delete` - Default is 20 minutes.

## Import
Expand Down

0 comments on commit d898bd8

Please sign in to comment.